Google Dataflow和Pubsub - 无法实现一次性交付

问题描述 投票:1回答:2

我正在尝试使用Apache Beam SDK 2.6.0使用Google Dataflow和PubSub完成一次交付。

用例很简单:

'Generator'数据流作业向PubSub主题发送1M消息。

GenerateSequence
          .from(0)
          .to(1000000)
          .withRate(100000, Duration.standardSeconds(1L));

“归档”数据流作业从PubSub订阅中读取消息并保存到Google云端存储。

pipeline
        .apply("Read events",
            PubsubIO.readMessagesWithAttributes()
                // this is to achieve exactly-once delivery
                .withIdAttribute(ATTRIBUTE_ID)
                .fromSubscription('subscription')
                .withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
        .apply("Window events",
            Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardMinutes(15))
                .discardingFiredPanes())
        .apply("Events count metric", ParDo.of(new CountMessagesMetric()))
        .apply("Write files to archive",
            FileIO.<String, Dto>writeDynamic()
                .by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
                .to(archiveDir)
                .withTempDirectory(archiveDir)
                .withNumShards(options.getNumShards())
                .withNaming(dataSource ->
                    new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
                ));

我将'withIdAttribute'添加到Pubsub.IO.Write('Generator'作业)和PubsubIO.Read('Archive'作业)并期望它将保证完全一次的语义。

我想测试'负面'场景:

  1. 'Generator'数据流作业向PubSub主题发送1M消息。
  2. 'Archive'数据流作业开始工作,但我在处理点击'停止作业' - >'Drain'的过程中停止它。消息的一部分已经处理并保存到云存储,比方说400K消息。
  3. 我再次启动“归档”作业,并且期望它将采用未处理的消息(600K),最终我将看到保存到存储的1M消息。

实际上我得到的是 - 所有消息都被传递(至少一次实现),但最重要的是有很多重复 - 每1M消息30-50K左右。

是否有任何解决方案可以实现一次性交付?

google-cloud-platform google-cloud-dataflow apache-beam google-cloud-pubsub
2个回答
1
投票

Dataflow不允许您跨运行保持状态。如果您使用Java,则可以使update a running pipeline不会导致其丢失现有状态,从而允许您跨管道版本进行重复数据删除。

如果这对您不起作用,您可能希望以ATTRIBUTE_ID键入的方式存档邮件,例如。 Spanner或GCS使用此作为文件名。


0
投票

所以,我自己从来没有这样做,但推理你的问题这就是我接近它的方式......

我的解决方案有点复杂,但我没有找到其他方法来实现这一点,而不涉及其他外部服务。所以,这里什么都没有。

您可以让您的管道从pubsub和GCS读取,然后将它们组合以去除重复数据。这里棘手的部分是一个有界的pCollection(GCS),另一个是无界的(pubsub)。你可以add timestamps到有界集合,然后窗口数据。在此阶段,您可能会丢弃超过约15分钟的GCS数据(先前实施中窗口的持续时间)。这两个步骤(即正确添加时间戳并删除可能不够创建重复项的数据)是迄今为止最棘手的部分。

一旦解决了这个问题,就附加两个pCollections,然后在两个数据集共有的Id上使用GroupByKey。这将产生一个PCollection<KV<Long, Iterable<YOUR_DATUM_TYPE>>。然后你可以使用一个额外的DoFn,除了生成的Iterable中的第一个元素之外,还删除KV <>装箱。从那以后,您可以像往常一样继续处理数据。

最后,只有在重新启动管道时,第一个pubsub窗口才需要这项额外的工作。之后,您应该将GCS pCollection重新分配给空的pCollection,以便按键分组不会做太多额外的工作。

让我知道你的想法,以及这是否可行。此外,如果您决定采取这种策略,请发布您的里程:)。

© www.soinside.com 2019 - 2024. All rights reserved.