Handling deleted MQTT topics with MappingJackson2MessageConverter


In a custom `IntegrationFlow`, I subscribe using `MappingJackson2MessageConverter` to generate the corresponding POJOs from JSON. So far this works very well.

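However, some topics are retained and can be deleted. The deletion arrives as a message with an empty payload (`byte[0]`), which currently causes an exception: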

ERROR org.springframework.integration.handler.LoggingHandler [MQTT Call: ...]
    ...
Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message, failedMessage=GenericMessage [payload=byte[0], headers={...}]
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
    ...
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: No content to map due to end-of-input
 at [Source: (byte[])""; line: 1, column: 0], failedMessage=GenericMessage [payload=byte[0], headers={...}]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
    ...
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (byte[])""; line: 1, column: 0]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4916)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3866)
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:221)
    ...

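Is there a way to intercept this case and handle the deleted topic, which is identified in the headers? I could easily live with getting `null` as the result. However, that is not possible in messaging because a `null` result terminates the flow, so I am looking for an alternative. Is `routeByException()` an option, or is there a better solution?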

My setup looks more or less like this:

    <T> IntegrationFlowRegistration subscriber(
            final MqttConnectionOptions options,
            final String id,
            final String topic,
            final Class<T> type,
            final Class<?> view,
            final GenericHandler<T> handler
    ) {
        final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
        final IntegrationFlowBuilder builder = IntegrationFlow
                .from(adapter)
                .transform(new PojoTransformer<>(type, view))
                .handle(handler);
        return this.flowContext.registration(builder.get()).register();
    }


    public class PojoTransformer<T> extends AbstractTransformer {
        private final Class<T> type;
        private final Class<?> view;


        public PojoTransformer(
                final Class<T> type,
                final Class<?> view
        ) {
            this.type = type;
            this.view = view;
        }


        @Override
        protected Object doTransform(final Message<?> message) {
            // final Object o = message.getPayload();
            // if (o instanceof final byte[] bytes && bytes.length == 0 || o instanceof final String s && s.isBlank()) {
            //  return null;
            // }
            return new MappingJackson2MessageConverter().fromMessage(message, this.type, this.view);
        }
    }
spring-integration mqtt spring-integration-mqtt
1 Answer

I finally came up with something basically like this:

    <T> IntegrationFlowRegistration subscriber(
            final MqttConnectionOptions options,
            final String id,
            final String topic,
            final Class<T> type,
            final Class<?> view,
            final GenericHandler<T> handler,
            final Consumer<MessageHeaders> onDeletion
    ) {
        final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
        final IntegrationFlowBuilder builder = IntegrationFlow
                .from(adapter)
                // An empty payload means the retained message was deleted.
                .route(byte[].class, payload -> payload.length == 0, spec -> spec
                        // Deletion: pass the headers to the callback and end this branch.
                        .subFlowMapping(true, flow -> flow.handle((p, h) -> {
                            onDeletion.accept(h);
                            return null;
                        }))
                        // Non-empty payloads continue in the main flow below.
                        .defaultOutputToParentFlow()
                )
                .transform(new PojoTransformer<>(type, view))
                .handle(handler);
        return this.flowContext.registration(builder.get()).register();
    }
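The empty-payload check itself can be pulled out into a small helper, so the same test covers both `byte[]` and `String` payloads, as in the commented-out lines of `PojoTransformer` in the question. This is only a minimal sketch: `EmptyPayloads` and `isEmpty` are hypothetical names that are not part of Spring Integration, and `String.isBlank()` assumes Java 11 or later.

```java
/** Hypothetical helper; the class and method names are illustrative only. */
final class EmptyPayloads {

    private EmptyPayloads() {
        // static utility, not meant to be instantiated
    }

    /**
     * Returns true when the payload carries no content: a zero-length
     * byte array or a blank String. Any other payload type counts as
     * non-empty.
     */
    static boolean isEmpty(final Object payload) {
        if (payload instanceof byte[]) {
            return ((byte[]) payload).length == 0;
        }
        if (payload instanceof String) {
            return ((String) payload).isBlank();
        }
        return false;
    }
}
```

Assuming the payload type of the router is widened to `Object.class`, a method reference such as `EmptyPayloads::isEmpty` could presumably replace the inline `payload -> payload.length == 0` lambda.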

That said, I am still interested in other, possibly more elegant solutions.
