我正在编写一个 spring-boot 应用程序,接收请求并进行一些数据处理,然后将数据写入 kafka。
当我进行压力测试时,我发现kafka消息传递非常慢。
数据处理逻辑是这样的:
private KafkaTemplate<String, Object> kafkaTemplate;
List<Map<String,String>> receive_data = parse(httpRequest);
List<Map<String,String>> processed_data = process(receive_data);
processed_data foreach {
kafkaTemplate.send();
}
当我检查jstack时,我发现大多数线程堆栈都是这样的
"qtp1206051975-94" #94 prio=5 os_prio=0 tid=0x00007fe054034800 nid=0x34c waiting on condition [0x00007fdfeb4f2000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000788869f80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:143)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:218)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:942)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
我的 Kakfa Producer 配置是
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.ZSTD.name);
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
我已将
buffer.memory
从默认的32M更改为64M或128M,但仍然得到相同的结果。
TPS没有增加。
我的 Spring 应用程序使用
jetty
作为 Web 服务器
我改变了码头线程
server:
jetty:
threads:
max: 400
min: 20
有人可以帮我解决这个问题吗,谢谢。
我想提高应用程序的TPS,现在它看起来被kafka消息传递速度卡住了。
batch.size
很小,增大它或者使用默认值,更大的linger.ms
,这样在阻塞时会花时间批量处理更多记录并一次发布。
增加
max.in.flight.requests.per.connection
会增加单个连接中的请求数量。
检查
min.insync.replicas
值,2 应该适合 2 或以上的复制因子。使用 acks all 后,它将等待所有副本的确认。
值根据消息大小和所需吞吐量而变化,我将从默认值
batch.size
、linger.ms=100
和 min.insync.replicas=2
开始