我将 Kafka 与 Spring-boot 一起使用:
Kafka 生产者类:
@Service
public class MyKafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);
// Send Message
public void sendMessage(String topicName, String message) throws Exception {
LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
}
});
}
}
Kafka 配置:
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093
假设我已在主题
my-test-topic
中发送了 10 次 1000 条消息。
十分之八我成功地获取了消费者中的所有消息,但有时我会收到以下信息错误:
2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic
和
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time
有3种可能:
request.timeout.ms
- 这是 Kafka 等待整个批次在缓冲区中准备就绪的时间。因此,在您的情况下,如果缓冲区中的消息少于 100 000 条,则会发生超时。更多信息在这里:https://stackoverflow.com/a/34794261/2707179batch-size
- 与上一点相关,它将更频繁地发送批次,但它们将包含更少的消息。就我而言,我有
replication factor < min.insync.replicas
。写入永远无法获得足够的确认,因此超时。
使用 replication factor > min.insync.replicas
重新创建主题修复了该问题。
错误中的第一个线索是
30024 ms has passed
- 配置spring.kafka.producer.request.timeout.ms=30000
是相关的。这 30 秒的等待是为了填满 Producer 端的缓冲区。
当消息发布时,它会在生产者端进行缓冲,并等待 30 秒(参见上面的配置)来填满。
spring.kafka.producer.batch-size=100000
表示 100KB,因此如果消息摄取负载较低,并且缓冲区在 30 秒内没有被更多消息填满至 100KB,您会期望收到此消息。
spring.kafka.producer.linger.ms=10
用于摄取负载较高并且生产者希望限制对 Kafka 代理的 send()
调用。这是在批次准备好后(即在缓冲区填充到批次大小为 100KB 后)生产者在向代理发送消息之前等待的持续时间。
解决方案:
linger.ms
可在批次准备就绪后将消息保留更长时间。如果需要更多时间来填充批次,请增加 request.timeout.ms
。batch-size
,或增加 request.timeout.ms
,或两者兼而有之。一个可能的原因是Kafka服务器达到了最大堆大小限制(-Xmx)。在高负载下,当垃圾收集器运行时,可能会导致服务器需要更长的时间来处理请求。 就我而言,在增加 Kafka 的最大内存限制后,错误得到解决。
我通过使用 DNS 正确寻址主机 spring.kafka.bootstrap-servers 解决了这个问题。即使网络解析了IP地址,似乎也需要DNS。