鉴于以下同步 kafka 生产者
Properties props = new Properties();
props.put("max.block.ms", 30000);
props.put("request.timeout.ms", 30000);
props.put("retries", 5);
KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);
//Send message
producer.send(producerRecord).get();
帮助我理解 request.timeout.ms 和 max.block.ms 生产者配置之间的区别。是否包括所有重试的最长时间?或者每次重试都有自己的超时时间?
request.timeout.ms 用于超时请求,我会将其设置为我可以等待响应的最长时间。
max.block.ms 用于生产者阻塞缓冲时间、序列化时间等。
详情请看这个。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
我发现接受的答案有点“薄”,所以这可能会帮助其他人。
您的代码中有两件重要的事情:
KafkaProducer<String, byte[]> produce = new KafkaProducer<>(props);
//Send message
producer.send(producerRecord).get();
producer.send(producerRecord)
- 由两部分组成:阻塞和非阻塞。阻挡部分是由一些“零件”组成的:
- Request metadata about brokers from zookeeper
- Serialize message
- Choose a Partition
- Place Message in the RecordAccumulator
现在,通常前三个步骤很快(第一个步骤在初始调用后缓存),而第四个步骤可能需要时间。发生这种情况是因为
RecordAccumulator
的空间有限 (buffer.memory
),而您目前有多少空间取决于生产者客户端中的 other 线程(称为 Sender Thread
)。如果这个线程做得不好(从 RecordAccumulator
检索消息并将其发送到代理;顺便说一句,所有这些都有指标),您的 send
方法将被阻塞(RecordAccumulator
中没有空间)直到有可用空间。
所有这 4 个步骤都允许被阻止最多
max.block.ms
。这就是 KIP 在谈论时的含义:
还有
delivery.timeout.ms
。这是消息发送到分区之前等待的总时间,包括:将记录推送到批次的时间(在 RecordAccumulator
中)+ 获取 ack 的时间(例如 all
并等待消息发送)跨副本复制)+ 将消息发送到代理的时间,包括所有重试(如果有)。
您可以将其视为从
send
方法到达所有副本并发回 ack 所需的时间。所有这段时间都必须低于于delivery.timeout.ms
。
在解释
request.timeout.ms
之前,恕我直言,了解 max.in.flight.requests.per.connection
是什么很重要,因为它们有一点联系。假设一批已准备好从 RecordAccumulator
发送到代理(因为其 batch.size
或 linger.ms
已完成)。该批次是否由所谓的“发送者线程”(客户端本身的线程,并且是!=调用 send
方法的线程)获取或不由 max.in.flight.requests.per.connection
定义。
您可以在任何时间点有多达
max.in.flight.requests.per.connection
并发 请求处于活动状态。一个稍微容易一点的思考方式是这样的。 “发送者线程”有一个它不断执行的特定循环,以伪代码表示:
while(true) {
// check if there are batches to send
// get the batches to send to the brokers
// make requests to the broker
// poll connections
// etc
}
假设这是第一批发送的。 “发送者线程”递增
max.in.flight.requests.per.connection
,使其变为1;获取批次并将其发送给经纪人。此时它“不会”等待确认,而是返回到循环。依此类推,直到达到 5(max.in.flight.requests.per.connection
的默认值)。现在假设有批次要发送到代理,发送者线程不会接受,因为它没有可用的请求(我们现在最多 5 个)。相反,它会“轮询连接”:它会向代理询问之前发送的结果,其余的解释在这里有了所有这些背景,是时候看看request.timeout.ms
了,现在实际上很容易了。当客户端轮询连接时 - 尝试从代理获取每个正在进行的请求的响应,它可以在
request.timeout.ms
(默认情况下为 30 秒)内完成此操作。如果我们重试,该值将被重置。当我尝试向已关闭的代理/服务器发送请求时 (max.block.size = 60000,request.timeout.ms = 1000,delivery.timeout.ms = 2000,重试= 2,retry.backoff.ms = 100)
try {
kafkaTemplate.send(producerRecord).get();
}catch (Exception e){
LOGGER.error("Error while sending request: {}", e);
}
抛出异常:
发送失败;嵌套异常是 org.apache.kafka.common.errors.TimeoutException:60000 毫秒后元数据中不存在主题 DUMMY-TOPIC。幕后是否发生任何重试?
超时是否应该更早出现?它应该花费不到 7 秒 -> 交付。超时 2 秒 3 次,退避时间仅为 100 毫秒