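We have a spring-cloud-stream reactive function that processes message events. The flux uses flatMap to make an external HTTP call with WebClient. Every time the WebClient call fails, the MessageHandler of the FluxMessageChannel unsubscribes from the channel; after that, we can no longer send any events to it without getting an error saying the message channel has 0 subscribers. How can we avoid our handler being unsubscribed?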
Here is my code:
...
private final WebClient profileServiceClient;

// Looks up the profile for the participant carried in the incoming message.
private Function<Message<MessageParticipant>, Mono<ProfileSchema>> getProfile = msg -> {
    final String profileId = msg.getPayload().getProfileId();
    return profileServiceClient.get()
            .uri(uriBuilder -> uriBuilder.path("/profiles/{id}").build(profileId))
            .retrieve()
            .bodyToMono(ProfileSchema.class);
};

// Reactive function bound by spring-cloud-stream: one profile lookup per incoming message.
@Bean
public Function<Flux<Message<MessageParticipant>>, Flux<Message<ProfileSchema>>> gatherProfile(ProfileService profileService) {
    return messages -> {
        return messages.log(log.getName(), Level.FINEST)
                .flatMap(getProfile)
                .map(p -> MessageBuilder.withPayload(p).build());
    };
}
Here is the error message:
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-5.5.12.jar:5.5.12]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.12.jar:5.5.12]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.12.jar:5.5.12]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.20.jar:5.3.20]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.12.jar:5.5.12]
at de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter.access$400(SqsInboundChannelAdapter.java:33) ~[spring-cloud-stream-binder-sqs-1.9.0.jar:na]
at de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsInboundChannelAdapter.java:162) ~[spring-cloud-stream-binder-sqs-1.9.0.jar:na]
at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:458) ~[spring-messaging-5.3.20.jar:5.3.20]
at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:222) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer$MessageGroupExecutor.run(SimpleMessageListenerContainer.java:426) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
**Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers**
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.5.12.jar:5.5.12]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.12.jar:5.5.12]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.12.jar:5.5.12]
... 16 common frames omitted
We tried onStatus, but without success:
private Function<Message<MessageParticipant>, Mono<ProfileSchema>> getProfile = msg -> {
    final String profileId = msg.getPayload().getProfileId();
    return profileServiceClient.get()
            .uri(uriBuilder -> uriBuilder.path("/profiles/{id}").build(profileId))
            .retrieve()
            .onStatus(HttpStatus::isError, clientResponse -> {
                log.error("Error while getting profile {} with status code {}", profileId, clientResponse.statusCode());
                throw new ServiceException("Unable to retrieve profile for profile id : " + profileId);
            })
            .bodyToMono(ProfileSchema.class);
};
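
To narrow this down, here is a minimal sketch of the behavior as I understand it, using plain Reactor with no Spring (it only needs reactor-core on the classpath). The class name InnerErrorHandlingSketch and the lookup method are hypothetical stand-ins for our WebClient call, not our real code. It contrasts an error that escapes flatMap, which terminates the outer Flux, with an error handled on the inner Mono via onErrorResume, which seems to keep the outer Flux alive:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class InnerErrorHandlingSketch {

    // Hypothetical stand-in for the WebClient call: fails for the id "bad".
    static Mono<String> lookup(String profileId) {
        if ("bad".equals(profileId)) {
            return Mono.error(new IllegalStateException("lookup failed for " + profileId));
        }
        return Mono.just("profile-" + profileId);
    }

    // The error escapes flatMap, so the outer Flux terminates at the first failure
    // and later elements are never processed.
    static List<String> withoutRecovery(List<String> ids) {
        List<String> seen = new ArrayList<>();
        Flux.fromIterable(ids)
                .flatMap(InnerErrorHandlingSketch::lookup)
                .subscribe(seen::add, error -> seen.add("TERMINATED"));
        return seen;
    }

    // The error is handled on the inner Mono, so only the failed element is dropped
    // and the outer Flux keeps receiving the remaining elements.
    static List<String> withRecovery(List<String> ids) {
        List<String> seen = new ArrayList<>();
        Flux.fromIterable(ids)
                .flatMap(id -> lookup(id).onErrorResume(error -> Mono.empty()))
                .subscribe(seen::add, error -> seen.add("TERMINATED"));
        return seen;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("a", "bad", "b");
        System.out.println(withoutRecovery(ids));
        System.out.println(withRecovery(ids));
    }
}
```

If the same reasoning applies to our function, the equivalent change would be something like .flatMap(msg -> getProfile.apply(msg).onErrorResume(e -> Mono.empty())), at the cost of silently dropping the failed message unless it is logged or routed somewhere first.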