如何为Spring Integration中的Rabbit MQ消息监听器执行严格的排序?

问题描述 投票:0回答:1

我有一个Spring Integration项目,在这个项目中,我从RabbitMQ队列中发送和接收消息。

系统发布消息的顺序是确定的,但之后接收消息的顺序却不正确。

于是我发现这段(https:/docs.spring.iospring-integrationreferencehtmlamqp.html#amqp-strict-ordering。),并将监听器配置为: simpleMessageListenerContainer.setPrefetchCount(1);.

我们进行了一些测试,它运行良好。然而,一周左右后,它开始出现类似的排序问题。

让我再解释一下。

我有两个流 (IntegrationFlows)在一个spring集成应用程序中。

在第一个 IntegrationFlow 它创建消息并将每条消息发布到兔子队列中。

就在发布之前,它记录了每一条消息,我可以确认,在发布之前,它的 sequenceNumber 按预期增量(在我的例子中是1,2,3,4,5,6,7,8,9,10,11)。

然后在第二个流中消耗这些发布的消息。就在每个消息被接收后,流程再次记录它。在这里我发现 sequenceNumber 没有按照预期的顺序递增(在我的例子中是1,3,5,7,2,4,6,8,9,10,11)。

对于这个应用程序来说,以正确的顺序处理消息是非常重要的。

当我研究rabbit的用户界面时,我发现了以下几点(大部分都是我所期望的)。

  • rabbit有3个连接(用于3个java应用)
  • 我的应用程序的连接有3个通道。其中2个通道是空闲的,没有消费者,1个通道有6个用户,预取数为1。
  • 每个用户的预取数为1
  • 我只关注其中的一个订阅者(一个队列)。
  • 这个队列的属性是'ack required'而不是'exclusive'。

我没有想到在我的应用程序连接中会有3个通道,我自己没有配置,可能是Spring Integration AMQP帮我配置的。我自己没有配置,可能是Spring Integration AMQP帮我配置的。

现在,我认为另一个通道可能会变得活跃,这导致了排序问题。但我在日志中找不到这个问题。在配置中也没有。

一段代码。

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
                                                                         final Jackson2JsonMessageConverter jackson2MessageConverter,
                                                                         final MethodInterceptor retryInterceptor) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
        simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
        // force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
        simpleMessageListenerContainer.setPrefetchCount(1);
        simpleMessageListenerContainer.setConcurrency();
        return simpleMessageListenerContainer;
    }

    @Bean
    public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
                                                         final Queue q1, final Queue q2, final Queue q3,
                                                         final Queue q4, final Queue q5,
                                                         final Queue q6) {
        simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
        return IntegrationFlows.from(
                Amqp.inboundAdapter(simpleMessageListenerContainer)
                        .messageConverter(jackson2MessageConverter))
                .log(LoggingHandler.Level.DEBUG, "com.my.thing")
                .headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
                .route(router())
                .get();
    }

    private HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
        router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
        router.setResolutionRequired(false);
        router.setDefaultOutputChannelName("errorChannel");
        return router;
    }

发布:

    @Bean
    public IntegrationFlow prepareForUpload(final Handler1 handler1) {
        BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
        binaryFileSplitter.setChunkSize(chunksize);

        return IntegrationFlows
                .from(aFlow)
                .handle(handler1)
                .split(binaryFileSplitter)
                .log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
                // Send message to the correct AMQP queue after successful processing
                .enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
                .channel(MyChannels.AMQP_OUTPUT)
                .get();
    }
    @Bean
    public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
        return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
                .handle(updateDb)
                .handle(Amqp.outboundAdapter(amqpTemplate)
                        .exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
                        .routingKeyExpression("headers['queueRoutingKey']"))
                .get();
    }

接收:

    @Bean
    public IntegrationFlow handleReceivedMessages() {
        return IntegrationFlows
                .from(Q4_CHANNEL)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
                .handle(..)
                .aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
                .transform(..)
                ....(..)..
                ...

java spring rabbitmq spring-integration spring-amqp
1个回答
0
投票

正如您所指向的文档中所讨论的那样,您需要在您的项目中添加一个 "发布 "和 "接收 "选项。BoundRabbitChannelAdvice 到分流器,使所有的下游流量使用同一通道。

        @Bean
        public IntegrationFlow flow(RabbitTemplate template) {
            return IntegrationFlows.from(Gate.class)
                    .split(s -> s.delimiters(",")
                            .advice(new BoundRabbitChannelAdvice(template)))
                    .<String, String>transform(String::toUpperCase)
                    .handle(Amqp.outboundAdapter(template).routingKey("rk"))
                    .get();
        }
© www.soinside.com 2019 - 2024. All rights reserved.