如何修复 kafka.common.errors.TimeoutException:自批量创建以来已过期 1 条记录 xxx 毫秒加上逗留时间

问题描述 投票:0回答:3

我正在使用kafka_2.11-2.1.1 和使用 spring 2.1.0.RELEASE 的生产者。

我在向 Kafka 主题发送消息时使用 Spring,我的生产者生成了很多

TimeoutExceptions

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time

我正在使用下面的kafka生产者设置

acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60

我尝试了很多组合(特别是

batchSize
lingerMs
)但没有任何效果。任何帮助请询问上述场景的设置应该是什么。

再次尝试使用以下配置...但没有运气相同的错误

acks = 1
    batch.size = 15
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120
    retries = 1

第二次跑步:

我尝试了不同的组合,但没有任何效果。 因此我认为这可能是网络、SSL 等问题。 因此,我在运行生产者的同一台机器上安装并运行 Kafka,即在我的本地计算机中。

我尝试再次运行生产者并指向本地 Kafka 主题。 但没有运气同样的问题。

以下是使用的配置参数。

2019-07-02 05:55:36.663  INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 0
    bootstrap.servers = [localhost:9092]
    request.timeout.ms = 60
    retries = 1
    buffer.memory = 33554432
    linger.ms = 0
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null

面临同样的错误: org.apache.kafka.common.errors.TimeoutException:inbound_topic--1 的 1 条记录即将过期:自批量创建以来已过去 69 毫秒,加上逗留时间

也尝试过 批量大小 5 , 10 & 0 linger_ms 0、5、10 等 request_time_out 0、45、60、120、300 等

没有任何效果...同样的错误。

我还应该尝试什么,解决方案是什么?

如何避免生成负密钥

是的,我设置了本地设置并打印带有分区信息的日志,如下所示

2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] c.s.c.p.p.CompanyInfoPartitioner:主题:inbound_topic Key = 597736248- Entropy Cayman Solar Ltd.-null-null-null 分区 = -1 2019-07-03 02:48:28.931 错误 7092 --- [广告 | Producer-1] o.s.k.support.LoggingProducerListener :发送带有 key='597736248- Entropy Cayman Solar Ltd.-null-null-null' 和有效负载='com.spgmi.ca.prescore.model.Company@8b12343 的消息时抛出异常' 到主题 inbound_topic :

org.apache.kafka.common.errors.TimeoutException:inbound_topic 的 1 条记录即将过期 --1:自批量创建以来已经过去了 104 毫秒,加上逗留时间

我的主题 inbound_topic 有两个分区,如下所示 C:\Software\kafka\kafka_2.11-2.1.1 in\windows>kafka-topics.bat --describe --zookeeper localhost:2181 --topic inbound_topic 主题:inbound_topic PartitionCount:2 ReplicationFactor:1 配置: 主题:inbound_topic 分区:0 领导者:0 副本:0 Isr:0 主题:inbound_topic 分区:1 领导者:0 副本:0 Isr:0

但是我的制作人似乎试图发送到 Partition = -1。

我的分区逻辑如下

int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
        logger.info("Topic : "+ topic + "\t Key = " + (String)key + " Partition = " + p );

关键是我正在执行 hashCode()。这里需要纠正什么以避免这个负数分区号?即分区 = -1

我的分区键逻辑应该是什么样的?

非常感谢任何帮助。

apache-kafka kafka-producer-api spring-kafka
3个回答
13
投票

该错误表明某些记录放入队列的速度比从客户端发送的速度快。

当您的 Producer 发送消息时,它们会存储在缓冲区中(在将它们发送到目标代理之前),并且记录会分组在一起以提高吞吐量。当新记录添加到批次中时,必须在可配置的时间窗口内发送,该时间窗口由

request.timeout.ms
控制(默认设置为 30 秒)。如果批次在队列中的时间较长,则会抛出
TimeoutException
,然后批次记录将从队列中删除,并且不会传递给代理。

增加

request.timeout.ms
的值应该可以解决你的问题。


如果这不起作用,您还可以尝试减少

batch.size
,以便更频繁地发送批次(但这次将包含更少的消息),并确保
linger.ms
设置为0(这是默认值) .

请注意,更改任何配置参数后,您需要重新启动 kafka 代理。

如果您仍然收到错误消息,我认为您的网络出现问题。您启用了 SSL 吗?


1
投票

我通过返回有效的分区号修复了之前的问题。


0
投票

也许你必须检查“ProducerListener”,你实现了这个类吗? 我和你有同样的问题,你可以优化ProducerListener实现,运行慢会影响你的发送结果; 示例:

@覆盖 公共无效onSuccess(ProducerRecord ProducerRecord,RecordMetadata recordMetadata){ // db: 更新表,应该删除它,或者usd内存操作 }

© www.soinside.com 2019 - 2024. All rights reserved.