发布者确认使用Spring Cloud Stream

问题描述 投票:0回答:1

我有兴趣使用Spring Cloud Stream在我们项目中的某些生产者中使用发布者确认。我曾尝试做一个小的PoC,但是它不起作用。据我在文档中所看到的,对于Asyncrhonous Publisher Confirm,这是可能的,并且应该和接下来的更改一样容易:

在application.yml中添加ConfirmAckChannel并启用errorChannelEnabled属性。

spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
 ....
  bindings:    
    testOutput:
      destination: test
      binder: rabbitDefault
      content-type: application/json    
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
      errorChannelEnabled: true

然后是一个由端点触发的简单服务,我在其中将与errorChannel相关的标头插入到事件中。

@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders() {
        return withPayload(new Event<>(TestEvent.builder().build()))
                .setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
                .build();
    }
}

然后是RabbitMQ的发布者

@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        return messagingChannels.test().send(message);
    }
}

其中将MessagingChannels实现为

public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("errorChannelTest")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

[之后,我实现了2个监听器,一个监听器用于errorChannelTest输入,另一个监听器用于testAck。

@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {

    @StreamListener("errorChannelTest")
    void onCommandReceived(Event<Message> message) {

        log.info("Message error received: " + message);
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {

    @StreamListener("testAck")
    void onCommandReceived(Event<Message> message) {

        log.info("Message ACK received: " + message);
    }
}

但是,在这两个侦听器中,我没有收到任何RabbitMQ的ACK或NACK,该事件已正确发送到RabbitMQ并由交易所管理,但后来我没有收到RabbitMQ的任何答复。

我想念什么吗?我也检查了这2个属性,但效果不佳

spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED
    publisher-returns: true

我正在使用Spring-Cloud-Stream 3.0.1.RELEASE和spring-cloud-starter-stream-rabbit 3.0.1.RELEASE

rabbitmq spring-cloud-stream spring-rabbitmq
1个回答
0
投票

testAck不应该是绑定;它应该是@ServiceActivator

.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")

在这种情况下不起作用;错误被发送到名为testOutput.errors的通道;再次;这需要@ServiceActivator,而不是绑定。

© www.soinside.com 2019 - 2024. All rights reserved.