带有时基窗口的 Apache 波束流处理过程

问题描述 投票:0回答:1

我有一个数据流管道,可以从 kafka 读取消息,处理它们,然后将它们插入到 bigquery 中。
我希望处理/bigquery 插入将在基于时间的批次中进行,以便在每个(1 分钟)间隔内,在该间隔内从 kafka 读取的所有消息都将被处理到 bigquery 中。

我试图遵循文档示例并制作了以下管道:

import apache_beam as beam
from apache_beam.transforms import window

beam_options = SetupOptions(beam_args, streaming=True)
with beam.Pipeline(options=beam_options) as pipeline:

    # read Kafka events
    raw = (
        pipeline
        | "read kafka events" >> kafkaio.KafkaConsume(consumer_config=kafka_config)
        | "extract msg" >> (beam.Map(lambda x: x[1])).with_output_types(str)
    )

    debug1 = (raw | "debug1" >> beam.Map(lambda x: print(f"debug1: {type(x)}, {x}")))

    windows = (raw | 'apply window' >> beam.WindowInto(window.FixedWindows(60)))

    debug2 = (windows | "debug2" >> beam.Map(lambda x: print(f"debug2: {type(x)}, {x}")))

但是当我运行它时,我看到两个调试步骤,

debug1
debug2
,立即一个接一个地运行,表明窗口并没有真正发生。

我想我错过了一些基本的东西。 在文档here中,示例管道有一个

GroupByKey
步骤。也许在使用基于时间的窗口时这是必要的,尽管我真的不需要对我的用例进行分组。

提前致谢

python google-cloud-dataflow apache-beam
1个回答
0
投票

我相信您需要在代码中添加触发功能,例如:

with beam.Pipeline(options=pipeline_options) as pipeline:
    raw = (
        pipeline
        | "read kafka events" >> kafkaio.KafkaConsume(consumer_config=kafka_config)
        | "extract msg" >> (beam.Map(lambda x: x[1])).with_output_types(str)
    )

    debug1 = raw | "debug1" >> beam.Map(lambda x: print(f"debug1: {type(x)}, {x}"))

    windows = (
        raw
        | 'apply window' >> beam.WindowInto(
            window.FixedWindows(60),
            trigger=beam.trigger.AfterProcessingTime(60),  
            accumulation_mode=beam.trigger.AccumulationMode.DISCARDING  
        )
    )

    debug2 = windows | "debug2" >> beam.Map(lambda x: print(f"debug2: {type(x)}, {x}"))
© www.soinside.com 2019 - 2024. All rights reserved.