Given,
我有一个Flink作业,它从ActiveMQ
源读取并写入mysql数据库-键入标识符。我每隔一秒钟就为此工作启用检查点。我将检查点指向Minio
实例,并验证了检查点是否与jobid
一起使用。我部署的工作是Openshift(位于下面的Kubernetes)-我可以根据需要按&放大/缩小此工作。
问题
[当部署作业(滚动)或由于错误/错误导致作业中断,并且ActiveMQ中有未使用的消息或Flink中有未确认的消息(但已写入数据库)时,作业恢复时(或部署新作业)作业进程已经处理过的消息,导致在数据库中插入了重复的记录。
问题
jobid
每次部署的变化,恢复如何发生?Exactly-Once
)中,我可以编写特定于数据库的(upsert
)查询以更新给定记录是否存在并插入如果没有?JDBC当前仅支持至少一次,这意味着您在恢复时会收到重复的消息。当前有一个草案可以添加对exactly once的支持,该草案可能会在1.11中发布。
检查点是否应帮助作业从其离开的地方恢复过来?
是,但是最后一次成功检查点与恢复之间的时间可能会产生观察到的重复项。我在某个相关的主题上提供了更详细的answer。
(滚动)部署新作业之前,我应该接受检查点吗?
绝对。您实际上应该使用带有保存点的取消。这是更改拓扑的唯一可靠方法。另外,使用保存点取消可避免数据中的任何重复,因为它可以正常关闭作业。
如果作业因错误或群集故障而退出会怎样?
它会自动重启(取决于您的重启设置)。它将使用最新的检查点进行恢复。这肯定会导致重复。
由于Jobid在每个部署上都在不断变化,恢复如何进行?
您通常明确地指向同一检查点目录(在S3上?)。>
由于无法从数据库中获得幂等性,因此高手是实现完全一次处理的唯一方法吗?
目前,我看不到解决方法。应该以1.11更改。