我需要一个简单的任务来计算来自无限数据源的固定窗口中的消息数。
步骤是:
作为数据源,我使用公共发布/订阅主题
projects/pubsub-public-data/topics/taxirides-realtime
topic_name = "projects/pubsub-public-data/topics/taxirides-realtime"
options = pipeline_options.PipelineOptions()
options.view_as(pipeline_options.StandardOptions).streaming = True
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
ib.options.recording_duration = '18s'
class dataAsKey(beam.DoFn):
def process(self, element, window=beam.DoFn.WindowParam):
yield (format(window.start.to_utc_datetime()) , 1)
p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)
data = (p
| "Read" >> beam.io.ReadFromPubSub(topic=topic_name)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(6))
)
ib.show(data)
如上图所示,Interactive Beams 收到了 110 条消息。 但是,当我需要使用 Window Timestamp 作为键转换 PCollection 然后计算键的消息数时,消息数与聚合后的总数不匹配。在下面的示例中,每个键的消息总数为 50.
count = data(
| 'Data as key' >> beam.ParDo(dataAsKey())
| 'Count per Window' >> Count.PerKey()
ib.show(count)
奇怪的是,当我将相同的代码与有界数据源一起使用时,值匹配。