重新启动 SpannerIO 的 ChangeStream 到 GCS (TEXT/JSON) 管道出现错误

问题描述 投票:0回答:1

我已经设置了此管道:https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage,作者:

  • 将 startTime 硬编码为特定时间(不使用 timestamp.now())。
  • 指定元数据表名称(因此不使用自动生成的表名称)

第一次运行进展顺利。它以 json 格式将 DataChangeRecord 生成到我的 gcs 存储桶中。 但是,当我停止它时,延迟一段时间(以模拟我需要更新管道的时间),然后再次启动它,我收到此错误,并且管道根本没有写入任何 json/文本文件我的GC。

来自工作人员的错误消息:generic::invalid_argument:SDK 报告的水印较小:2024-09-25T17:32:17.342+00:00 比计算的下限:2024-09-26T01:43:06.072+00:00

起初我认为我需要在允许延迟的情况下实现窗口,所以我通过在将其交给 TextIO.write 之前添加窗口来实现它

 // apply windowing
      var window = Window.<DataChangeRecord>into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))
          .withAllowedLateness(Duration.standardHours(24))
          .triggering(AfterWatermark.pastEndOfWindow()
              .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1)))
              .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1))))
          .discardingFiredPanes();
  var windowedChangeStreamRecords = dataChangeRecords
      .apply("Windowing", window);

但是,问题仍然存在。知道有什么问题吗?

对于上下文:我正在尝试构建一个简单的摄取过程,将 Spanner 数据摄取到 GCS 中(用于分析目的)。

java google-cloud-dataflow apache-beam google-cloud-spanner
1个回答
0
投票

这看起来可能是扳手连接器的一个错误;它似乎报告的水印小于之前计算的水印,这是不应该发生的 - 如果您使用 Beam 的 spanner io,您介意在这里提交一个错误吗 https://github.com/apache/beam/问题/新?分配者=octocat&标签=bug%2Cawaiting+triage&projects=&template=bug.yml&title=%5BBug%5D%3A+

© www.soinside.com 2019 - 2024. All rights reserved.