当我尝试使用频道消息存储时出现以下错误:-org.springframework.messaging.MessageDeliveryException:无法在超时内将消息发送到通道“ executionFilterChannel”:-1
<int:channel id="executionFilterOutputChannel" />
<int:channel id="testChannel" />
<int:channel id="executionFilterChannel">
<int:queue message-store="channelMessageStore" />
<int:interceptors>
<int:ref bean="kafkaIngestionInterceptor" />
</int:interceptors>
</int:channel>
<bean id="kafkaIngestionInterceptor" class="com.xyz.report.interceptor.KafkaIngestionInterceptor" />
<bean id="channelMessageStore"
class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource" />
<property name="channelMessageStoreQueryProvider" ref="queryProvider" />
<property name="tablePrefix" value="QUEUE_" />
<int:filter id="executionFilters"
input-channel="testChannel" ref="executionFilter"
method="productFilter" output-channel="executionFilterOutputChannel"
discard-channel="exceptionFilterChannel" />
<int:bridge id="bridgeChannel" input-channel="executionFilterChannel"
output-channel="testChannel">
<int:poller fixed-delay="10" max-messages-per-poll="1" >
<int:transactional transaction-manager="txManager" />
</int:poller>
</int:bridge>
<bean id="txManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<int:service-activator id="outputKafkaActivator"
input-channel="outputFromKafkaIngestion"
output-channel="executionFilterChannel" method="getKafkaMessage">
<bean class="com.xyz.report.service.KafkaListener" />
</int:service-activator>
[KafkaChannelInterceptor是通道“ executionFilterChannel”的通道拦截器。它具有以下方法:-
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("Before Sending =======================> " + channel + "message : "+ message);
return null;
}
完整日志:-
Message received from Kafka ================================> GenericMessage [payload=EquityExecutionDetail [id=11, product=Equity, quantity=1000, isin=US2547895122, executedPrice=100.5, acctNo=5478962478, orderType=Market, expiryType=Day, buyOrSell=Buy, tradeDt=, execStatus=Fully Executed], headers={kafka_offset=73, PRODUCT=Equity, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=datapipeline.kafka.ingestion.test.1, kafka_receivedTimestamp=1587920627054, EVENT=CREATE, REGION=APAC}]
Before Sending =======================> executionFilterChannelmessage : GenericMessage [payload=EquityExecutionDetail [id=11, product=Equity, quantity=1000, isin=US2547895122, executedPrice=100.5, acctNo=5478962478, orderType=Market, expiryType=Day, buyOrSell=Buy, tradeDt=, execStatus=Fully Executed], headers={kafka_offset=73, PRODUCT=Equity, kafka_timestampType=CREATE_TIME, id=dad27393-5ef6-9280-8d3e-60c458312d65, kafka_receivedPartitionId=0, kafka_receivedTopic=datapipeline.kafka.ingestion.test.1, kafka_receivedTimestamp=1587920627054, EVENT=CREATE, REGION=APAC, timestamp=1587920627094}]
2020-04-26 22:33:47.096 ERROR 22640 --- [aContainer1-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = datapipeline.kafka.ingestion.test.1, partition = 0, offset = 73, CreateTime = 1587920627054, serialized key size = -1, serialized value size = 335, headers = RecordHeaders(headers = [RecordHeader(key = PRODUCT, value = [34, 69, 113, 117, 105, 116, 121, 34]), RecordHeader(key = EVENT, value = [34, 67, 82, 69, 65, 84, 69, 34]), RecordHeader(key = REGION, value = [34, 65, 80, 65, 67, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 80, 82, 79, 68, 85, 67, 84, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 69, 86, 69, 78, 84, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 82, 69, 71, 73, 79, 78, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = EquityExecutionDetail [id=11, product=Equity, quantity=1000, isin=US2547895122, executedPrice=100.5, acctNo=5478962478, orderType=Market, expiryType=Day, buyOrSell=Buy, tradeDt=, execStatus=Fully Executed])
org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'executionFilterChannel' within timeout: -1
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:118) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$200(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.3.0.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:925) [spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:909) [spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:860) [spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.3.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:622) [spring-kafka-1.3.2.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_211]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_211]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_211]
请帮助我,因为我被困了三天。
您在拦截器中的代码如下:
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("Before Sending =======================> " + channel + "message : "+ message);
return null;
}
您返回null
。因此,实际上没有任何内容发送到该频道。这就是为什么您会收到该异常的原因。
请参阅其JavaDocs:
/**
* Invoked before the Message is actually sent to the channel.
* This allows for modification of the Message if necessary.
* If this method returns {@code null} then the actual
* send invocation will not occur.
*/
@Nullable
default Message<?> preSend(Message<?> message, MessageChannel channel) {
为方便起见,在此问题上有一条调试消息:
message = interceptor.preSend(message, channel);
if (message == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(interceptor.getClass().getSimpleName()
+ " returned null from preSend, i.e. precluding the send.");
}
afterSendCompletion(previous, channel, false, null, interceptorStack);
return null;
}