我需要轮询消息并将其发布到 RabbitMQ。设置了100个store,每个store可以轮询5条以上消息并将其发布到RabbitMQ。每分钟进行一次轮询。下面是轮询并发布到 RabbitMQ 的代码。
public class FetchMessages{
@Scheduled(fixedRateString = "60000")
private void sendToClxRmq() {
//Code to fetch the messages;
//loop below line for all the 100 stores and messages polled by each of the store
publishMessage.sendMessages(publishMessageDto); }
}
public class PublishMessage {
PublishMessageDto publishMessageDto;
@Bean
public Supplier<Message<PublishMessageDto>> routeMessage() {
return () -> {
if (ObjectUtils.isNotEmpty(publishMessageDto)) {
return MessageBuilder.withPayload(publishMessageDto)
.setHeader("store", publishMessageDto.getStore())
.build();
} else {
return null;
}
};
}
public void sendMessages(final PublishMessageDto publishMessageDto) {
this.publishMessageDto = publishMessageDto;
}
}
在提供的代码片段中,Supplier bean 每当调用其 get() 方法时都会生成一条消息。该框架提供了默认的轮询机制,该机制将触发供应商的调用,并且默认情况下每秒都会执行一次。换句话说,上述配置每秒生成一条消息,并且每条消息都发送到绑定器公开的输出目的地。
假设 sendToClxRmq() 方法可能每秒为 1 个存储生成超过 5 条消息。因此,通过sendToClxRmq()调用sendMessages()方法将覆盖PublishMessage类中publishMessageDto的值。因此,每当调用Supplier bean的get()方法时(每秒),它总是会发送最新的消息。
如何使用供应商模式解决此问题。我应该使用 StreamBridge 还是有更好的解决方案。
StreamBridge
是使用该 @Scheduled
模式实现您的需求的最佳方式。
还有一种将
Supplier<Flux<Message<PublishMessageDto>>>
与 Sinks.Many
结合使用的方法,但这对于您的用例来说真的是必要的开销吗?
嗯,另一种解决方案是使用内部
BlockingQueue
而不是简单的(且有缺陷的) PublishMessageDto publishMessageDto
属性。这样,您的 sendToClxRmq
就会向该队列提供数据,Spring Sloud Stream 将通过 Supplier
轮询以自己的速度消耗该队列。然而,当您生成的数据超过轮询器可以消耗的数据并且您的应用程序意外停止时,此解决方案可能会导致数据丢失。