如何使用批处理从DataFlow中的PubSub读取

问题描述 投票:0回答:2

在Pubsub源代码的SDK 1.9.1中,有PubsubIO.Read.maxReadTimePubsubIO.Read.maxNumRecords方法可用。这些方法允许从pubsub消息创建有界集合,可以以批处理模式启动Dataflow管道。

使用Dataflow SDK 2.1可以实现类似的功能吗?如何使用批处理模式从Dataflow管道中的Pubsub读取?

google-cloud-platform google-cloud-dataflow google-cloud-pubsub gcp
2个回答
1
投票

您不应尝试在批处理上下文中使用PubsubReader。相反,您应该使用提供的流式PubsubIO,并设置一个窗口策略,如here所述。您可以使用“其他复合触发器”部分(下面复制)中描述的复合触发器来获取所需的行为。

Repeatedly.forever(AfterFirst.of(
      AfterPane.elementCountAtLeast(100),
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))

-1
投票

不幸的是,我在新版本的SDK中没有看到任何支持。我所做的是实现一个DoFn,它从PubSub读取maxReadTime或maxIMum记录并返回消息。

这就是他们在以前版本的SDK上所做的。你可以查看PubsubReader课程。

你必须这样称呼它:

 pipeline.begin()
            .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
            .apply(ParDo. of(new MyPubsubReader(maxNumRecords, maxReadTime));
            .setCoder(coder);
© www.soinside.com 2019 - 2024. All rights reserved.