Spring RabbitMQ - RPC - CorrelationId 不匹配 - TopicExchange - 客户端 - 服务器 - 模型

问题描述 投票:0回答:1

我有 Spring boot - RabbitMQ 应用程序。使用的交换器是topicexchange。

public static final String RPC_REQ_QUEUE = "req.queue";
public static final String RPC_RES_QUEUE = "res.queue";
public static final String RPC_EXCHANGE = "rpc_exchange";


@Bean
public Queue reqQueue() { return new Queue(RPC_REQ_QUEUE);  }
@Bean
public Queue resQueue() { return new Queue(RPC_RES_QUEUE);  }

@Bean
public TopicExchange exchange() {return new TopicExchange(RPC_EXCHANGE); }

@Bean
public Binding requestBinding(TopicExchange exchange,  Queue reqQueue) {
    return BindingBuilder.bind(reqQueue).to(exchange)
            .with(RPC_REQ_QUEUE);
}
@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
    return BindingBuilder.bind(resQueue).to(exchange)
            .with(RPC_RES_QUEUE);
}

客户端和服务器 @个人资料(“客户”) @组件

public class MyConsumerRequestor {
.....

public void send() {
        System.out.println(" [x] Requesting fib(" + start + ")");
        Integer response = (Integer) template.convertSendAndReceive
                (exchange.getName(), RPC_REQ_QUEUE,
                        String.valueOf(start++),
                        message -> {
                            message.getMessageProperties().setReplyTo(RPC_RES_QUEUE);
                            message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                            return message;
                        });

        System.out.println(" [.] Got '" + response + "'");
    }

服务器

@Profile("server")
@Component
public class MyProducerBackend {
 @RabbitListener(queues = RPC_REQ_QUEUE, concurrency = "2")

    public void fibonacci(Message message) {

        String body = new String(message.getBody(), StandardCharsets.UTF_8);
       
        int n = Integer.parseInt(body);
        
        int result = fib(n);
        System.out.println(" [.] Returning " + result);
        // Send response to RPC_RES_QUEUE
        rabbitTemplate.convertAndSend(exchange.getName(), RPC_RES_QUEUE, String.valueOf(result),
                msg -> {
                        msg.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
                        return msg;
                    });
    } 

观察:

客户端正在发送消息,服务器正在接收并返回消息。但服务器上的相关 Id 类似于 1、2、3 ...而不是从客户端发送的相关 Id。

应用程序使用客户端和服务器配置文件启动。

RabbitMq 示例 Rabbit 使用直接交换,无法演示关联 ID 和并发/异步。

问题:

[x] 请求 fib(1)

[.] 得到“空”

为什么相关 ID 不匹配?我需要在客户端和服务器的配置中拆分 bean 吗?

注意:我使用的是固定回复队列。

编辑:

源代码:源代码git

spring spring-boot rabbitmq amqp spring-amqp
1个回答
0
投票

您在问题中展示了这一点:

@Bean
public Binding responseBinding(TopicExchange exchange, Queue resQueue) {
    return BindingBuilder.bind(resQueue).to(exchange)
            .with(RPC_RES_QUEUE);
}

但是您的代码中遗漏了这部分。 当我将其添加到您的

Config
时:

@Bean
public Binding responseBinding(TopicExchange rpcExchange, Queue replyQueue) {
    return BindingBuilder.bind(replyQueue).to(rpcExchange).with(RPC_REPLY_QUEUE);
}

已经开始工作了。

correlationId
的差异是在
RabbitTemplate
内部根据自己的逻辑进行了更改。

从技术上讲,你根本不用担心这个问题。了解

@RabbitListener
如何处理回复:https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven/reply.html

因此,这是您应该在服务器端进行的更改:

@RabbitListener(queues = RPC_REQUEST_QUEUE, concurrency = "2")
@SendTo(RPC_EXCHANGE + '/' + RPC_REPLY_QUEUE)
public int processRequest(Message message) {
    String body = new String(message.getBody(), StandardCharsets.UTF_8);
    int number = Integer.parseInt(body);

    System.out.println(message);

    System.out.println("[Server] Received request: " + number);

    int result = fibonacci(number);
    System.out.println("[Server] Computed result: " + result);

    return result;
}

这是在客户端:

public int sendRpcRequest() {
    Integer response = (Integer) rabbitTemplate.convertSendAndReceive(
            RPC_EXCHANGE,
            "rpc.request.key",
            String.valueOf(number++));

    if (response != null) {
        System.out.println("[Client] Received response: " + response);
    } else {
        System.err.println("[Client] Response is null!");
    }
    return response == null ? -1 : response;
}

你看,没有任何相关性头痛,

@RabbitListener
尽可能简单。

© www.soinside.com 2019 - 2024. All rights reserved.