处理 Kafka 流中的异常是类似的问题,但接受的答案仅涉及生产异常。如何处理处理过程中出现的异常,从而控制手动偏移提交。
您需要手动处理它们,即使用自己的
try-catch
块并对它们做出相应的反应。
您可能会发现该库的使用很有帮助用于 Kafka Streams 中错误处理的库它提供了各种包装器和 DLQ 处理。我自己用它来在流处理器中处理消息时捕获异常。我见过的唯一差距是当您想要跨拓扑中的多个操作处理案例时。
您想要创建一个类并实现
StreamsUncaughtExceptionHandler
在 handle
消息上,您可以添加业务代码(记录、发送到 DLQ 等),然后返回您想要的行为的代码,例如 SHUTDOWN_APPLICATION
、 SHUTDOWN_CLIENT
或 ...
最后将自定义异常添加到流中,执行以下操作:
val streams = KafkaStreams(topology, props)
streams.setUncaughtExceptionHandler(YourCustomExceptionHandler())
这是参考文献
https://developer.confluent.io/courses/kafka-streams/hands-on-error-handling/