将kafka链接到S3,然后重试

问题描述 投票:0回答:1

我是Flink的新手,我需要从Kafka读取数据,使用某些API有条件地充实这些数据(如果记录属于X类),并写入S3。

我用上面的逻辑制作了一个hello world Flink应用程序,它就像一个超级按钮。

但是,我用来丰富的API没有100%的正常运行时间SLA,因此我需要使用重试逻辑进行设计。

以下是我找到的选项,

选项1)进行指数重试,直到我收到API的响应,但这会阻塞队列,所以我不喜欢这样

选项2)再使用一个主题(称为topic-failure),如果API关闭,则将其发布为topic-failure。这样,它不会阻止实际的主队列。我将需要一名工作人员来处理队列主题失败中的数据。同样,如果API长时间关闭,则必须将此队列用作循环队列。例如,从队列主题失败中读取一条消息,如果它无法推送到称为主题失败的同一队列,并尝试使用队列主题失败中的下一条消息,则尝试丰富该消息。

我更喜欢选项2,但要完成此任务似乎并不容易。是否有任何标准的Flink方法可用于实施选项2?

apache-kafka apache-flink flink-streaming
1个回答
0
投票

这是一个从微服务迁移而来的相当普遍的问题。适当的解决方案是将查找数据也存储在Kafka或某些DB中,这些数据库可以作为附加源集成在同一Flink应用程序中。

如果您不能执行此操作(例如,API是外部的,或者无法轻松将数据映射到数据存储),则这两种方法都是可行的,并且它们具有不同的优势。

1)将允许您保留输入事件的顺序。如果您的下游应用程序期望有序,那么您需要重试。

2)常用术语是死信队列(尽管更经常用于无效记录)。有两种简单的方法可以将其集成到Flink中,要么有单独的来源,要么将主题模式/列表与一个来源一起使用。

您的拓扑如下所示:

Kafka Source      -\             Async IO        /-> Filter good -> S3 sink
                    +-> Union -> with timeout  -+ 
Kafka Source dead -/           (for API call!)   \-> Filter bad  -> Kafka sink dead
© www.soinside.com 2019 - 2024. All rights reserved.