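I am using Kafka 2.8.1 (AWS MSK). Whenever AWS performs a rolling upgrade, my Kafka Streams application ends up in the ERROR state, and every attempt to query the state store keeps failing with the following exception: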
java.lang.IllegalStateException: Error when retrieving state store.
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.java:237)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getHostInfo(InteractiveQueryService.java:222)
at com.sp.gos.processors.GossiperKVStoreQueryService.getHostInfo(GossiperKVStoreQueryService.java:108)
at com.sp.gos.processors.GossiperKVStoreQueryService.get(GossiperKVStoreQueryService.java:68)
...
caused by: java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.HashMap$KeySpliterator.tryAdvance(HashMap.java:1728)
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.java:228)
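I also added log-and-continue exception handlers to the properties, as shown below, but whenever this problem occurs I never see the code reach them: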
props.put(
"default.deserialization.exception.handler",
LogAndContinueExceptionHandler.class);
props.put(
"default.production.exception.handler",
LogAndContinueProductionExceptionHandler.class);
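Can someone tell me what is going wrong? Is there a mistake in my configuration that causes this? Once I restart my application, everything starts working again.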
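org.springframework.kafka:spring-kafka:jar - 3.0.9
org.apache.kafka:kafka-clients - 3.5.1
org.springframework.cloud:spring-cloud-stream-binder-kafka-streams - 4.0.4
org.apache.kafka:kafka-streams - 3.5.1

I have not used the exception handlers myself, but why do you expect your handler classes to be invoked?
You are seeing a streams (consumer) exception, so the production handler will not be invoked.
Likewise, the exception has nothing to do with deserializing data.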
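To make the point above concrete, here is a minimal, JDK-only sketch of which hook applies to which kind of failure. The class `HandlerRouting`, the enum `FailureSite`, and the method `hookFor` are hypothetical names invented for illustration; only the three strings refer to real Kafka Streams settings. It assumes the ERROR state was reached because an exception went uncaught on a stream thread, which the stack trace suggests but does not prove.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative lookup only: it does not talk to Kafka, it just records
// which hook Kafka Streams consults for each category of failure.
public class HandlerRouting {

    /** Hypothetical categories for where a failure can surface. */
    public enum FailureSite { DESERIALIZATION, PRODUCTION, STREAM_THREAD }

    private static final Map<FailureSite, String> HOOKS = new EnumMap<>(FailureSite.class);

    static {
        // A record that cannot be deserialized when it is read.
        HOOKS.put(FailureSite.DESERIALIZATION, "default.deserialization.exception.handler");
        // A record that cannot be written to an output topic.
        HOOKS.put(FailureSite.PRODUCTION, "default.production.exception.handler");
        // Any other exception that escapes a stream thread.
        HOOKS.put(FailureSite.STREAM_THREAD, "KafkaStreams#setUncaughtExceptionHandler");
    }

    /** Returns the name of the hook that is consulted for the given failure site. */
    public static String hookFor(FailureSite site) {
        return HOOKS.get(site);
    }

    public static void main(String[] args) {
        for (FailureSite site : FailureSite.values()) {
            System.out.println(site + " -> " + hookFor(site));
        }
    }
}
```

If that assumption holds, the hook to look at is `KafkaStreams#setUncaughtExceptionHandler`, which since Kafka Streams 2.8 accepts a `StreamsUncaughtExceptionHandler` that returns `REPLACE_THREAD`, `SHUTDOWN_CLIENT`, or `SHUTDOWN_APPLICATION`. Neither of the two configured handlers sits on that path, which would explain why no log line from them appears.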