我有一个数据流管道,可以从 kafka 读取消息,处理它们,然后将它们插入到 bigquery 中。
我希望处理/bigquery 插入将在基于时间的批次中进行,以便在每个(1 分钟)间隔内,在该间隔内从 kafka 读取的所有消息都将被处理到 bigquery 中。
我试图遵循文档示例并制作了以下管道:
import apache_beam as beam
from apache_beam.transforms import window
beam_options = SetupOptions(beam_args, streaming=True)
with beam.Pipeline(options=beam_options) as pipeline:
# read Kafka events
raw = (
pipeline
| "read kafka events" >> kafkaio.KafkaConsume(consumer_config=kafka_config)
| "extract msg" >> (beam.Map(lambda x: x[1])).with_output_types(str)
)
debug1 = (raw | "debug1" >> beam.Map(lambda x: print(f"debug1: {type(x)}, {x}")))
windows = (raw | 'apply window' >> beam.WindowInto(window.FixedWindows(60)))
debug2 = (windows | "debug2" >> beam.Map(lambda x: print(f"debug2: {type(x)}, {x}")))
但是当我运行它时,我看到两个调试步骤,
debug1
和debug2
,立即一个接一个地运行,表明窗口并没有真正发生。
我想我错过了一些基本的东西。 在文档here中,示例管道有一个
GroupByKey
步骤。也许在使用基于时间的窗口时这是必要的,尽管我真的不需要对我的用例进行分组。
提前致谢
我相信您需要在代码中添加触发功能,例如:
with beam.Pipeline(options=pipeline_options) as pipeline:
raw = (
pipeline
| "read kafka events" >> kafkaio.KafkaConsume(consumer_config=kafka_config)
| "extract msg" >> (beam.Map(lambda x: x[1])).with_output_types(str)
)
debug1 = raw | "debug1" >> beam.Map(lambda x: print(f"debug1: {type(x)}, {x}"))
windows = (
raw
| 'apply window' >> beam.WindowInto(
window.FixedWindows(60),
trigger=beam.trigger.AfterProcessingTime(60),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
)
)
debug2 = windows | "debug2" >> beam.Map(lambda x: print(f"debug2: {type(x)}, {x}"))