我正在尝试接收一些关于 RabbitMQ 队列上正在运行的作业的消息,将它们保存到数据库中,同时,将一些服务器发送的事件发送给等待的客户端以通知正在运行的作业的状态。
事件从
java.util.concurrent.SubmissionPublisher progressPublisher
发送,通过 @Controller
方法接收(其中 JobProgress
是包含状态代码和可能的消息的小记录):
@GetMapping("/{id}/progress/stream")
public Flux<ServerSentEvent<JobProgress>> subscribeToProgress(@PathVariable("id") String id) {
return Flux.merge(JdkFlowAdapter.flowPublisherToFlux(progressPublisher)
.filter(progress -> progress.id().equals(id))
.map(progress -> ServerSentEvent.<JobProgress>builder()
.id(progress.id())
.event("job-progress")
.data(progress)
.retry(Duration.ofSeconds(10))
.build()
),
Flux.interval(Duration.ofMillis(500L))
.map(sequence -> ServerSentEvent.<JobProgress>builder()
.event("keep-alive")
.comment("Keeping the connection alive to avoid abrupt closing.")
.retry(Duration.ofSeconds(1))
.build()))
);
}
发布者的配置:
@Configuration
public class PublisherConfiguration {
@Bean
public SubmissionPublisher<JobProgress> progressSink() {
return new SubmissionPublisher();
}
}
它的用法:
@Component
public class JobEventPublisher {
@Autowired
private final SubmissionPublisher<JobProgress> progressPublisher;
public void dispatchToClient(JobProgress progress) {
progressPublisher.submit(progress);
}
}
我做了一些调试和日志记录,发现事件是通过 flux 发送的(或者至少映射到 ServerSentEvent,还没有用 WireShark 检查过)。
这是接收它的代码(Typescript/React):
useEffect(() => {
const id = "some-uuid-received-on-post";
const source = new EventSource(`${baseUrl}/api/v1/job/${id}/progress/stream`, {withCredentials: true});
// The actual logic is irrelevant, this is never triggered.
source.onmessage = console.log;
return source.close;
}, [baseUrl]);
网络选项卡中的事件源永远不会停止挂起,也永远不会收到第一条消息。在客户端和服务器/微服务之间有一个 Spring Boot Cloud Gateway,但我认为这不重要,因为这是标准的 HTTP,而不是 Websocket。
我也尝试过 WebSocket,但也有同样多的错误(不同的错误),而且并不真正需要双向通信,所以这就是我们想要使用的。
我的大部分谷歌搜索都死了,但是重试和保持活动消息是我试图让它工作的一些东西,但都没有。
我也有事件并不总是到达控制器方法的问题,但我认为这是不相关的?如果有比使用 SubmissionPublisher 更好的方法,请告知。我也试过 Sinks.Many,但没有成功。
如果您已经有了可行的解决方案,我会很高兴地接受评论,并提供更好的方法来做到这一点,我已经这样做太久了,真的需要关闭这张票。