Spring-Kafka 生产者消息太慢

问题描述 投票:0回答:1

我正在编写一个 spring-boot 应用程序,接收请求并进行一些数据处理,然后将数据写入 kafka。

当我进行压力测试时,我发现kafka消息传递非常慢。

数据处理逻辑是这样的:


private KafkaTemplate<String, Object> kafkaTemplate;


List<Map<String,String>> receive_data = parse(httpRequest);

List<Map<String,String>> processed_data = process(receive_data);

processed_data foreach {
  kafkaTemplate.send();
}

当我检查jstack时,我发现大多数线程堆栈都是这样的

"qtp1206051975-94" #94 prio=5 os_prio=0 tid=0x00007fe054034800 nid=0x34c waiting on condition [0x00007fdfeb4f2000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000788869f80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:143)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:218)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:942)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)

我的 Kakfa Producer 配置是

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.ZSTD.name);
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

我已将

buffer.memory
从默认的32M更改为64M或128M,但仍然得到相同的结果。 TPS没有增加。

我的 Spring 应用程序使用

jetty
作为 Web 服务器 我改变了码头线程

server:
  jetty:
    threads:
      max: 400
      min: 20

有人可以帮我解决这个问题吗,谢谢。

我想提高应用程序的TPS,现在它看起来被kafka消息传递速度卡住了。

java apache-kafka spring-kafka
1个回答
0
投票

batch.size
很小,增大它或者使用默认值,更大的
linger.ms
,这样在阻塞时会花时间批量处理更多记录并一次发布。

增加

max.in.flight.requests.per.connection
会增加单个连接中的请求数量。

检查

min.insync.replicas
值,2 应该适合 2 或以上的复制因子。使用 acks all 后,它将等待所有副本的确认。

值根据消息大小和所需吞吐量而变化,我将从默认值

batch.size
linger.ms=100
min.insync.replicas=2

开始
© www.soinside.com 2019 - 2024. All rights reserved.