我有一个消费者微服务,用于处理来自其他生产者微服务的消息。我使用 Spring Cloud Stream 和 RabbitMq Binder 管理事件驱动结构。
我想限制消费者微服务消耗的最大并发消息数。我的意思是,如果生产者发送 7 条消息,如果我将要消费的最大消息数限制为 5 条。我预计消费者微服务将消费前 5 条消息。如果任何消息处理完成,我预计会消耗第 6 条消息。我尝试使用 RabbitMq 消费者属性“max-concurrenty:5”来管理它,但它不起作用。
如果有 2 个消费者微服务正在运行且最大并发数为 5,我预计每个消费者微服务最多应消费 5 条消息。他们总共应该消耗 10 条消息。此外,消息应该以平衡的方式在微服务之间分发。
这是我在 application.yml 中的活页夹配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
autoconfigure:
exclude: org.springframework.boot.actuate.autoconfigure.metrics.jdbc.DataSourcePoolMetricsAutoConfiguration
application:
name: simulationService
cloud:
function:
definition: functionRouter
stream:
function:
routing:
enabled: true
bindings:
callBack-out-0:
destination: simulation-workflow
content-type: application/json
group: workflow
functionRouter-in-0:
destination: simulation
content-type: application/json
group: simulation
rabbit:
bindings:
functionRouter:
consumer:
max-concurrency: 5
concurrency = 5
表示每个实例将获得5个消费者;每个消费者都有一个 prefetch
属性(Spring AMQP 中默认为 250,spring-cloud-stream 中默认为 1),该属性会影响消息分发。
1 应在消费者之间提供均匀的分配,但要以性能为代价。
无法将消费者“限制”为仅 5 条消息,但
prefetch = 5
可能会满足您的需求。
预取
预取计数。
默认:1。