我正在尝试在我的测试应用程序中实现 SSE。服务器端已设置,我只是使用端点(api/v1/sse/document)。情况是,当我进行扫描时,扫描结果应该出现在我的测试客户端以及我的主应用程序中。问题是,当我刷新页面时,它只出现在我的测试客户端上。我一直在编写一些代码,但我仍然能够自动更新测试客户端中的事件。
这是我写的代码
[文档列表.component.ts]
public ngOnInit(): void {
this.getDocuments();
this.registerServerSentEvent();
}
public ngOnDestroy(): void {
this.closeServerSentEvent();
}
/**
* getDocuments: call the api service to get the list of documents.
*/
public getDocuments(): void {
this.aService.getDocuments().subscribe((documents: Documents[]) => {
this.documents = documents;
});
}
/**
* markDocumentAsProcessed: call the api service to mark the document as processed.
*/
public markDocumentAsProcessed(document: Documents): void {
this.aService.markDocumentProcessed(document.document.id).subscribe({
next: () => {
// Remove the processed document from the list
this.documents = this.documents.filter((doc) => doc.document.id !== document.document.id);
this.closeDialog();
},
error: (error) => {
console.log("markDocumentProcessed Error:", error);
// Handle the error here
},
});
}
/**
* showDetails: call the api service to get the document image and display it in a dialog.
*/
public showDetails(document: Documents): void {
this.aService.getDocumentImage(document.document.id).subscribe((image: Blob) => {
const url = window.URL.createObjectURL(image);
const safeUrl: SafeUrl = this.sanitizer.bypassSecurityTrustUrl(url);
this.selectedDocument = {...document, imageDataURL: safeUrl};
this.displayDialog = true;
});
}
/**
* closeDialog: close the dialog after pressing the process button.
*/
private closeDialog(): void {
this.displayDialog = false;
this.selectedDocument = null;
}
private registerServerSentEvent(): void {
const sseUrl = `${this.aService.config.url}api/v1/sse/document`;
this.sseSubscription = this.sseService.getServerSentEvent(sseUrl).subscribe((event: MessageEvent) => {
const documentEvent = JSON.parse(event.data);
const eventType = documentEvent.type;
const eventData = documentEvent.data;
switch (eventType) {
case "NewDocument": {
// Process new document event
break;
}
case "ViewDocument": {
// Process view document event
break;
}
case "WatchlistStatusUpdate": {
// Process watchlist status update event
break;
}
case "DocumentProcessed": {
// Process document processed event
const processedDocumentId = eventData.documentId;
this.updateProcessedDocument(processedDocumentId);
break;
}
}
});
}
private updateProcessedDocument(processedDocumentId: string): void {
// Find the processed document in the documents list
const processedDocumentIndex = this.documents.findIndex((doc) => doc.document.id === processedDocumentId);
if (processedDocumentIndex !== -1) {
// Remove the processed document from the list
this.documents.splice(processedDocumentIndex, 1);
// Update any other UI-related logic or perform additional actions as needed
}
}
private closeServerSentEvent(): void {
if (this.sseSubscription) {
this.sseSubscription.unsubscribe();
this.sseService.closeEventSource();
}
}
}
这是我获取文件等的服务
[a.service.ts]
public getDocuments(): Observable<Documents[]> {
let url = this.config.url;
if (!url.endsWith("/")) {
url += "/";
}
return this.http
.get<Documents[]>(`${url}api/v1/document/`, {
headers: {
authorization: this.config.authorization,
},
})
.pipe(
map((data) => {
return data;
}),
catchError((error) => {
throw error;
})
);
}
/**
* markDocumentProcessed: calls the api service to mark the document as processed.
*
*/
public markDocumentProcessed(documentId: string): Observable<Documents[]> {
let url = this.config.url;
if (!url.endsWith("/")) {
url += "/";
}
const requestBody = {
DocumentId: documentId,
};
return this.http
.post<Documents[]>(`${url}api/v1/document/processed`, requestBody, {
headers: {
authorization: this.config.authorization,
},
})
.pipe(
map((data) => {
return data;
}),
catchError((error) => {
throw error;
})
);
}
/**
* getDocumentImage: calls the api service to get the document image.
*
*/
public getDocumentImage(documentId: string): Observable<Blob> {
let url = this.config.url;
if (!url.endsWith("/")) {
url += "/";
}
return this.http.get(`${url}api/v1/document/${documentId}/image/Photo`, {
responseType: "blob",
headers: {
authorization: this.config.authorization,
},
});
}
我在控制台中没有收到任何错误,我在端点上收到 200,但我的测试客户端没有自动接收事件,我必须刷新才能看到更改。
问题在于 您正在使用 SSE 端点作为常规 GET 端点。当然,只有在您请求时,您才会获得更新,在您的情况下,通过重新加载页面。
这就是您正在做的事情(取自您的 SSE 服务屏幕截图):
public getServerSentEvent(url: string): Observable<MessageEvent> {
const token = this.aService.config.authorization;
// WARNING - SSE doesn't have an API for Headers, this won't work if used as proper SSE
const headers = new HttpHeaders({
Authorization: token,
});
return new Observable<MessageEvent>((observer) => {
// HERE - you are not creating the SSE Event Source, just getting from it
this.http.get(url, {headers, responseType: "text"}).subscribe({
//...
})
})
}
因此,您在 SSE 端点上调用 HTTP GET,然后将其伪装成 SSE 事件。如您所见,这不起作用。
您应该打开一个新的 EventSource,这实际上将按照 SSE 的预期工作:
// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
// Close event source if current instance of SSE service has some
if (this.eventSource) {
this.closeSseConnection();
this.eventSource = null;
}
// Open new channel, create new EventSource - provide your own URL
this.eventSource = new EventSource(this.yourSSEurl);
// Process default event
this.eventSource.onmessage = (event: MessageEvent) => {
this.zone.run(() => this.processSseEvent(event));
};
// Add your own EVENTS handling...
/*
enum SSE_EVENTS [
NewDocument: 'NewDocument',
//...
]
*/
Object.keys(SSE_EVENTS).forEach(key => {
this.eventSource.addEventListener(SSE_EVENTS[key], event => {
this.zone.run(() => this.processSseEvent(event));
});
});
// Process connection opened
this.eventSource.onopen = () => {
this.reconnectFrequencySec = 1;
};
// Process error
this.eventSource.onerror = (error: any) => {
this.reconnectOnError();
};
}
请参阅我的旧答案:处理 SSE 错误重新连接。在那里你可以找到几乎可用的 SSE 服务以及你需要的一切。
import { Injectable, NgZone } from '@angular/core';
import { Observable, ReplaySubject } from 'rxjs';
import {
EventSourceController,
EventSourcePlus,
SseMessage,
} from 'event-source-plus';
import { CompanyService } from './company.service';
@Injectable({
providedIn: 'root',
})
export class SseService {
private eventSource: EventSourcePlus | null = null;
companyId!: string;
private isConnected = false;
private controller!: EventSourceController;
constructor(private zone: NgZone, companyService: CompanyService) {
companyService.selectedCompany$.subscribe((company) => {
this.companyId =
company?.companyProfileId || localStorage.getItem('companyId') || '';
});
}
/**
* Connects to an SSE endpoint and provides an observable for events.
* @param url - The URL for the SSE endpoint.
* @param headers - Optional headers for the request.
* @returns An Observable emitting MessageEvent data.
*/
connect(
url: string,
headers?: Record<string, string>
): Observable<MessageEvent> {
const subject = new ReplaySubject<MessageEvent>(1); // Buffer the last emitted event
// Close any existing connection
this.close();
// Initialize EventSourcePlus with custom headers if provided
this.eventSource = new EventSourcePlus(url, {
retryStrategy: 'always',
maxRetryCount: 4,
method: 'get',
headers: {
'Content-Type': 'application/json',
'x-company-id': `${
this.companyId || localStorage.getItem('companyId') || ''
}`,
Authorization: `Bearer ${localStorage.getItem('token')}`,
refreshToken: `Bearer ${localStorage.getItem('refresh')}`,
},
});
this.isConnected = true;
this.eventSource.retryInterval = 3000;
// Listen for messages
this.controller = this.eventSource.listen({
onMessage: (res: SseMessage) => {
try {
const message = res.data; // Attempt to parse JSON
this.zone.run(() => {
subject.next(new MessageEvent('message', { data: message }));
});
} catch (error) {
console.error('Failed to parse message:', res.data, error); // Log the raw data and error
}
},
// Log request errors
onRequestError: ({ request, error }) => {
console.error('[Request Error]', request, error);
this.reconnect(url, headers, subject);
},
// Handle response errors (e.g., server-side issues)
onResponseError: ({ response }) => {
console.error('[Response Error]', response.status, response.body);
if (response.status >= 400) {
this.reconnect(url, headers, subject);
}
},
});
return subject.asObservable();
}
close(): void {
if (this.eventSource) {
this.controller.abort();
this.eventSource = null;
this.isConnected = false;
console.log('SSE Connection closed');
}
}
/**
* Reconnects to the SSE endpoint after a delay.
* @param url - The URL for the SSE endpoint.
* @param headers - Optional headers for the request.
* @param subject - The subject to continue emitting events.
*/
private reconnect(
url: string,
headers: Record<string, string> | undefined,
subject: ReplaySubject<MessageEvent>
): void {
if (this.isConnected) return; // Prevent duplicate connections
console.log('Reconnecting to SSE in 1 second...');
setTimeout(() => {
this.zone.run(() => {
this.connect(url, headers).subscribe(subject);
});
}, 1000);
}
}
// To implement it,
const url = `https://example.com/v1/notification/subscribe`;
this.sseSubscription = this.sseService.connect(url).subscribe({
next: (event) => {
const data = JSON.parse(event.data);
// TOAST SERVICE
this.toast.success(data.data, data.timestamp);
},
error: (err) => {
console.error('SSE Error:', err);
},
});`