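spring-cloud-stream-binder-kafka-streams: when an exception is thrown in the consumer, the consumer stops and goes into the EMPTY state. I want to test the retry mechanism, but it does not work as expected (https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling). No messages are sent to the DLQ. Any ideas?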
springBootVersion=3.2.4
springDependencyManagerVersion=1.1.4
springCloudVersion=2023.0.0
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
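import java.util.function.Function;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class CashbackBoosterProcessor {

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processPosCashbackBooster() {
        return event -> event
                .mapValues(v -> {
                    // Always throw, to simulate a processing failure for every record
                    if (true) {
                        throw new IllegalArgumentException("Testing of retry mechanism");
                    }
                    return v;
                });
    }
}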
spring:
  cloud:
    function:
      definition: processPosCashbackBooster
    stream:
      bindings:
        processPosCashbackBooster-in-0:
          destination: input-topic
          consumer:
            concurrency: 2
            max-attempts: 3
        processPosCashbackBooster-out-0:
          destination: output-topic
      kafka:
        streams:
          bindings:
            processPosCashbackBooster-in-0:
              consumer:
                enable-dlq: true
                dlq-name: input-dlq
                configuration:
                  application.id: ${spring.application.name}-input-group
          binder:
            deserialization-exception-handler: sendtodlq
org.apache.kafka.streams.KafkaStreams : stream-client [demo-8ed02931-b9d7-4743-a741-627001eb74de] State transition from RUNNING to PENDING_ERROR
o.a.k.s.p.internals.StreamThread : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
o.a.k.s.p.internals.StreamThread : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] Shutting down unclean
o.a.k.s.p.internals.StreamThread : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] Informed to shut down
org.apache.kafka.streams.KafkaStreams : stream-client [demo-8ed02931-b9d7-4743-a741-627001eb74de] Shutting down 1 stream threads
o.a.k.s.processor.internals.StreamTask : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] task [0_0] Suspended from RUNNING
o.a.k.s.p.internals.RecordCollectorImpl : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] stream-task [0_0] Closing record collector dirty
o.a.k.s.processor.internals.StreamTask : stream-thread [demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1] task [0_0] Closed dirty
o.a.k.clients.producer.KafkaProducer : [Producer clientId=demo-8ed02931-b9d7-4743-a741-627001eb74de-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.