使用 gcp 数据流并以 Kafka 作为源运行光束流管道时出现内存不足问题

问题描述 投票:0回答:1

我正在尝试在 python 中设置数据流束流管道,其中源是 kafka,接收器是 postgres 表(请参阅下面的管道代码)。 Kafka 主题有多个分区和多个代理。当我运行这个管道时,我观察到工人的记忆持续增加。

我与两名工作人员一起运行,每个工作人员都使用 n2d-standard-64 机器。来自kafka的消息流入量相当高。我的假设是管道没有以这样的速度处理消息。由于许多消息(未处理的消息)在内存中保留的时间较长,worker 内存利用率不断增加,最终导致 OOM 错误。

我尝试了一些不同的方法来控制内存使用:

  • 限制sdk进程数量为1
  • 将线束线数限制为 1
  • 增加计算大小(使用 highmem 机器)
  • 通过配置kafka消费者参数(max.poll.records、fetch.max.bytes、max.partition.fetch.bytes、fetch.min.bytes、poll.timeout.ms)来限制kafka流入

上述解决方案都无助于控制工人记忆的增长。有人可以建议一个解决方案来控制下面的数据流光束管道中的 Kafka 流入吗?

在 Beam 管道中将消息写入 postgres 后是否可以手动提交偏移量?

with beam.Pipeline(options=options) as pipeline:

    kafka_config = {
        'bootstrap.servers': ','.join(KAFKA_BOOTSTRAP_SERVERS),
        'group.id': 'my_group_id',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'true',
    }

    _ = (
        pipeline
        | 'Kafka Read'
        >> ReadFromKafka(
            consumer_config=kafka_config,
            topics=[KAFKA_TOPIC],
        )
        | 'filter' >> beam.Map(apply_filter)
        | 'Write'           
        >> beam.ParDo(
            PostgresWriter())
    )

enter image description here

python apache-kafka google-cloud-dataflow apache-beam
1个回答
0
投票

我最初的假设是错误的。 OOM 错误并不是由于 kafka 的大量流入造成的。相反,这似乎是由于最新的 Beam 版本(2.55.0 及更高版本)造成的。一旦我切换到2.53.0版本,就没有OOM错误了

© www.soinside.com 2019 - 2024. All rights reserved.