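I am trying to subscribe to events from the BE through an EventSource in my application's front end. The problem seems to be in the event listener, because the request does reach the BE endpoint and the subscriber is registered.
I am using React with TypeScript on the FE, and Java 17 with Spring Boot on the BE, plus RedisCache to save and restore the subscribed users, so that we can recover the subscribers if a pod goes down or we need to deploy a feature.
This is my FE function that subscribes to BE events. It is called in the component's first useEffect: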
/**
 * This function registers and receives BE events to update data
 * similar to a webSocket, but only unidirectional from BE to FE
 * For now, just UNANSWERED_BUCKET is used, but in the future
 * can be used for other event types
 * @returns
 */
function registerUnansweredSSeEvent() {
  const sourceUnanswered = SseEventService.subscribeToEventsEventSource(
    context?.myExternalId,
    SseSubscriptionEvents.UNANSWERED_BUCKET,
  );
  sourceUnanswered.addEventListener(
    SseSubscriptionEvents.UNANSWERED_BUCKET,
    (event) => {
      fillUnansweredAfterCategorization(event);
    },
  );
  sourceUnanswered.onopen = (event: any) => logger.info("Connection opened");
  sourceUnanswered.onerror = (event: any) => logger.info("Connection error");
  return sourceUnanswered;
}
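For context, a listener registered with addEventListener for a named event only fires when the frame on the wire carries a matching `event:` field. Frames without one are dispatched as `message`. One way to check this is to capture the raw response (for example from the browser's network tab) and inspect the frames. The sketch below is a minimal, simplified frame parser for that purpose. `SseFrame` and `parseSseFrame` are hypothetical names, and it handles only the `event`, `id` and `data` fields.

```typescript
// Hypothetical debugging helper (not part of the original code): parses ONE raw
// text/event-stream frame so the event name the listener will see can be checked.
interface SseFrame {
  event: string; // "message" when the frame has no "event:" field
  id?: string;
  data: string;
}

function parseSseFrame(raw: string): SseFrame {
  let event = "message";
  let id: string | undefined;
  const dataLines: string[] = [];

  for (const line of raw.split(/\r\n|\n|\r/)) {
    // Skip blank lines and comment lines (those starting with ":").
    if (line === "" || line.charAt(0) === ":") continue;

    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    // A single leading space after the colon is not part of the value.
    if (value.charAt(0) === " ") value = value.slice(1);

    if (field === "event") event = value === "" ? "message" : value;
    else if (field === "id") id = value;
    else if (field === "data") dataLines.push(value);
  }

  // Multiple data lines are joined with a newline.
  return { event, id, data: dataLines.join("\n") };
}
```

If the parsed `event` is not exactly `UNANSWERED_BUCKET`, the listener above would not be called even though the connection is open.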
This is the function subscribeToEventsEventSource:
subscribeToEventsEventSource(
  externalId: string | undefined,
  eventToSubscribe: string,
) {
  return new EventSource(
    `${this.baseUrl}/subscribe?personalExternalId=${externalId}&eventToSubscribe=${eventToSubscribe}`,
  );
}
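A side note on the URL: because externalId is typed as `string | undefined`, the template literal sends the literal text `undefined` when the context is not loaded yet. The value is also not URL-encoded, and since the service's Javadoc describes it as an email, a character such as `+` may be decoded differently on the server. A minimal sketch of a safer builder follows. `buildSubscribeUrl` is a hypothetical helper, and rejecting a missing id is an assumption about the desired behavior.

```typescript
// Hypothetical helper (not in the original code): builds the /subscribe URL.
// Assumption: a missing externalId should fail fast instead of being sent
// to the BE as the string "undefined".
function buildSubscribeUrl(
  baseUrl: string,
  externalId: string | undefined,
  eventToSubscribe: string,
): string {
  if (!externalId) {
    throw new Error("externalId is required to subscribe");
  }
  // Encode both values so characters like "+" or "@" survive the query string.
  const query =
    `personalExternalId=${encodeURIComponent(externalId)}` +
    `&eventToSubscribe=${encodeURIComponent(eventToSubscribe)}`;
  return `${baseUrl}/subscribe?${query}`;
}
```

The result could then be passed to `new EventSource(...)` in place of the inline template literal.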
My controller:
@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribeSseEmitter(
        @RequestParam("personalExternalId") String personalExternalId,
        @RequestParam("eventToSubscribe") String eventToSubscribe) {
    return emitterService.registerClient(personalExternalId, eventToSubscribe);
}
My complete service:
@Service
public class SSEService {
    private final NewRelicLogger logger = NewRelicLogger.getLogger(SSEService.class);
    private static final AtomicInteger ID_COUNTER = new AtomicInteger(1);
    // 2hs maximum time Timeout. Should we increase this?
    public static final long DEFAULT_TIMEOUT = 7200000L;
    private final SlackNotificationService slackNotificationService;
    private final CacheService cacheService;

    public SSEService(SlackNotificationService slackNotificationService, CacheService cacheService) {
        this.slackNotificationService = slackNotificationService;
        this.cacheService = cacheService;
    }

    /**
     * @param subscriberExternalId: Email of the registered user to subscribe to notifications
     * @return This method subscribes the user to the events sent from the BE
     */
    @AutomaticLogging
    public SseEmitter registerClient(String subscriberExternalId, String eventToSubscribe) {
        var emitter = new SseEmitter(DEFAULT_TIMEOUT);
        var sseClient = new SseClient(subscriberExternalId != null ? subscriberExternalId : UUID.randomUUID().toString(), emitter);
        addOrUpdateSseClient(sseClient, SseEventType.valueOf(eventToSubscribe));
        emitter.onCompletion(() -> removeFromCache(subscriberExternalId));
        emitter.onError(err -> removeAndLogError(sseClient, err.getMessage()));
        emitter.onTimeout(() -> removeAndLogError(sseClient, "TIMEOUT"));
        logger.info("New client registered {}", sseClient.getExternalId());
        slackNotificationService.logInfo("New client registered " + sseClient.getExternalId());
        return emitter;
    }

    private void removeFromCache(String subscriberExternalId) {
        try {
            cacheService.removeCacheKey(SSE_SUBSCRIBER, subscriberExternalId);
        } catch (Exception e) {
            logger.warn("Couldn't remove cache for subscriber: " + subscriberExternalId);
        }
    }

    @AutomaticLogging
    public void unregisterClient(String subscriberExternalId) {
        removeFromCache(subscriberExternalId);
    }

    /**
     * @param newClient Here we add a new client removing the previous one
     * to not keep open connections if the user reloads the page
     */
    public void addOrUpdateSseClient(SseClient newClient, SseEventType newEventType) {
        String key = newClient.getExternalId();
        SseClient registeredClientsObject = cacheService.getSSEClientFromCache(SSE_SUBSCRIBER, key);
        newClient.getSubscribedEvents().add(newEventType);
        if (registeredClientsObject == null) {
            cacheService.putValueInCache(SSE_SUBSCRIBER, key, newClient);
        } else {
            if (!registeredClientsObject.getSubscribedEvents().contains(newEventType)) {
                List<SseEventType> events = registeredClientsObject.getSubscribedEvents();
                events.add(newEventType);
                newClient.setSubscribedEvents(events);
                cacheService.putValueInCache(SSE_SUBSCRIBER, key, newClient);
            }
        }
    }

    /**
     * @param eventType The type of the event sent to the subscribers
     * So only subscribed to this eventType receives the notifications
     * Method used to Broadcast the messages to all subscribers
     * Use this for generic updates
     */
    public void broadcastSseEmitterMessages(SseEventType eventType) {
        List<SseClient> clients = cacheService.getAllSSEClientFromCache()
                .stream()
                .filter(client -> client.getSubscribedEvents().contains(eventType))
                .toList();
        for (SseClient client : clients) {
            sendBroadcasterEmitterMessage(client, eventType);
        }
    }

    /**
     * @param client Subscriber who will receive the event
     * @param event event to notify
     */
    @AutomaticLogging
    private void sendBroadcasterEmitterMessage(SseClient client, SseEventType event) {
        var sseEmitter = client.getSseEmitter();
        try {
            logger.info("Notify client {}", client.getExternalId());
            var eventId = ID_COUNTER.incrementAndGet();
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(event.name())
                    .id(String.valueOf(eventId))
                    .data(event, MediaType.APPLICATION_JSON);
            sseEmitter.send(eventBuilder);
        } catch (IOException e) {
            sseEmitter.completeWithError(e);
        }
    }

    private void removeAndLogError(SseClient client, String error) {
        logger.error("Error during communication. Unregister client {}, error {}", client.getExternalId(), error);
        slackNotificationService.logError("Error during communication. Unregister client " + client.getExternalId() + " with error: " + error);
        removeFromCache(client.getExternalId());
    }

    public void broadcastSseEmitterMessagesChatBuckets(List<SmallChannelDTO> channels, SseEventType eventType) {
        List<SseClient> clients = cacheService.getAllSSEClientFromCache();
        for (SseClient client : clients) {
            sendBroadcastSseEmitterMessagesChatBuckets(client, channels, eventType);
        }
    }

    /**
     * @param client Subscriber who will receive the event for unanswered bucket on chat
     * @param channels the channels to update
     * @param eventType event to notify
     */
    @AutomaticLogging
    private void sendBroadcastSseEmitterMessagesChatBuckets(SseClient client, List<SmallChannelDTO> channels, SseEventType eventType) {
        var sseEmitter = client.getSseEmitter();
        try {
            logger.info("Notify client {} - sseEmitter {} - the event {}", client.getExternalId(), sseEmitter, eventType.name());
            var eventId = ID_COUNTER.incrementAndGet();
            SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name(eventType.name())
                    .id(String.valueOf(eventId))
                    .data(channels, MediaType.APPLICATION_JSON);
            sseEmitter.send(eventBuilder);
        } catch (IOException e) {
            sseEmitter.completeWithError(e);
        }
    }
}
And the code that saves and returns the cached data:
public SseClient getSSEClientFromCache(String cacheName, String key) {
    SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(cacheName);
    if (cache != null) {
        Cache.ValueWrapper valueWrapper = cache.get(key);
        if (valueWrapper != null) {
            return (SseClient) valueWrapper.get();
        }
    }
    return null;
}

public List<SseClient> getAllSSEClientFromCache() {
    SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(SSE_SUBSCRIBER);
    if (cache != null) {
        List<String> allKeys = cache.getAllKeys().stream()
                .filter(c -> c.startsWith(SSE_SUBSCRIBER))
                .map(c -> c.substring(SSE_SUBSCRIBER.length() + 1)) // we cut the key so only get the externalID
                .toList();
        List<SseClient> clients = new ArrayList<>();
        for (String key : allKeys) {
            SseClient client = this.getSSEClientFromCache(SSE_SUBSCRIBER, key);
            if (client != null) {
                clients.add(client);
            }
        }
        return clients;
    }
    return Collections.emptyList();
}

public void putValueInCache(String cacheName, Object key, Object value) {
    SpringRedisCache cache = (SpringRedisCache) cacheManager.getCache(cacheName);
    if (cache != null) {
        cache.put(key, value);
    }
}
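When I trigger an event, the BE side works, but the FE never receives it.
Any help?
I found a solution and managed to get it working!
I changed my controller to the following and it works, although it did not fully work until I added this:
cacheControl(CacheControl.noCache())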
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribeSseEmitter(
        @RequestParam("personalExternalId") String personalExternalId,
        @RequestParam("eventToSubscribe") String eventToSubscribe) {
    return ResponseEntity
            .ok()
            .header("Connection", "keep-alive")
            .cacheControl(CacheControl.noCache())
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(emitterService.registerClientToSet(personalExternalId, eventToSubscribe));
}
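But now I am facing another problem: what happens with the timeout if I want the client to reconnect? It seems the FE keeps trying to reconnect while the server reports a timeout. I think setting a default timeout, rather than just using Long.MAX_VALUE or something similar, is good practice.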
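On the reconnect question, an EventSource retries on its own after the connection drops. Inside onerror, a readyState of 0 (CONNECTING) means the browser is already retrying, while 2 (CLOSED) means it has given up and a new EventSource would have to be created. So repeated reconnect attempts after a server-side timeout may simply be the expected behavior. Below is a minimal sketch of how the FE could tell the two cases apart. `decideReconnect`, `ReconnectAction` and the backoff values are hypothetical and not part of the original code.

```typescript
// readyState values defined by the EventSource interface.
const CONNECTING = 0;
const CLOSED = 2;

// Hypothetical result type describing what the FE should do after an error.
type ReconnectAction =
  | { kind: "wait" }                      // browser is already retrying by itself
  | { kind: "recreate"; delayMs: number } // connection was closed for good
  | { kind: "none" };                     // still open, nothing to do

// Assumption: capped exponential backoff (1 s base, 30 s cap) for manual re-subscription.
function decideReconnect(
  readyState: number,
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000,
): ReconnectAction {
  if (readyState === CONNECTING) {
    return { kind: "wait" };
  }
  if (readyState === CLOSED) {
    const delayMs = Math.min(maxMs, baseMs * Math.pow(2, attempt));
    return { kind: "recreate", delayMs };
  }
  return { kind: "none" };
}
```

In the onerror handler this could be called as `decideReconnect(sourceUnanswered.readyState, attempt)`. Only for the `recreate` case would the FE close the old source and call registerUnansweredSSeEvent() again after `delayMs`.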