消息驱动通道适配器 - 消息并发处理

问题描述 投票:0回答:1

问题

我们有一个具有以下消息驱动通道适配器配置的旧应用程序

<jms:message-driven-channel-adapter 
    id="processRequest"        
    destination="requestFromLoader"
    connection-factory="connectionFactory"
    max-concurrent-consumers="30"
    message-converter="xmlMarshalConverter"
    channel="jmsInChannel"
    error-channel="errorChannel"/>

使用此配置max-concurrent-consumers =“30”我们预计有30个并行消费者(processRequest-container-1到30)来消费消息,但在测试过程中我们发现每个消费者依次消费10条消息,Eg - processRequest-container-1 处理消息 1 到 10,然后创建 processRequest-container-2 并处理 11 到 20 等等..

如果只有 2 个请求,这会导致第二条消息等待,即使还有 29 个其他消费者。

分辨率

我们添加了 max-messages-per-task="1",这似乎产生了正确的结果,但我们仍在使用大量消息进行测试。

<jms:message-driven-channel-adapter 
    id="processRequest"        
    destination="requestFromLoader"
    connection-factory="connectionFactory"
    max-concurrent-consumers="30"
    max-messages-per-task="1"
    message-converter="xmlMarshalConverter"
    channel="jmsInChannel"
    error-channel="errorChannel"/>

问题

  1. 我们通知日志中的消费者名称(processRequest-container-1)将达到2000或3000(processRequest-container-2546),并且不限于processRequest-container-1到processRequest-container-30,但在第一种情况下,没有 max-messages-per-task="1" 它没有超出 processRequest-container-30,这正常吗?或者我们需要做些什么吗?

  2. 在spring文档中max-messages-per-task =“1”被定义为每条消息要执行的任务,什么是任务以及最大并行消费者如何受此影响?

  3. 添加 max-messages-per-task="1" 正确还是我们遗漏/忽略了其他内容?

任何建议评论都很好。预先感谢。

java spring concurrency spring-integration spring-jms
1个回答
0
投票

这个问题更多的是关于 Spring JMS,而不是 Spring Integration。

最好阅读用于上述

DefaultMessageListenerContainer
<jms:message-driven-channel-adapter>
的源代码和 Javadoc。

有一条有趣的评论可能可以解释您的行为:

/**
 * Specify the maximum number of concurrent consumers to create. Default is 1.
 * <p>If this setting is higher than "concurrentConsumers", the listener container
 * will dynamically schedule surplus consumers at runtime, provided that enough
 * incoming messages are encountered. Once the load goes down again, the number of
 * consumers will be reduced to the standard level ("concurrentConsumers") again.
 * <p>Raising the number of concurrent consumers is recommendable in order
 * to scale the consumption of messages coming in from a queue. However,
 * note that any ordering guarantees are lost once multiple consumers are
 * registered. In general, stick with 1 consumer for low-volume queues.
 * <p><b>Do not raise the number of concurrent consumers for a topic,
 * unless vendor-specific setup measures clearly allow for it.</b>
 * With regular setup, this would lead to concurrent consumption
 * of the same message, which is hardly ever desirable.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @see #setConcurrentConsumers
 */
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {

因此,由于您不使用

concurrent-consumers="30"
来代替,因此您获得最多 30 个动态实例,并且它们的 id 确实增加了,这是正常的。

如果不指定

max-messages-per-task="1"
,逻辑是这样的:

        if (this.taskExecutor instanceof SchedulingTaskExecutor ste && ste.prefersShortLivedTasks()) {
            if (this.maxMessagesPerTask == Integer.MIN_VALUE) {
                // TaskExecutor indicated a preference for short-lived tasks. According to
                // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
                // unless the user specified a custom value.
                this.maxMessagesPerTask = 10;
            }
        }

或者你的

taskExecutor
不是那个
SchedulingTaskExecutor
(这确实是默认情况下),因此你的所有消息都由同一个消费者实例处理。

一些文档也在这里:https://docs.spring.io/spring-framework/reference/integration/jms/using.html#jms-mdp

© www.soinside.com 2019 - 2024. All rights reserved.