我正在尝试创建 IntegrationFlowAdapter 类的实现,该类允许我获取推送通知,为每个设备令牌复制它,拆分推送通知列表并根据平台将其路由到正确的处理程序,然后获取结果来处理它们(记录它们或可能创建业务逻辑)。子流程处理后流程就停止了,如何继续流程?
这是我的代码:
public class PushSendFlowAdapter extends IntegrationFlowAdapter {
private final String inputChannel;
DeviceTokenManagerService<?> deviceTokenManagerService;
private final FirebaseSender firebaseSender;
private final ApnsSender apnsSender;
private final HuaweiSender huaweiSender;
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return IntegrationFlows.from(this.inputChannel)
.handle((pushNotification, headers) -> deviceTokenManagerService.clonePushNotificationForEachDeviceToken((PushNotification) pushNotification))
.split()
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerExpression(TracingConstants.PLATFORM, "payload.getDeviceToken().getPlatform()"))
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(TracingConstants.CORRELATION_ID, UUID.randomUUID()))
.route(Message.class, h -> h.getHeaders().get(TracingConstants.PLATFORM, Platform.class),
routerSpec -> routerSpec
.subFlowMapping(Platform.ANDROID, androidFlow -> androidFlow.handle(PushNotification.class, firebaseSender::sendPush))
.subFlowMapping(Platform.IOS, iosFlow -> iosFlow.handle(PushNotification.class, apnsSender::sendPush))
.subFlowMapping(Platform.HUAWEI, huaweiFlow -> huaweiFlow.handle(PushNotification.class, huaweiSender::sendPush))
)
.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(message -> message.getHeaders().get(TracingConstants.CORRELATION_ID)))
.handle((p,h) -> {
log.info("log: {}", p);
//handling logic
return null;
});
}
}
不清楚您使用的 Spring Integration 版本,但尝试将
.defaultOutputToParentFlow()
添加到该 routerSpec
配置中。
如果知道您的
sendPush
的签名,如果它确实返回了一些内容,那么应该将其归入 aggregate
。