我的 AWS Glue 作业日志出现损坏问题。
日志从 Glue Python 脚本发出并发送到 CloudWatch。我最近开始使用 AWS EMF 发布结构化 JSON 日志,并注意到日志已损坏。一些超过 1024 字节的日志语句会拆分为多个 CloudWatch 日志条目,而有时较小的日志语句会分组为单个 CloudWatch 日志条目。这显然破坏了 JSON 消息,并混淆了其他日志分析查询。
问题似乎是在快速连续发出许多日志消息时开始的。我看到一个日志条目有一个大约 6000b 的 JSON 有效负载,在日志的开头没有分割,然后当快速连续打印很多行时,所有内容都变成 1024 字节块。
这些作业是用 Python 3.9 编写并在 Glue 4.0 上运行的 Spark 3.3 作业。受影响的日志是 Python 脚本生成的日志,而不是 Spark 生成的日志。
我做了一些测试,并编写了一个脚本,除了初始化 Glue 之外什么都不做,然后使用 Python 的打印函数、Python 记录器或来自在
getLogger()
上调用 GlueContext
的记录器输出日志消息,以及我的结论是执行 Python 的 CloudWatch 代理配置错误,或者运行脚本和 CloudWatch 代理之间的某些中间进程配置错误。
我用于测试的脚本如下:https://gist.github.com/mhvelplund/961c8868fbfdf857dcd0a623a353870b
在启用连续日志记录并使用 Glue 记录器 (
--TYPE=glue
) 的情况下运行脚本,一切正常,但日志与 Spark 日志位于同一日志流中,其中包含大量 Java 噪音。
使用 Python 记录器 (
--TYPE=log
) 或打印语句 (--TYPE=print
) 运行是出现问题的地方。日志行被分组或分割,看起来是任意的,并且从一次运行到另一次运行不一定以相同的方式。这表明该问题与时间相关。使用没有 output
委托,而只是原始打印语句的脚本版本,我能够在单个 CloudWatch 消息中获取每个打印行。
在每个输出语句之前插入小至 100 毫秒 (
--DELAY=100
) 的轻微延迟,分组和拆分问题就消失了。
不幸的是,使用 Glue 记录器并不是一个好的解决方案,因为我的真实脚本中有使用 Python 记录器和原始打印语句的遗留代码,并且更改这些代码会很痛苦。
以前有人在使用 CloudWatch 或 Glue 时遇到过这个问题吗?如果是的话,你是如何在不使用猴子补丁的情况下解决这个问题的
sys.stdout
? 😉
因此,我在此向 AWS 支持发布了这个问题。后者承认将 Glue 作业的标准日志发送到 CloudWatch 的方式存在配置错误,但没有提供解决方法。
由于
GlueContext
提供的记录器实际上是 Java 记录器的包装器,因此它不能与正常的日志记录语义一起使用。它还充斥着来自 Spark 的日志消息。
我最终的解决方案是使用 Watchtower 创建普通记录器,并破解 aws-embedded-metrics-python 包以使用记录器。
"""Utility methods an classes for handling logging to Watchtower."""
import logging
import sys
import watchtower
from aws_embedded_metrics.environment.environment_detector import EnvironmentCache
from aws_embedded_metrics.environment.local_environment import LocalEnvironment
from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.serializers import Serializer
from aws_embedded_metrics.serializers.log_serializer import LogSerializer
from aws_embedded_metrics.sinks import Sink
from awsglue.utils import getResolvedOptions
logger = logging.getLogger(__name__)
class LoggerSink(Sink):
"""A sink that logs serialized metrics to the logger."""
def __init__(self, serializer: Serializer = LogSerializer()):
self.serializer = serializer
def accept(self, context: MetricsContext) -> None:
for serialized_content in self.serializer.serialize(context):
if serialized_content:
logger.info(serialized_content)
@staticmethod
def name() -> str:
return "LoggerSink"
opts = getResolvedOptions(sys.argv, [])
watchtower_handler = watchtower.CloudWatchLogHandler(
log_group_name="/aws-glue/jobs/output",
log_stream_name=opts["JOB_RUN_ID"],
)
watchtower_handler.setFormatter(
logging.Formatter(
fmt="%(asctime)s,%(msecs)d %(levelname)s [%(filename)s:%(lineno)d#%(funcName)s] %(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
)
)
logging.basicConfig(
handlers=[watchtower_handler],
datefmt="%Y-%m-%d:%H:%M:%S",
level=logging.INFO
force=True, # Nuke any existing loggers
)
logger_env = LocalEnvironment()
logger_env.sink = LoggerSink()
EnvironmentCache.environment = logger_env
我写入 Glue 将使用的同一日志流。最终结果是,我关心的内容通过 Watchtower 记录器路由,而遗留代码中标准输出的内容则由 Glue 默认捕获 stdout 和 stderr 流来处理。
这意味着至少正确记录的数据和指标不会混乱。不是最好的解决方案。