Kafka流中处理异常如何处理?

问题描述 投票:0回答:3

处理 Kafka 流中的异常是类似的问题,但接受的答案仅涉及生产异常。如何处理处理过程中出现的异常,从而控制手动偏移提交。

apache-kafka-streams
3个回答
0
投票

您需要手动处理它们,即使用自己的

try-catch
块并对它们做出相应的反应。


0
投票

您可能会发现该库的使用很有帮助用于 Kafka Streams 中错误处理的库它提供了各种包装器和 DLQ 处理。我自己用它来在流处理器中处理消息时捕获异常。我见过的唯一差距是当您想要跨拓扑中的多个操作处理案例时。


0
投票

您想要创建一个类并实现

StreamsUncaughtExceptionHandler
handle
消息上,您可以添加业务代码(记录、发送到 DLQ 等),然后返回您想要的行为的代码,例如
SHUTDOWN_APPLICATION
SHUTDOWN_CLIENT
或 ...

最后将自定义异常添加到流中,执行以下操作:

val streams = KafkaStreams(topology, props)   
streams.setUncaughtExceptionHandler(YourCustomExceptionHandler())

这是参考文献

https://developer.confluent.io/courses/kafka-streams/hands-on-error-handling/

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.