我有兴趣使用Spring Cloud Stream在我们项目中的某些生产者中使用发布者确认。我曾尝试做一个小的PoC,但是它不起作用。据我在文档中所看到的,对于Asyncrhonous Publisher Confirm,这是可能的,并且应该和接下来的更改一样容易:
在application.yml中添加ConfirmAckChannel并启用errorChannelEnabled属性。
spring.cloud.stream:
binders:
rabbitDefault:
defaultCandidate: false
type: rabbit
environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
....
bindings:
testOutput:
destination: test
binder: rabbitDefault
content-type: application/json
rabbit.bindings:
testOutput.producer:
confirmAckChannel: "testAck"
errorChannelEnabled: true
然后是一个由端点触发的简单服务,我在其中将与errorChannel相关的标头插入到事件中。
@Service
@RequiredArgsConstructor
public class TestService {
private final TestPublisher testPublisher;
public void sendMessage() {
testPublisher.send(addHeaders());
}
private Message<Event<TestEvent>> addHeaders() {
return withPayload(new Event<>(TestEvent.builder().build()))
.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
.build();
}
}
然后是RabbitMQ的发布者
@Component
@RequiredArgsConstructor
public class TestPublisher {
private final MessagingChannels messagingChannels;
public boolean send(Message<Event<TestEvent>> message) {
return messagingChannels.test().send(message);
}
}
其中将MessagingChannels实现为
public interface MessagingChannels {
@Input("testAck")
MessageChannel testAck();
@Input("errorChannelTest")
MessageChannel testError();
@Output("testOutput")
MessageChannel test();
}
[之后,我实现了2个监听器,一个监听器用于errorChannelTest输入,另一个监听器用于testAck。
@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {
@StreamListener("errorChannelTest")
void onCommandReceived(Event<Message> message) {
log.info("Message error received: " + message);
}
}
@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {
@StreamListener("testAck")
void onCommandReceived(Event<Message> message) {
log.info("Message ACK received: " + message);
}
}
但是,在这两个侦听器中,我没有收到任何RabbitMQ的ACK或NACK,该事件已正确发送到RabbitMQ并由交易所管理,但后来我没有收到RabbitMQ的任何答复。
我想念什么吗?我也检查了这2个属性,但效果不佳
spring:
rabbitmq:
publisher-confirm-type: CORRELATED
publisher-returns: true
我正在使用Spring-Cloud-Stream 3.0.1.RELEASE和spring-cloud-starter-stream-rabbit 3.0.1.RELEASE
testAck
不应该是绑定;它应该是@ServiceActivator
。
.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
在这种情况下不起作用;错误被发送到名为testOutput.errors
的通道;再次;这需要@ServiceActivator
,而不是绑定。