我正在尝试接收一些关于 RabbitMQ 队列上正在运行的作业的消息,将它们保存到数据库中,同时,将一些服务器发送的事件发送给等待的客户端以通知正在运行的作业的状态。
事件从
java.util.concurrent.SubmissionPublisher progressPublisher
发送,通过 @Controller
方法接收(其中 JobProgress
是包含状态代码和可能的消息的小记录):
@GetMapping("/{id}/progress/stream")
public Flux<ServerSentEvent<JobProgress>> subscribeToProgress(@PathVariable("id") String id) {
return Flux.merge(JdkFlowAdapter.flowPublisherToFlux(progressPublisher)
.filter(progress -> progress.id().equals(id))
.map(progress -> ServerSentEvent.<JobProgress>builder()
.id(progress.id())
.event("job-progress")
.data(progress)
.retry(Duration.ofSeconds(10))
.build()
),
Flux.interval(Duration.ofMillis(500L))
.map(sequence -> ServerSentEvent.<JobProgress>builder()
.event("keep-alive")
.comment("Keeping the connection alive to avoid abrupt closing.")
.retry(Duration.ofSeconds(1))
.build()))
);
}
发布者的配置:
@Configuration
public class PublisherConfiguration {
@Bean
public SubmissionPublisher<JobProgress> progressSink() {
return new SubmissionPublisher();
}
}
它的用法:
@Component
public class JobEventPublisher {
@Autowired
private final SubmissionPublisher<JobProgress> progressPublisher;
public void dispatchToClient(JobProgress progress) {
progressPublisher.submit(progress);
}
}
我做了一些调试和日志记录,发现事件是通过 flux 发送的(或者至少映射到 ServerSentEvent,还没有用 WireShark 检查过)。
这是接收它的代码(Typescript/React):
useEffect(() => {
const id = "some-uuid-received-on-post";
const source = new EventSource(`${baseUrl}/api/v1/job/${id}/progress/stream`, {withCredentials: true});
// The actual logic is irrelevant, this is never triggered.
source.onmessage = console.log;
return source.close;
}, [base
Url]);
网络选项卡中的事件源永远不会停止挂起,也永远不会收到第一条消息。在客户端和服务器/微服务之间有一个 Spring Boot Cloud Gateway,但我认为这不重要,因为这是标准的 HTTP,而不是 Websocket。
我也尝试过 WebSocket,但也有同样多的错误(不同的错误),而且并不真正需要双向通信,所以这就是我们想要使用的。
我的大部分谷歌搜索都死了,但是重试和保持活动消息是我试图让它工作的一些东西,但都没有。
我也有事件并不总是到达控制器方法的问题,但我认为这是不相关的?如果有比使用 SubmissionPublisher 更好的方法,请告知。我也试过 Sinks.Many,但没有成功。
如果您已经有了可行的解决方案,我会很高兴地接受评论,并提供更好的方法来做到这一点,我已经这样做太久了,真的需要关闭这张票。
我建议分别测试后端和前端。
对于后端,如果它工作正常,你应该能够直接在浏览器中打开你的 SSE url(即
http://localhost:8080/api/v1/job/123/progress/stream
)并看到出现在空白页面上的事件数据。
请注意,您可能还需要在控制器中明确指定生成的内容类型,如下所示:
@GetMapping(path = "...", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
对于前端,您可以将
EventSource
指向一些模拟 SSE 工具的 url,例如 https://sse.dev/test
并查看是否正确接收了事件。