在 Spring 项目中,我使用
Sinks
将事件发送到 SSE 端点,效果很好,检查:
但是当我尝试使用 Smallrye Mutiny
MultiEmitterProcessor
来实现相同的目的时,它失败了。
示例项目是https://github.com/hantsy/quarkus-sandbox/tree/master/jms
MultiEmitterProcessor<Message> emitterProcessor = MultiEmitterProcessor.create();
void receive() {
var consumer = jmsContext.createConsumer(helloQueue);
consumer.setMessageListener(
msg -> {
try {
var received = jsonb.fromJson(msg.getBody(String.class), Message.class);
LOGGER.log(Level.INFO, "consuming message: {0}", received);
emitterProcessor.emit(received);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
);
}
在资源类中,
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> stream() {
// see: https://github.com/quarkusio/quarkus/issues/35220
return handler.emitterProcessor.toMulti().toHotStream();
}
我不确定
MultiEmitterProcessor
当热点流好不好?
或者有类似 Reactor
ConnectableFlux
的东西可以手动连接到热流?
最后,我从 Resource 类中移动以下代码
emitterProcessor.toMulti()
进入处理程序类中的方法,然后它就可以工作了。
public Multi<Message> stream() {
return processor.toMulti().broadcast().toAllSubscribers();
}
检查最终的工作示例:https://github.com/hantsy/quarkus-sandbox/tree/master/jms