AWS Glue 日志已损坏

问题描述 投票:0回答:1

我的 AWS Glue 作业日志出现损坏问题。

日志从 Glue Python 脚本发出并发送到 CloudWatch。我最近开始使用 AWS EMF 发布结构化 JSON 日志,并注意到日志已损坏。一些超过 1024 字节的日志语句会拆分为多个 CloudWatch 日志条目,而有时较小的日志语句会分组为单个 CloudWatch 日志条目。这显然破坏了 JSON 消息,并混淆了其他日志分析查询。

问题似乎是在快速连续发出许多日志消息时开始的。我看到一个日志条目有一个大约 6000b 的 JSON 有效负载,在日志的开头没有分割,然后当快速连续打印很多行时,所有内容都变成 1024 字节块。

这些作业是用 Python 3.9 编写并在 Glue 4.0 上运行的 Spark 3.3 作业。受影响的日志是 Python 脚本生成的日志,而不是 Spark 生成的日志。

我做了一些测试,并编写了一个脚本,除了初始化 Glue 之外什么都不做,然后使用 Python 的打印函数、Python 记录器或来自在

getLogger()
上调用
GlueContext
的记录器输出日志消息,以及我的结论是执行 Python 的 CloudWatch 代理配置错误,或者运行脚本和 CloudWatch 代理之间的某些中间进程配置错误。

我用于测试的脚本如下:https://gist.github.com/mhvelplund/961c8868fbfdf857dcd0a623a353870b

在启用连续日志记录并使用 Glue 记录器 (

--TYPE=glue
) 的情况下运行脚本,一切正常,但日志与 Spark 日志位于同一日志流中,其中包含大量 Java 噪音。

使用 Python 记录器 (

--TYPE=log
) 或打印语句 (
--TYPE=print
) 运行是出现问题的地方。日志行被分组或分割,看起来是任意的,并且从一次运行到另一次运行不一定以相同的方式。这表明该问题与时间相关。使用没有
output
委托,而只是原始打印语句的脚本版本,我能够在单个 CloudWatch 消息中获取每个打印行。

No delay

在每个输出语句之前插入小至 100 毫秒 (

--DELAY=100
) 的轻微延迟,分组和拆分问题就消失了。

100 ms delay

不幸的是,使用 Glue 记录器并不是一个好的解决方案,因为我的真实脚本中有使用 Python 记录器和原始打印语句的遗留代码,并且更改这些代码会很痛苦。

以前有人在使用 CloudWatch 或 Glue 时遇到过这个问题吗?如果是的话,你是如何在不使用猴子补丁的情况下解决这个问题的

sys.stdout
? 😉

python logging aws-glue amazon-cloudwatchlogs
1个回答
0
投票

因此,我在此向 AWS 支持发布了这个问题。后者承认将 Glue 作业的标准日志发送到 CloudWatch 的方式存在配置错误,但没有提供解决方法。

由于

GlueContext
提供的记录器实际上是 Java 记录器的包装器,因此它不能与正常的日志记录语义一起使用。它还充斥着来自 Spark 的日志消息。

我最终的解决方案是使用 Watchtower 创建普通记录器,并破解 aws-embedded-metrics-python 包以使用记录器。

"""Utility methods an classes for handling logging to Watchtower."""

import logging
import sys

import watchtower
from aws_embedded_metrics.environment.environment_detector import EnvironmentCache
from aws_embedded_metrics.environment.local_environment import LocalEnvironment
from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.serializers import Serializer
from aws_embedded_metrics.serializers.log_serializer import LogSerializer
from aws_embedded_metrics.sinks import Sink
from awsglue.utils import getResolvedOptions

logger = logging.getLogger(__name__)


class LoggerSink(Sink):
    """A sink that logs serialized metrics to the logger."""

    def __init__(self, serializer: Serializer = LogSerializer()):
        self.serializer = serializer

    def accept(self, context: MetricsContext) -> None:
        for serialized_content in self.serializer.serialize(context):
            if serialized_content:
                logger.info(serialized_content)

    @staticmethod
    def name() -> str:
        return "LoggerSink"


opts = getResolvedOptions(sys.argv, [])

watchtower_handler = watchtower.CloudWatchLogHandler(
    log_group_name="/aws-glue/jobs/output", 
    log_stream_name=opts["JOB_RUN_ID"],
)

watchtower_handler.setFormatter(
    logging.Formatter(
        fmt="%(asctime)s,%(msecs)d %(levelname)s [%(filename)s:%(lineno)d#%(funcName)s] %(message)s",
        datefmt="%Y-%m-%d:%H:%M:%S",
    )
)

logging.basicConfig(
    handlers=[watchtower_handler],
    datefmt="%Y-%m-%d:%H:%M:%S",
    level=logging.INFO
    force=True,  # Nuke any existing loggers
)

logger_env = LocalEnvironment()
logger_env.sink = LoggerSink()
EnvironmentCache.environment = logger_env

我写入 Glue 将使用的同一日志流。最终结果是,我关心的内容通过 Watchtower 记录器路由,而遗留代码中标准输出的内容则由 Glue 默认捕获 stdout 和 stderr 流来处理。

这意味着至少正确记录的数据和指标不会混乱。不是最好的解决方案。

© www.soinside.com 2019 - 2025. All rights reserved.