Apache Flink-作业部署期间重复的消息处理,以ActiveMQ为源

问题描述 投票:1回答:1

Given,

我有一个Flink作业,它从ActiveMQ源读取并写入mysql数据库-键入标识符。我每隔一秒钟就为此工作启用检查点。我将检查点指向Minio实例,并验证了检查点是否与jobid一起使用。我部署的工作是Openshift(位于下面的Kubernetes)-我可以根据需要按&放大/缩小此工作。

问题

[当部署作业(滚动)或由于错误/错误导致作业中断,并且ActiveMQ中有未使用的消息或Flink中有未确认的消息(但已写入数据库)时,作业恢复时(或部署新作业)作业进程已经处理过的消息,导致在数据库中插入了重复的记录。

问题

  • 检查点是否应帮助作业从其离开的地方恢复过来?
  • (滚动)部署新作业之前,我应该接受检查点吗?
  • 如果作业因错误或群集故障而退出会怎样?
  • 随着jobid每次部署的变化,恢复如何发生?
  • Edit由于我不能期望数据库具有幂等性,为避免重复项保存到数据库(Exactly-Once)中,我可以编写特定于数据库的(upsert)查询以更新给定记录是否存在并插入如果没有?
kubernetes apache-flink flink-streaming flink-cep flink-sql
1个回答
0
投票

JDBC当前仅支持至少一次,这意味着您在恢复时会收到重复的消息。当前有一个草案可以添加对exactly once的支持,该草案可能会在1.11中发布。

检查点是否应帮助作业从其离开的地方恢复过来?

是,但是最后一次成功检查点与恢复之间的时间可能会产生观察到的重复项。我在某个相关的主题上提供了更详细的answer

(滚动)部署新作业之前,我应该接受检查点吗?

绝对。您实际上应该使用带有保存点的取消。这是更改拓扑的唯一可靠方法。另外,使用保存点取消可避免数据中的任何重复,因为它可以正常关闭作业。

如果作业因错误或群集故障而退出会怎样?

它会自动重启(取决于您的重启设置)。它将使用最新的检查点进行恢复。这肯定会导致重复。

由于Jobid在每个部署上都在不断变化,恢复如何进行?

您通常明确地指向同一检查点目录(在S3上?)。>

由于无法从数据库中获得幂等性,因此高手是实现完全一次处理的唯一方法吗?

目前,我看不到解决方法。应该以1.11更改。

© www.soinside.com 2019 - 2024. All rights reserved.