The project consists of three main parts:
(device, MQTT protocol) producer --> RabbitMQ <-- consumer (Spring app using the AMQP protocol)
I use the MQTTX Client Toolbox to simulate the producer:
quick.white.rabbit - topic
ewogICAibWF0cml4IjoiZm9sbG93IHRoZSB3aGl0ZSByYWJiaXTigKYiLAogICAiZGF0ZSI6Ik1hcmNoIDMxLCAxOTk5Igp9 - Base64-encoded payload
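For reference, here is a minimal sketch of what that payload decodes to, using only the JDK. The class name `PayloadDecodeSketch` is made up for illustration, and UTF-8 is assumed as the charset because the text contains a non-ASCII ellipsis character.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PayloadDecodeSketch {
    // The Base64 string published to the quick.white.rabbit topic.
    static final String ENCODED =
        "ewogICAibWF0cml4IjoiZm9sbG93IHRoZSB3aGl0ZSByYWJiaXTigKYiLAogICAiZGF0ZSI6Ik1hcmNoIDMxLCAxOTk5Igp9";

    // Decode Base64 to bytes, then interpret the bytes as UTF-8 text (assumed charset).
    static String decode(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Prints the small JSON document carried by the message.
        System.out.println(decode(ENCODED));
    }
}
```

The decoded text is the same JSON document that appears in the consumer output further down.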
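import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public TopicExchange topic() {
        return new TopicExchange("amq.topic"); // is it OK to use the default exchange name amq.topic?
    }

    @Bean
    public Queue autoDeleteQueue() {
        // Each application instance gets its own broker-named, auto-delete queue
        return new AnonymousQueue();
    }

    @Bean
    public Binding binding(TopicExchange topic,
                           Queue autoDeleteQueue) {
        return BindingBuilder.bind(autoDeleteQueue)
                .to(topic)
                .with("*.*.rabbit");
    }
}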
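To make clear why the `*.*.rabbit` binding key picks up `quick.white.rabbit`, here is a rough sketch of topic-style matching in plain Java. It is not RabbitMQ's implementation, only an illustration of the documented rules that `*` stands for exactly one word and `#` for zero or more words. The class `TopicMatchSketch` is hypothetical.

```java
public class TopicMatchSketch {
    // Returns true if the dot-separated routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.*.rabbit", "quick.white.rabbit"));
        System.out.println(matches("*.*.rabbit", "white.rabbit"));
    }
}
```

Under these rules a two-word key such as `white.rabbit` would not match `*.*.rabbit`, while the three-word topic used here does.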
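import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQConsumer {
    // Print all message headers followed by the decoded payload
    static void dump(Object payload, Map<String, Object> headers) {
        System.out.println("-----------------------------------");
        headers.forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println(payload);
    }

    @RabbitListener(queues = "#{autoDeleteQueue.name}")
    public void consumePayload(@Payload String encodedMessage, @Headers Map<String, Object> headers) {
        // Decode with an explicit charset; the payload contains non-ASCII text
        String payload = new String(Base64.getDecoder().decode(encodedMessage), StandardCharsets.UTF_8);
        dump(payload, headers);
    }
}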
I run two instances of the same consumer application on different ports, 8081 and 8082:
java -jar target/consumer_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8081
java -jar target/consumer_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8082
The problem is that every message arrives at both consumers:
java -jar target/deep_dive_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8081
-----------------------------------
amqp_receivedDeliveryMode=NON_PERSISTENT
amqp_receivedRoutingKey=quick.white.rabbit
amqp_receivedExchange=amq.topic
x-mqtt-publish-qos=0
x-mqtt-dup=false
amqp_deliveryTag=3
amqp_consumerQueue=spring.gen-X6e3uiz5S-K4_RA0tt89cg
amqp_redelivered=false
id=debc649f-9ae1-82a8-be5f-0a3f2c9e2f87
amqp_consumerTag=amq.ctag-brMleQwCDNmcCHyT_lbF8A
amqp_lastInBatch=false
timestamp=1708633236541
{
"matrix":"follow the white rabbit…",
"date":"March 31, 1999"
}
java -jar target/deep_dive_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8082
-----------------------------------
amqp_receivedDeliveryMode=NON_PERSISTENT
amqp_receivedRoutingKey=quick.white.rabbit
amqp_receivedExchange=amq.topic
x-mqtt-publish-qos=0
x-mqtt-dup=false
amqp_deliveryTag=3
amqp_consumerQueue=spring.gen-Ym1vcVbUQ4aZqnG3Bu4OyA
amqp_redelivered=false
id=dd1a5d91-4972-dcca-0669-dc40f9d85a15
amqp_consumerTag=amq.ctag-zqy6gfLSLaZSf3H3W00M1A
amqp_lastInBatch=false
timestamp=1708633236541
{
"matrix":"follow the white rabbit…",
"date":"March 31, 1999"
}
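How can I configure RabbitMQ for round-robin consumption, so that each message is delivered only to the next consumer in turn? Currently every message is delivered to both consumers. What I want is: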
MESSAGE 1 // port 8081
MESSAGE 2 // port 8082
MESSAGE 3 // port 8081
MESSAGE 4 // port 8082
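One thing the logs above show is that the two instances consume from different queues (the `amqp_consumerQueue` values differ), which is what `AnonymousQueue` produces per instance. The toy simulation below, in plain Java with no broker involved, sketches the difference between that setup and two consumers sharing one queue. All class and method names here are made up for illustration, and the model assumes the broker hands messages on a single queue to its consumers in turn, ignoring prefetch and acknowledgement effects.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedQueueSketch {
    // Toy queue: each message goes to exactly one consumer, in rotation.
    static class ToyQueue {
        private final List<List<String>> consumers = new ArrayList<>();
        private int next = 0;

        List<String> addConsumer() {
            List<String> inbox = new ArrayList<>();
            consumers.add(inbox);
            return inbox;
        }

        void deliver(String message) {
            if (consumers.isEmpty()) {
                return;
            }
            consumers.get(next).add(message);
            next = (next + 1) % consumers.size();
        }
    }

    // Toy exchange: every bound queue receives its own copy of each message.
    static class ToyExchange {
        private final List<ToyQueue> bound = new ArrayList<>();

        void bind(ToyQueue queue) {
            bound.add(queue);
        }

        void publish(String message) {
            for (ToyQueue queue : bound) {
                queue.deliver(message);
            }
        }
    }

    // Two instances, each with its own queue (the situation in the question).
    static List<List<String>> separateQueues(int messages) {
        ToyExchange exchange = new ToyExchange();
        ToyQueue q1 = new ToyQueue();
        ToyQueue q2 = new ToyQueue();
        exchange.bind(q1);
        exchange.bind(q2);
        List<List<String>> inboxes = List.of(q1.addConsumer(), q2.addConsumer());
        for (int i = 1; i <= messages; i++) {
            exchange.publish("MESSAGE " + i);
        }
        return inboxes;
    }

    // Two instances consuming from one shared queue.
    static List<List<String>> sharedQueue(int messages) {
        ToyExchange exchange = new ToyExchange();
        ToyQueue shared = new ToyQueue();
        exchange.bind(shared);
        List<List<String>> inboxes = List.of(shared.addConsumer(), shared.addConsumer());
        for (int i = 1; i <= messages; i++) {
            exchange.publish("MESSAGE " + i);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        System.out.println("separate queues: " + separateQueues(4));
        System.out.println("shared queue:    " + sharedQueue(4));
    }
}
```

In the Spring configuration, the shared-queue variant would presumably correspond to declaring a queue with a fixed name (for example `new Queue("rabbit.work")`, a hypothetical name) instead of `AnonymousQueue`, so that both instances listen on the same queue. That is an assumption to verify against the actual setup, not a confirmed fix.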