在从卡夫卡摄入消息弗林克流应用程序,1)如何禁用自动提交? 2)如何手动从弗林克成功处理邮件后提交?
谢谢。
通过default弗林克承诺在检查点偏移。可以按如下方式禁用它:
val consumer = new FlinkKafkaConsumer011[T](...)
c.setCommitOffsetsOnCheckpoints(false)
如果你没有启用检查点看到here
为什么这样做,虽然?弗林克的检查点机制是有没有办法解决这个问题你。弗林克不会在出现故障的情况下提交的偏移。如果你在某些时候抛出一个异常,卡夫卡消费者弗林克的下游将尝试重新启动先前成功的检查点流。如果再错误仍然存在弗林克将为次配置的一些失败的流之前需要反复重启。这意味着,您可能不会丢失消息,由于弗林克提交你的代码没有成功处理的消息偏移。