我有一个业务需求,需要降低 kafka 消费者的消费速度,因为它可能会对其他微服务造成影响。经过一番研究,我得出的解决方案是:
max-poll-records
限制每个 poll
消耗的记录数量,并设置 idleBetweenPolls
来调节每个 poll
执行的时间间隔。注意不要超过max-poll-interval-ms
中规定的时间。
所以我能够将
max-poll-records
设置为属性并将 idleBetweenPolls
设置为容器属性,如下所示:
factory.getContainerProperties().setIdleBetweenPolls(3000)
它工作得很好,但我也遇到了该房产
spring.kafka.listener.idle-between-polls
,并阅读了它的房产描述:Sleep interval between Consumer. poll(Duration) calls
。除了设置此属性未按预期工作之外,不确定我是否缺少某些配置或可能创建了
ConcurrentKafkaListenerContainerFactory
作为一个 bean 并设置它的一些属性会覆盖监听器属性?
有人可以帮忙解释一下吗?
spring.kafka.listener.idle-between-polls
属性在 Spring Boot 中的映射如下:
map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls);
这确实是为了
ConcurrentKafkaListenerContainerFactory
所做的,但前提是:
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
因此,如果您有自己的
kafkaListenerContainerFactory
bean,那么 Spring Boot 自动配置就会退出,一切都在您手中。不过,您可以将 KafkaProperties
注入到您的自定义 bean 中,但这已经是一个不同的故事了。
对于您的用例,您还可以考虑使用
pause
功能:https://docs.spring.io/spring-kafka/reference/kafka/pause-resume.html