异步模式下kafka producer是如何工作的

问题描述 投票:0回答:1

我有一个发送消息的简单代码

    private static void send(KafkaProducer<String, String> producer) {
        System.out.println(Thread.currentThread()+" before");
        producer.send(new ProducerRecord<String, String>("test", "test"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println(metadata.partition());
            } else {
                exception.printStackTrace();
            }
        });
        System.out.println(Thread.currentThread()+" after");
    }

和kafka配置

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

我的kafka关机了

我预计如果我调用发送两次

        send(producer);
        send(producer);

,我会得到两条

Thread before
一条接一条的记录,因为我们是异步模式

Thread before
Thread before

但是我看到只有在第一条记录出错后才开始处理第二条记录

Thread[main,5,main] before
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
Thread[main,5,main] after
Thread[main,5,main] before

我好像误解了kafka的异步模式

java asynchronous apache-kafka
1个回答
0
投票

第一个发送请求被阻塞以创建主题。

此外,您的

after
应该在lamdba回调体内。

© www.soinside.com 2019 - 2024. All rights reserved.