我正在尝试在 python 中设置数据流束流管道,其中源是 kafka,接收器是 postgres 表(请参阅下面的管道代码)。 Kafka 主题有多个分区和多个代理。当我运行这个管道时,我观察到工人的记忆持续增加。
我与两名工作人员一起运行,每个工作人员都使用 n2d-standard-64 机器。来自kafka的消息流入量相当高。我的假设是管道没有以这样的速度处理消息。由于许多消息(未处理的消息)在内存中保留的时间较长,worker 内存利用率不断增加,最终导致 OOM 错误。
我尝试了一些不同的方法来控制内存使用:
上述解决方案都无助于控制工人记忆的增长。有人可以建议一个解决方案来控制下面的数据流光束管道中的 Kafka 流入吗?
在 Beam 管道中将消息写入 postgres 后是否可以手动提交偏移量?
with beam.Pipeline(options=options) as pipeline:
kafka_config = {
'bootstrap.servers': ','.join(KAFKA_BOOTSTRAP_SERVERS),
'group.id': 'my_group_id',
'auto.offset.reset': 'earliest',
'enable.auto.commit': 'true',
}
_ = (
pipeline
| 'Kafka Read'
>> ReadFromKafka(
consumer_config=kafka_config,
topics=[KAFKA_TOPIC],
)
| 'filter' >> beam.Map(apply_filter)
| 'Write'
>> beam.ParDo(
PostgresWriter())
)
我最初的假设是错误的。 OOM 错误并不是由于 kafka 的大量流入造成的。相反,这似乎是由于最新的 Beam 版本(2.55.0 及更高版本)造成的。一旦我切换到2.53.0版本,就没有OOM错误了