我有一个发送消息的简单代码
private static void send(KafkaProducer<String, String> producer) {
System.out.println(Thread.currentThread()+" before");
producer.send(new ProducerRecord<String, String>("test", "test"), (metadata, exception) -> {
if (exception == null) {
System.out.println(metadata.partition());
} else {
exception.printStackTrace();
}
});
System.out.println(Thread.currentThread()+" after");
}
和kafka配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
我的kafka关机了
我预计如果我调用发送两次
send(producer);
send(producer);
,我会得到两条
Thread before
一条接一条的记录,因为我们是异步模式
Thread before
Thread before
但是我看到只有在第一条记录出错后才开始处理第二条记录
Thread[main,5,main] before
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
Thread[main,5,main] after
Thread[main,5,main] before
我好像误解了kafka的异步模式
第一个发送请求被阻塞以创建主题。
此外,您的
after
应该在lamdba回调体内。