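I want to use a DLQ (dead-letter queue) for exceptions.
Here is my application.yml. The DLQ topic is created successfully, but no failed messages show up in it.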
spring:
  cloud:
    stream:
      default:
        consumer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers:
            - localhost:9092
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            specific.avro.reader: true
            enable.auto.commit: true
        bindings:
          resourceInventoryInput:
            consumer:
              autoCommitOffset: true
              autoCommitOnError: true
              enableDlq: true
              dlqName: dead-out
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      bindings:
        resourceInventoryInput:
          binder: kafka
          destination: ${application.messaging.topic}
          content-type: application/*+avro
          group: ${application.messaging.group}
      default-binder: kafka
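I already answered you on GitHub.
I just tested your YAML with Boot 2.1.15 and Greenwich.SR6 (and also with Boot 2.2.8 / Hoxton.SR5), and it works fine. The only changes I made were renaming the binding to input and commenting out the Avro settings.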
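import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Kbgh9181Application {

    public static void main(String[] args) {
        SpringApplication.run(Kbgh9181Application.class, args);
    }

    // Always fails, so every record on the input binding is routed to the DLQ.
    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        throw new RuntimeException("foo");
    }

    // Reads the DLQ topic from the beginning and prints each dead-lettered record.
    @KafkaListener(id = "kbgh918", topics = "dead-out", properties = "auto.offset.reset:earliest")
    public void listen(Message<?> in) {
        System.out.println(in);
    }
}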
GenericMessage [payload=byte[3], headers={x-original-offset=[B@67917b81, x-original-partition=[B@467895cd, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=dead-out, kafka_offset=5, x-exception-message=[B@51def01e, x-exception-fqcn=[B@531d42e5, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3fbc6674, x-original-topic=[B@3d684ab3, x-original-timestamp-type=[B@1b101300, kafka_receivedPartitionId=0, x-original-timestamp=[B@222370ed, kafka_receivedTimestamp=1592402977606, x-exception-stacktrace=[B@7e703d1b}]
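The DLQ headers in that output print as [B@... because their values are raw byte arrays. If you want to read them, something like the following might help. It is only a sketch: DlqHeaders is a hypothetical helper, not part of Spring, and it assumes the binder writes the text headers (x-exception-message, x-exception-fqcn, x-exception-stacktrace, x-original-topic) as UTF-8 bytes and the numeric ones (x-original-offset, x-original-partition, x-original-timestamp) as big-endian 8-byte or 4-byte values. Please verify that against the binder version you run.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper (not a Spring class): decodes DLQ headers that arrive as byte[].
final class DlqHeaders {

    private DlqHeaders() {
    }

    // Text headers such as x-exception-message; assumed to be UTF-8 encoded.
    // Returns null when the header is missing or is not a byte[].
    static String text(Map<String, Object> headers, String name) {
        Object value = headers.get(name);
        if (!(value instanceof byte[])) {
            return null;
        }
        return new String((byte[]) value, StandardCharsets.UTF_8);
    }

    // Numeric headers such as x-original-offset; assumed big-endian,
    // 8 bytes for a long or 4 bytes for an int. Returns null otherwise.
    static Long number(Map<String, Object> headers, String name) {
        Object value = headers.get(name);
        if (!(value instanceof byte[])) {
            return null;
        }
        byte[] bytes = (byte[]) value;
        ByteBuffer buffer = ByteBuffer.wrap(bytes); // big-endian by default
        if (bytes.length == Long.BYTES) {
            return buffer.getLong();
        }
        if (bytes.length == Integer.BYTES) {
            return (long) buffer.getInt();
        }
        return null;
    }
}
```

Inside the @KafkaListener method above you could then call DlqHeaders.text(in.getHeaders(), "x-exception-message") or DlqHeaders.number(in.getHeaders(), "x-original-offset"), since MessageHeaders is a Map<String, Object>.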