我将 spring-integration-java-dsl 1.2.3 升级到 spring-integration-core 6.3.4 之后我面临数据类型问题,每当我尝试到达消息有效负载时,这只是转换一个byte[] 而不是 Order 对象,自然地,我得到了强制转换异常。我怀疑是否缺少配置参数等,因为我可以使用旧版本的依赖项直接获取 Order 对象
这是简化的演示项目:repo
application.yml 文件中的云配置
cloud:
stream:
default:
content-type: application/json
defaultBinder: rabbit
function:
definition: processOrderRequestListener;
bindings:
processOrderRequestListener-in-0:
destination: orderQueue
group: OrderRequest
content-type: application/json
consumer:
concurrency: 10
max-attempts: 1
processOrderRequestListener-out-0:
destination: processOrderRequestChannel
content-type: application/json
出版
@Bean
public Function<Message<Order>, Message<Order>> processOrderRequestListener() {
return message -> {
Message<Order> order = MessageBuilder
.withPayload(message.getPayload())
.copyHeaders(message.getHeaders())
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("spring.cloud.stream.message.contentType", "application/json")
.build();
return order;
};
}
消费
@Bean
public IntegrationFlow processOrderRequestFlow() {
return IntegrationFlow.from(PROCESS_ORDER_REQUEST_CHANNEL)
.log(Level.TRACE, this.getClass().getName() + ".processOrderRequestFlow")
.enrichHeaders(h -> h.header("source", "processOrderRequestFlow"))
.routeToRecipients(retriesRouter -> retriesRouter
.recipientMessageSelector(RETRIES_CHANNEL,
//payload is byte[] instead Order object
message -> (Order)message.getPayload()
...
感谢您分享您的样品!
不幸的是 Spring Boot
1.5.x
非常非常老,并且很长一段时间都失去支持。我们必须使我们的项目适应我们现在可以升级的任何东西。
所以,我在你的旧项目版本中看到你有这个:
@StreamListener(OrderProcessor.ORDER_REQUEST_INPUT_CHANNEL)
@Output(PROCESS_ORDER_REQUEST_CHANNEL)
public Message<OrderMessage> processOrderRequestListener(final Message<OrderMessage> orderMessage) throws IOException, SQLException {
return orderMessage;
}
本质上,让 Spring Cloud Stream 对输入执行转换,然后将该消息发送到
IntegrationFlow
的通道中。毫不奇怪,您在流程中收到的是 OrderMessage
而不是 byte[]
。
我不确定
private static final String PROCESS_ORDER_REQUEST_CHANNEL = "processOrderRequestListener-out-0";
在新版本中应该如何使用,但我建议根本不要使用中间 Function
,而应该使用类似:
return IntegrationFlow.from(OrderMessageConsumer.class, gateway -> gateway.beanName("processOrderRequestListener"))
...
interface OrderMessageConsumer extends Consumer<OrderMessage> {}