我有几个关于 KafkaProducer 客户端的异步性质的问题,官方文档没有回答这些问题。
背景:
send()
的应用程序线程不会等待所有分区的确认。我尝试过的:
send()
是异步的,不会阻塞调用线程,直到记录保存在分区中.问题:
Future.get()
之类的阻塞操作。send()
函数和结束时间
是在返回 RecordMetadata 并在调用线程的 doOnSuccess
或 doOnError
块中调用指标发布函数之后。基于这个设计,我假设无论发布什么时间都是 执行这一系列步骤所花费的时间(即计算生产者 元数据、计算分区、序列化等)直到消息 落在内部缓冲区中,而不是消息到达所花费的时间 写入分区本身
此说法不属实。根据
ack
配置文档:
acks=0 如果设置为零,那么生产者将不会等待任何 来自服务器的确认。记录将立即 添加到套接字缓冲区并视为已发送。
所以回调会在记录放入socket后通知,而不是内部缓冲区
此外,即使返回 Future,也不能保证
send()
方法完全非阻塞。您可以在KafkaProducer.doSend()
代码中找到此代码:
accumulator.append(...,剩余WaitMs,...);
因此,如果内部缓冲区已满,
send()
实际上可能会阻塞。例如,如果您生成消息的速度比网络线程将消息发送到代理的速度快(即使不等待任何确认),则可能会发生这种情况