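In a custom IntegrationFlow, I subscribe using MappingJackson2MessageConverter to produce the corresponding POJOs from JSON. So far this works very well.
However, some topics are retained and can be deleted. Currently, such a deletion results in an exception: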
ERROR org.springframework.integration.handler.LoggingHandler [MQTT Call: ...]
...
Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message, failedMessage=GenericMessage [payload=byte[0], headers={...}]
at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
...
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: No content to map due to end-of-input
at [Source: (byte[])""; line: 1, column: 0], failedMessage=GenericMessage [payload=byte[0], headers={...}]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
...
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
at [Source: (byte[])""; line: 1, column: 0]
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4916)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3866)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:221)
...
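Is there a way to intercept this case and handle the deleted topic from the headers? I could easily live with getting null as the result. However, since that is not possible in messaging because it would terminate the flow, I am looking for an alternative. Is routeByException() an option, or is there a better solution?
My setup looks more or less like this: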
<T> IntegrationFlowRegistration subscriber(
        final MqttConnectionOptions options,
        final String id,
        final String topic,
        final Class<T> type,
        final Class<?> view,
        final GenericHandler<T> handler
) {
    final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
    final IntegrationFlowBuilder builder = IntegrationFlow
            .from(adapter)
            .transform(new PojoTransformer<>(type, view))
            .handle(handler);
    return this.flowContext.registration(builder.get()).register();
}

public class PojoTransformer<T> extends AbstractTransformer {

    private final Class<T> type;
    private final Class<?> view;

    public PojoTransformer(
            final Class<T> type,
            final Class<?> view
    ) {
        this.type = type;
        this.view = view;
    }

    @Override
    protected Object doTransform(final Message<?> message) {
        // final Object o = message.getPayload();
        // if (o instanceof final byte[] bytes && bytes.length == 0 || o instanceof final String s && s.isBlank()) {
        //     return null;
        // }
        return new MappingJackson2MessageConverter().fromMessage(message, this.type, this.view);
    }
}
In the end, I came up with something that basically looks like this:
<T> IntegrationFlowRegistration subscriber(
        final MqttConnectionOptions options,
        final String id,
        final String topic,
        final Class<T> type,
        final Class<?> view,
        final GenericHandler<T> handler,
        final Consumer<MessageHeaders> onDeletion
) {
    final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
    final IntegrationFlowBuilder builder = IntegrationFlow
            .from(adapter)
            .route(byte[].class, payload -> payload.length == 0, spec -> spec
                    .subFlowMapping(true, flow -> flow.handle((p, h) -> {
                        onDeletion.accept(h);
                        return null;
                    }))
                    .defaultOutputToParentFlow()
            )
            .transform(new PojoTransformer<>(type, view))
            .handle(handler);
    return this.flowContext.registration(builder.get()).register();
}
Nevertheless, I am still interested in other, possibly more elegant solutions.
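
One small refinement, offered only as a sketch: the empty-payload test that drives the route could be pulled out into a plain predicate that also covers blank String payloads, as the commented-out check in PojoTransformer does. The names EmptyPayloads and isEmpty are hypothetical and not part of Spring Integration, and the sketch assumes Java 16 or later for pattern matching in instanceof.

```java
// Hypothetical helper, not part of Spring Integration.
// Assumption: a deleted retained topic shows up as a zero-length byte[]
// (as in the stack trace above) or, depending on the converter, a blank String.
final class EmptyPayloads {

    private EmptyPayloads() {
        // static utility, no instances
    }

    // Returns true for a zero-length byte[] or a blank String,
    // and false for every other payload, including null.
    static boolean isEmpty(final Object payload) {
        if (payload instanceof byte[] bytes) {
            return bytes.length == 0;
        }
        if (payload instanceof String s) {
            return s.isBlank();
        }
        return false;
    }
}
```

If this holds up, the lambda payload -> payload.length == 0 in the route could presumably be replaced by the method reference EmptyPayloads::isEmpty, which keeps the deletion rule in one place and makes it testable without a broker.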