我有一个Spring Integration项目,在这个项目中,我从RabbitMQ队列中发送和接收消息。
系统发布消息的顺序是确定的,但之后接收消息的顺序却不正确。
于是我发现这段(https:/docs.spring.iospring-integrationreferencehtmlamqp.html#amqp-strict-ordering。),并将监听器配置为: simpleMessageListenerContainer.setPrefetchCount(1);
.
我们进行了一些测试,它运行良好。然而,一周左右后,它开始出现类似的排序问题。
让我再解释一下。
我有两个流 (IntegrationFlow
s)在一个spring集成应用程序中。
在第一个 IntegrationFlow
它创建消息并将每条消息发布到兔子队列中。
就在发布之前,它记录了每一条消息,我可以确认,在发布之前,它的 sequenceNumber
按预期增量(在我的例子中是1,2,3,4,5,6,7,8,9,10,11)。
然后在第二个流中消耗这些发布的消息。就在每个消息被接收后,流程再次记录它。在这里我发现 sequenceNumber
没有按照预期的顺序递增(在我的例子中是1,3,5,7,2,4,6,8,9,10,11)。
对于这个应用程序来说,以正确的顺序处理消息是非常重要的。
当我研究rabbit的用户界面时,我发现了以下几点(大部分都是我所期望的)。
我没有想到在我的应用程序连接中会有3个通道,我自己没有配置,可能是Spring Integration AMQP帮我配置的。我自己没有配置,可能是Spring Integration AMQP帮我配置的。
现在,我认为另一个通道可能会变得活跃,这导致了排序问题。但我在日志中找不到这个问题。在配置中也没有。
一段代码。
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
final Jackson2JsonMessageConverter jackson2MessageConverter,
final MethodInterceptor retryInterceptor) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
// force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
simpleMessageListenerContainer.setPrefetchCount(1);
simpleMessageListenerContainer.setConcurrency();
return simpleMessageListenerContainer;
}
@Bean
public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
final Queue q1, final Queue q2, final Queue q3,
final Queue q4, final Queue q5,
final Queue q6) {
simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
return IntegrationFlows.from(
Amqp.inboundAdapter(simpleMessageListenerContainer)
.messageConverter(jackson2MessageConverter))
.log(LoggingHandler.Level.DEBUG, "com.my.thing")
.headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
.route(router())
.get();
}
private HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
router.setResolutionRequired(false);
router.setDefaultOutputChannelName("errorChannel");
return router;
}
发布:
@Bean
public IntegrationFlow prepareForUpload(final Handler1 handler1) {
BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
binaryFileSplitter.setChunkSize(chunksize);
return IntegrationFlows
.from(aFlow)
.handle(handler1)
.split(binaryFileSplitter)
.log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
// Send message to the correct AMQP queue after successful processing
.enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
.channel(MyChannels.AMQP_OUTPUT)
.get();
}
@Bean
public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
.log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
.handle(updateDb)
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
.routingKeyExpression("headers['queueRoutingKey']"))
.get();
}
接收:
@Bean
public IntegrationFlow handleReceivedMessages() {
return IntegrationFlows
.from(Q4_CHANNEL)
.log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
.handle(..)
.aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
.transform(..)
....(..)..
...
正如您所指向的文档中所讨论的那样,您需要在您的项目中添加一个 "发布 "和 "接收 "选项。BoundRabbitChannelAdvice
到分流器,使所有的下游流量使用同一通道。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gate.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template)))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}