我有 Spring boot - RabbitMQ 应用程序。使用的交换器是topicexchange。
public static final String RPC_REQ_QUEUE = "req.queue";
public static final String RPC_RES_QUEUE = "res.queue";
public static final String RPC_EXCHANGE = "rpc_exchange";
@Bean
public Queue reqQueue() { return new Queue(RPC_REQ_QUEUE); }
@Bean
public Queue resQueue() { return new Queue(RPC_RES_QUEUE); }
@Bean
public TopicExchange exchange() {return new TopicExchange(RPC_EXCHANGE); }
@Bean
public Binding requestBinding(TopicExchange exchange, Queue reqQueue) {
return BindingBuilder.bind(reqQueue).to(exchange)
.with(RPC_REQ_QUEUE);
}
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
return BindingBuilder.bind(resQueue).to(exchange)
.with(RPC_RES_QUEUE);
}
客户端和服务器 @个人资料(“客户”) @组件
public class MyConsumerRequestor {
.....
public void send() {
System.out.println(" [x] Requesting fib(" + start + ")");
Integer response = (Integer) template.convertSendAndReceive
(exchange.getName(), RPC_REQ_QUEUE,
String.valueOf(start++),
message -> {
message.getMessageProperties().setReplyTo(RPC_RES_QUEUE);
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
});
System.out.println(" [.] Got '" + response + "'");
}
服务器
@Profile("server")
@Component
public class MyProducerBackend {
@RabbitListener(queues = RPC_REQ_QUEUE, concurrency = "2")
public void fibonacci(Message message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
int n = Integer.parseInt(body);
int result = fib(n);
System.out.println(" [.] Returning " + result);
// Send response to RPC_RES_QUEUE
rabbitTemplate.convertAndSend(exchange.getName(), RPC_RES_QUEUE, String.valueOf(result),
msg -> {
msg.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
return msg;
});
}
观察:
客户端正在发送消息,服务器正在接收并返回消息。但服务器上的相关 Id 类似于 1、2、3 ...而不是从客户端发送的相关 Id。
应用程序使用客户端和服务器配置文件启动。
RabbitMq 示例 Rabbit 使用直接交换,无法演示关联 ID 和并发/异步。
问题:
[x] 请求 fib(1)
[.] 得到“空”
为什么相关 ID 不匹配?我需要在客户端和服务器的配置中拆分 bean 吗?
注意:我使用的是固定回复队列。
编辑:
源代码:源代码git
您在问题中展示了这一点:
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
return BindingBuilder.bind(resQueue).to(exchange)
.with(RPC_RES_QUEUE);
}
但是您的代码中遗漏了这部分。 当我将其添加到您的
Config
时:
@Bean
public Binding responseBinding(TopicExchange rpcExchange, Queue replyQueue) {
return BindingBuilder.bind(replyQueue).to(rpcExchange).with(RPC_REPLY_QUEUE);
}
已经开始工作了。
与
correlationId
的差异是在RabbitTemplate
内部根据自己的逻辑进行了更改。
从技术上讲,你根本不用担心这个问题。了解
@RabbitListener
如何处理回复:https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven/reply.html
因此,这是您应该在服务器端进行的更改:
@RabbitListener(queues = RPC_REQUEST_QUEUE, concurrency = "2")
@SendTo(RPC_EXCHANGE + '/' + RPC_REPLY_QUEUE)
public int processRequest(Message message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
int number = Integer.parseInt(body);
System.out.println(message);
System.out.println("[Server] Received request: " + number);
int result = fibonacci(number);
System.out.println("[Server] Computed result: " + result);
return result;
}
这是在客户端:
public int sendRpcRequest() {
Integer response = (Integer) rabbitTemplate.convertSendAndReceive(
RPC_EXCHANGE,
"rpc.request.key",
String.valueOf(number++));
if (response != null) {
System.out.println("[Client] Received response: " + response);
} else {
System.err.println("[Client] Response is null!");
}
return response == null ? -1 : response;
}
你看,没有任何相关性头痛,
@RabbitListener
尽可能简单。