如果我有一个队列和多个订阅者,我如何编码订阅者只删除他们感兴趣的消息?我可以使用PublishSubscribeChannel将消息发送给所有订阅者,但它没有过滤功能,我不清楚消息是否在发送后被删除。另一种选择是读取所有消息,并在订阅者中进行过滤,但随后我需要为消息索引发明Kafka-ish行为,以防止已经看到的消息再次被处理。
确实,Spring Integration中没有这样的persistent topic
抽象开箱即用。但是,既然你说你需要一个内存解决方案,那么如何考虑启动嵌入式ActiveMQ并使用基于Jms.publishSubscribeChannel()
目的地的Topic
?是的,即使对于这种类型的selector
,Spring Integration订阅者仍然没有MessageChannel
,但你仍然可以使用.filter()
来丢弃你不感兴趣的消息。
你可以通过Hazelcast ITopic
达到:
@Bean
public ITopic<Message<?>> siTopic() {
return hazelcastInstance().getTopic("siTopic");
}
@Bean
public IntegrationFlow subscriber1() {
return IntegrationFlows.from(
Flux.create(messageFluxSink ->
siTopic()
.addMessageListener(message ->
messageFluxSink.next(message.getMessageObject()))))
.filter("headers.myHeader == foo")
.get();
}
@Bean
public IntegrationFlow subscriber2() {
return IntegrationFlows.from(
Flux.create(messageFluxSink ->
siTopic()
.addMessageListener(message ->
messageFluxSink.next(message.getMessageObject()))))
.filter("headers.myHeader == bar")
.get();
}
好吧,实际上看你的简单的内存模型,我甚至会说,简单的QueueChannel
和bridge
到PublishSubscribeChannel
与每个订户中提到的过滤器应该足够你:
@Bean
public PollableChannel queueChannel() {
return new QueueChannel();
}
@Bean
@BridgeFrom("queueChannel")
public MessageChannel publishSubscribeChannel() {
return new PublishSubscribeChannel();
}
@Bean
public IntegrationFlow subscriber1() {
return IntegrationFlows.from(publishSubscribeChannel())
.filter("headers.myHeader == foo")
.get();
}
@Bean
public IntegrationFlow subscriber2() {
return IntegrationFlows.from(publishSubscribeChannel())
.filter("headers.myHeader == bar")
.get();
}
UPDATE
还有一个选择使用而不是PublishSubscribeChannel
和filter
组合就像RecipientListRouter
:https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#router-implementations-recipientlistrouter