Spring 集成负载转换异常

问题描述 投票:0回答:1

我将 spring-integration-java-dsl 1.2.3 升级到 spring-integration-core 6.3.4 之后我面临数据类型问题,每当我尝试到达消息有效负载时,这只是转换一个byte[] 而不是 Order 对象,自然地,我得到了强制转换异常。我怀疑是否缺少配置参数等,因为我可以使用旧版本的依赖项直接获取 Order 对象

这是简化的演示项目:repo

application.yml 文件中的云配置

  cloud:
    stream:
      default:
        content-type: application/json
      defaultBinder: rabbit
      function:
        definition: processOrderRequestListener;                        
      bindings:                         
        processOrderRequestListener-in-0:  
          destination: orderQueue 
          group: OrderRequest 
          content-type: application/json
          consumer:
            concurrency: 10
            max-attempts: 1
        processOrderRequestListener-out-0:
          destination: processOrderRequestChannel 
          content-type: application/json

出版

@Bean
public Function<Message<Order>, Message<Order>> processOrderRequestListener() {
    return message -> {
        Message<Order> order = MessageBuilder
                .withPayload(message.getPayload())
                .copyHeaders(message.getHeaders())
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .setHeader("spring.cloud.stream.message.contentType", "application/json")
                .build();
        return order; 
    };
}

消费

    @Bean
public IntegrationFlow processOrderRequestFlow() {
    return IntegrationFlow.from(PROCESS_ORDER_REQUEST_CHANNEL)
            .log(Level.TRACE, this.getClass().getName() + ".processOrderRequestFlow")
            .enrichHeaders(h -> h.header("source", "processOrderRequestFlow"))
            .routeToRecipients(retriesRouter -> retriesRouter
                    .recipientMessageSelector(RETRIES_CHANNEL,
                            //payload is byte[] instead Order object
                            message ->  (Order)message.getPayload()
                            ...
spring spring-integration spring-dsl
1个回答
0
投票

感谢您分享您的样品!

不幸的是 Spring Boot

1.5.x
非常非常老,并且很长一段时间都失去支持。我们必须使我们的项目适应我们现在可以升级的任何东西。

所以,我在你的旧项目版本中看到你有这个:

@StreamListener(OrderProcessor.ORDER_REQUEST_INPUT_CHANNEL)
@Output(PROCESS_ORDER_REQUEST_CHANNEL)
public Message<OrderMessage> processOrderRequestListener(final Message<OrderMessage> orderMessage) throws IOException, SQLException {
    return orderMessage;
}

本质上,让 Spring Cloud Stream 对输入执行转换,然后将该消息发送到

IntegrationFlow
的通道中。毫不奇怪,您在流程中收到的是
OrderMessage
而不是
byte[]

我不确定

 private static final String PROCESS_ORDER_REQUEST_CHANNEL = "processOrderRequestListener-out-0";
在新版本中应该如何使用,但我建议根本不要使用中间
Function
,而应该使用类似:

 return IntegrationFlow.from(OrderMessageConsumer.class, gateway -> gateway.beanName("processOrderRequestListener"))

 ...
 interface OrderMessageConsumer extends Consumer<OrderMessage> {}
© www.soinside.com 2019 - 2024. All rights reserved.