带有Pub / Sub源的Apache Beam Python SDK在运行时停留

问题描述 投票:0回答:1

我正在使用Python SDK在Apache Beam中编写一个程序,从Pub / Sub读取JSON文件的内容,并对接收到的字符串进行一些处理。这是程序中我从Pub / Sub中提取内容并执行处理的部分:

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
    lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))

    lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))

    def json_to_tuple(jsonStr):
        res = json.loads(jsonStr)
        ##printing retutn value
        print (res['id'], res['messageSize'])
        ##
        return (res['id'], res['messageSize'])

    tupled = lines_split | beam.Map(json_to_tuple)

    def printlines(line):
        print line

    result = tupled | beam.CombinePerKey(sum)
    result | beam.Map(printlines)

在运行程序时,代码在创建PCollection tupled之后卡住(之后没有执行任何代码行)。奇怪的是,当我将源文件从Pub / Sub更改为包含完全相同内容的本地文件(使用ReadFromText())时,程序运行正常。这种行为可能是什么原因?

python apache-beam google-cloud-pubsub apache-beam-io
1个回答
1
投票

根据Pub / Sub I / O文档(Apache Beam docsDataflow Pub/Sub I/O docs),默认情况下,PubsubIO转换使用无界PCollections。

PCollections可以是bounded or unbounded

  • 有界:数据来自固定的来源,如文件。
  • 无界:数据来自不断更新的源,例如Pub / Sub订阅。

在对无界PCollection进行操作之前,必须使用以下策略之一:

  • 窗口化:无界PCollections不能直接用于分组变换(例如你正在使用的CombinePerKey),所以你应该首先使用set a non-global windowing function
  • 触发器:你可以通过set up a trigger获得无限制的PCollection,它可以提供无界数据集的定期更新,即使订阅中的数据仍在流动。

这可以解释您所看到的行为,即当从本地文件(有界数据源)读取时相同的管道工作,但是当它从Pub / Sub订阅(这是一个无限制的数据源)读取时不工作。

因此,为了使用Pub / Sub订阅,您应该应用窗口或触发策略,以便可以在以下转换中正确处理PCollections中的数据。

编辑:另外,正如@Arjun所发现的那样,可能需要通过使用以下命令设置适当的arg参数来启用Pipeline中的Streaming:

pipeline_options.view_as(StandardOptions).streaming = True
© www.soinside.com 2019 - 2024. All rights reserved.