Spring Integration:如何从队列中读取选定的消息

问题描述 投票:1回答:1

如果我有一个队列和多个订阅者,我如何编码订阅者只删除他们感兴趣的消息?我可以使用PublishSubscribeChannel将消息发送给所有订阅者,但它没有过滤功能,我不清楚消息是否在发送后被删除。另一种选择是读取所有消息,并在订阅者中进行过滤,但随后我需要为消息索引发明Kafka-ish行为,以防止已经看到的消息再次被处理。

java spring spring-integration
1个回答
1
投票

确实,Spring Integration中没有这样的persistent topic抽象开箱即用。但是,既然你说你需要一个内存解决方案,那么如何考虑启动嵌入式ActiveMQ并使用基于Jms.publishSubscribeChannel()目的地的Topic?是的,即使对于这种类型的selector,Spring Integration订阅者仍然没有MessageChannel,但你仍然可以使用.filter()来丢弃你不感兴趣的消息。

你可以通过Hazelcast ITopic达到:

    @Bean
    public ITopic<Message<?>> siTopic() {
        return hazelcastInstance().getTopic("siTopic");
    }

    @Bean
    public IntegrationFlow subscriber1() {
        return IntegrationFlows.from(
                Flux.create(messageFluxSink ->
                        siTopic()
                                .addMessageListener(message ->
                                        messageFluxSink.next(message.getMessageObject()))))
                .filter("headers.myHeader == foo")
                .get();
    }

    @Bean
    public IntegrationFlow subscriber2() {
        return IntegrationFlows.from(
                Flux.create(messageFluxSink ->
                        siTopic()
                                .addMessageListener(message ->
                                        messageFluxSink.next(message.getMessageObject()))))
                .filter("headers.myHeader == bar")
                .get();
    }

好吧,实际上看你的简单的内存模型,我甚至会说,简单的QueueChannelbridgePublishSubscribeChannel与每个订户中提到的过滤器应该足够你:

    @Bean
    public PollableChannel queueChannel() {
        return new QueueChannel();
    }

    @Bean
    @BridgeFrom("queueChannel")
    public MessageChannel publishSubscribeChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public IntegrationFlow subscriber1() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .filter("headers.myHeader == foo")
                .get();
    }

    @Bean
    public IntegrationFlow subscriber2() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .filter("headers.myHeader == bar")
                .get();
    }

UPDATE

还有一个选择使用而不是PublishSubscribeChannelfilter组合就像RecipientListRouterhttps://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#router-implementations-recipientlistrouter

© www.soinside.com 2019 - 2024. All rights reserved.