我有一个 Spring 应用程序,它基本上在 RabbitMQ 中监听一些队列并使用消息并将它们存储在数据库中。
队列的发布速率是每秒 200 条消息,我需要有多个消费者收听并同时存储它们。
我在这样的代码中定义了我的监听器:
@Service
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = "my-exchange", type = "topic"),
value = @Queue("my-queue"),
key = "my-key"),
concurrency = "13"
)
@Slf4j
public class MyService {
@RabbitHandler
public void handle(byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTAG) throws IOException {
// my code
}
}
我还在 YAML 文件中定义了这个配置:
spring:
task:
execution:
pool:
core-size: 40
max-size: 60
queue-capacity: 1000
rabbitmq:
addresses: ${AMQ_HOST}
username: ${SECRETS_AMQUSER}
password: ${SECRETS_AMQPASS}
virtual-host: my-vhost
listener:
simple:
prefetch: 100
但是当我在 Kubernetes 中运行程序并从 Prometheus 获取线程状态时,我得到了这张图:
红线是runnable threads,紫线是timed-waiting,黄线是waiting。 如图所示,我的可运行线程数(红线)不超过 8 个,这不是我需要的。
如果你能帮助我增加可运行线程的数量,我将非常感激。
我试过这个java环境:
-XX:ThreadStackSize=256 -XX:ParallelGCThreads=16
.
但是他们没有工作,我仍然没有超过 8 个线程。