We are migrating a large number of Spring Batch partitioned jobs from 2.2 to 4.x (XML configuration of partition jobs). When I debug a partitioned job, I get this stack trace.
2020-05-13 17:03:38,311 ERROR [org.springframework.batch.core.step.AbstractStep] (SimpleAsyncTaskExecutor-1) Encountered an error executing step step.partitionStep in job job.partitioned: java.lang.ClassCastException: class org.springframework.batch.core.StepExecution cannot be cast to class java.util.Collection (org.springframework.batch.core.StepExecution)
at deployment.org.springframework.batch.integration.partition.MessageChannelPartitionHandler.receiveReplies(MessageChannelPartitionHandler.java:293)
at deployment.org.springframework.batch.integration.partition.MessageChannelPartitionHandler.handle(MessageChannelPartitionHandler.java:232)
at deployment.org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:106)
at deployment.org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203)
at deployment.org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
After enabling logging, I can see that the first 4 messages go to the aggregator and the last one goes to the MessageChannelPartitionHandler.
2020-05-13 15:04:40,066 DEBUG [AggregatingMessageHandler] (task-scheduler-3) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=850, version=3, name=step:4, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=4, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403876387, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=ad86a496-1cb0-08d7-801f-e7c582a51ee5, jms_messageId=ID:5a125750-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403876412}]
2020-05-13 15:04:46,072 DEBUG [AggregatingMessageHandler] (task-scheduler-10) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=847, version=3, name=step:3, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=0, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403885895, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=a066dcca-27b4-ad87-b639-a7bf15125f1c, jms_messageId=ID:5fbd2591-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403885907}]
2020-05-13 15:04:52,073 DEBUG [AggregatingMessageHandler] (task-scheduler-2) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=846, version=3, name=step:2, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=1, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403888831, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=a25ce9fa-9f7c-f12e-1495-a7e91e12f43a, jms_messageId=ID:617d2512-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403888860}]
2020-05-13 15:04:58,074 DEBUG [AggregatingMessageHandler] (task-scheduler-5) org.springframework.integration.config.AggregatorFactoryBean#38 received message: GenericMessage [payload=StepExecution: id=849, version=3, name=step:1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=2, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403893969, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=98f5c7f8-0133-4f74-3666-0b1d36055341, jms_messageId=ID:648d2433-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403893980}]
2020-05-13 15:05:19,271 DEBUG [org.springframework.batch.integration.partition.MessageChannelPartitionHandler] (SimpleAsyncTaskExecutor-1) Received replies: GenericMessage [payload=StepExecution: id=848, version=3, name=step:0, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=, headers={sequenceNumber=3, sequenceSize=5, jms_destination=ActiveMQQueue[jms.queue.partitionReplyQueue], priority=4, jms_timestamp=1589403876047, replyChannel=policy.step.applycredits.partitioned.jms.reply, jms_redelivered=false, JMSXDeliveryCount=1, jms_replyTo=ActiveMQQueue[jms.queue.partitionReplyQueue], correlationId=171:step, id=093e2d94-1911-1b4b-6334-d7d3032438ad, jms_messageId=ID:59de760f-955d-11ea-975d-98fa9b1fb9db, timestamp=1589403876109}]
My configuration is as follows.
<!-- Partition Handler -->
<bean id="partitioned.jms.handler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="jms.requests"/>
            <property name="receiveTimeout" value="${partitioned.timeout}"/>
        </bean>
    </property>
    <property name="stepName" value="partitioned.step.name"/>
    <property name="gridSize" value="${step.partitioned.gridSize}"/>
    <property name="replyChannel" ref="partitioned.jms.reply"/>
</bean>
<int:aggregator
    input-channel="partitioned.jms.reply"
    ref="partitioned.jms.handler"
/>
Is something missing from my configuration? I expected the aggregator to receive all 5 reply messages and then send them to the PartitionHandler.
Thank you.
I had to add a release-strategy to the aggregator.
<bean id="partitionReleaseStrategy"
    class="org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy"
/>
<int:aggregator
    input-channel="partitioned.jms.reply"
    ref="partitioned.jms.handler"
    release-strategy="partitionReleaseStrategy"
/>
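
As a rough illustration of what a sequence-size release strategy does, here is a minimal plain-Java sketch. It is a simplified model and not Spring Integration's actual code: the class `SequenceSizeReleaseSketch`, the `Reply` type, and the `onReply` method are hypothetical names made up for this example. The idea is that replies are held until the number collected matches the `sequenceSize` header, and only then handed on as one collection, which appears to be the shape the partition handler expects when it receives replies.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Simplified model of sequence-size based release; not Spring Integration's code. */
public class SequenceSizeReleaseSketch {

    /** Hypothetical stand-in for a reply message with a sequenceSize header. */
    static final class Reply {
        final String payload;
        final int sequenceSize;

        Reply(String payload, int sequenceSize) {
            this.payload = payload;
            this.sequenceSize = sequenceSize;
        }
    }

    // Replies collected so far for a single correlation group.
    private final List<Reply> group = new ArrayList<>();

    /** The group may be released once it holds sequenceSize replies. */
    static boolean canRelease(List<Reply> group) {
        return !group.isEmpty() && group.size() >= group.get(0).sequenceSize;
    }

    /**
     * Adds a reply to the group. Returns all payloads as one collection
     * when the group is complete, or null while replies are still missing.
     */
    Collection<String> onReply(Reply reply) {
        group.add(reply);
        if (!canRelease(group)) {
            return null; // hold the reply until the group is complete
        }
        List<String> payloads = new ArrayList<>();
        for (Reply r : group) {
            payloads.add(r.payload);
        }
        group.clear();
        return payloads;
    }

    public static void main(String[] args) {
        SequenceSizeReleaseSketch aggregator = new SequenceSizeReleaseSketch();
        for (int i = 0; i < 5; i++) {
            Collection<String> released = aggregator.onReply(new Reply("step:" + i, 5));
            System.out.println("step:" + i + " -> " + (released == null ? "held" : released));
        }
    }
}
```

In this sketch the first four replies are held and the fifth releases a single collection of five payloads. A lone `StepExecution` reaching the handler instead of such a collection would be consistent with the `ClassCastException` in the stack trace above.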