RabbitTemplate的setChannelTransacted标志导致消息未传递到队列中

问题描述 投票:0回答:1

鉴于我有带有AMQP匿名队列和扇出交换的应用程序:

@Bean
public Queue cacheUpdateAnonymousQueue() {
    return new AnonymousQueue();
}

public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";

@Bean
FanoutExchange cacheUpdateExchange() {
    return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
}

@Bean
Binding cacheUpdateQueueToCacheUpdateExchange() {
    return bind(cacheUpdateAnonymousQueue())
            .to(cacheUpdateExchange());
}

和Spring Integration流程:

@Bean
public IntegrationFlow cacheOutputFlow() {
    return from(channelConfig.cacheUpdateOutputChannel())
            .transform(objectToJsonTransformer())
            .handle(outboundAdapter())
            .get();
}

而且我使用出站适配器:

public MessageHandler outboundAdapter() {
    rabbitTemplate.setChannelTransacted(true);
    return outboundAdapter(rabbitTemplate)
            .exchangeName(CACHE_UPDATE_FANOUT_EXCHANGE)
            .get();
}

我可以在日志中看到:

o.s.amqp.rabbit.core.RabbitTemplate: Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@40976c4b Shared Rabbit Connection: SimpleConnection@1cfaa28d [delegate=amqp://[email protected]:5672/, localPort= 56042]
o.s.amqp.rabbit.core.RabbitTemplate: Publishing message on exchange [cache.update.fanout], routingKey = []

但是消息未传递到绑定到cache.update.fanout交换的队列。

当我在出站适配器中设置rabbitTemplate.setChannelTransacted(false);时,我可以在日志中看到:

o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@11a1389d Shared Rabbit Connection: SimpleConnection@444c6abf [delegate=amqp://[email protected]:5672/, localPort= 56552]
o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message on exchange [mer.cache.update.fanout], routingKey = []

消息已传递到队列。

为什么没有在第一种情况下发送邮件?

为什么RabbitTemplate不能显示某些信息?

rabbitmq spring-integration spring-amqp rabbitmq-exchange spring-integration-amqp
1个回答
0
投票

您的日志具有不同的交换名称;我只是像这样测试它...

@SpringBootApplication
public class So60993877Application {

    public static void main(String[] args) {
        SpringApplication.run(So60993877Application.class, args);
    }

    @Bean
    public Queue cacheUpdateAnonymousQueue() {
        return new AnonymousQueue();
    }

    public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";

    @Bean
    FanoutExchange cacheUpdateExchange() {
        return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
    }

    @Bean
    Binding cacheUpdateQueueToCacheUpdateExchange() {
        return BindingBuilder.bind(cacheUpdateAnonymousQueue())
                .to(cacheUpdateExchange());
    }

    @RabbitListener(queues = "#{cacheUpdateAnonymousQueue.name}")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(cacheUpdateAnonymousQueue().getName(), "foo");
            template.setChannelTransacted(true);
            template.convertAndSend(cacheUpdateAnonymousQueue().getName(), "bar");
        };
    }

}

没有问题。

foo
bar
© www.soinside.com 2019 - 2024. All rights reserved.