我正在尝试使用Apache Beam SDK 2.6.0使用Google Dataflow和PubSub完成一次交付。
用例很简单:
'Generator'数据流作业向PubSub主题发送1M消息。
GenerateSequence
.from(0)
.to(1000000)
.withRate(100000, Duration.standardSeconds(1L));
“归档”数据流作业从PubSub订阅中读取消息并保存到Google云端存储。
pipeline
.apply("Read events",
PubsubIO.readMessagesWithAttributes()
// this is to achieve exactly-once delivery
.withIdAttribute(ATTRIBUTE_ID)
.fromSubscription('subscription')
.withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
.apply("Window events",
Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.standardMinutes(15))
.discardingFiredPanes())
.apply("Events count metric", ParDo.of(new CountMessagesMetric()))
.apply("Write files to archive",
FileIO.<String, Dto>writeDynamic()
.by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
.to(archiveDir)
.withTempDirectory(archiveDir)
.withNumShards(options.getNumShards())
.withNaming(dataSource ->
new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
));
我将'withIdAttribute'添加到Pubsub.IO.Write('Generator'作业)和PubsubIO.Read('Archive'作业)并期望它将保证完全一次的语义。
我想测试'负面'场景:
实际上我得到的是 - 所有消息都被传递(至少一次实现),但最重要的是有很多重复 - 每1M消息30-50K左右。
是否有任何解决方案可以实现一次性交付?
Dataflow不允许您跨运行保持状态。如果您使用Java,则可以使update a running pipeline不会导致其丢失现有状态,从而允许您跨管道版本进行重复数据删除。
如果这对您不起作用,您可能希望以ATTRIBUTE_ID键入的方式存档邮件,例如。 Spanner或GCS使用此作为文件名。
所以,我自己从来没有这样做,但推理你的问题这就是我接近它的方式......
我的解决方案有点复杂,但我没有找到其他方法来实现这一点,而不涉及其他外部服务。所以,这里什么都没有。
您可以让您的管道从pubsub和GCS读取,然后将它们组合以去除重复数据。这里棘手的部分是一个有界的pCollection(GCS),另一个是无界的(pubsub)。你可以add timestamps到有界集合,然后窗口数据。在此阶段,您可能会丢弃超过约15分钟的GCS数据(先前实施中窗口的持续时间)。这两个步骤(即正确添加时间戳并删除可能不够创建重复项的数据)是迄今为止最棘手的部分。
一旦解决了这个问题,就附加两个pCollections,然后在两个数据集共有的Id上使用GroupByKey。这将产生一个PCollection<KV<Long, Iterable<YOUR_DATUM_TYPE>>
。然后你可以使用一个额外的DoFn,除了生成的Iterable中的第一个元素之外,还删除KV <>装箱。从那以后,您可以像往常一样继续处理数据。
最后,只有在重新启动管道时,第一个pubsub窗口才需要这项额外的工作。之后,您应该将GCS pCollection重新分配给空的pCollection,以便按键分组不会做太多额外的工作。
让我知道你的想法,以及这是否可行。此外,如果您决定采取这种策略,请发布您的里程:)。