在 Angular 中实现 SSE

问题描述 投票:0回答:2

我正在尝试在我的测试应用程序中实现 SSE。服务器端已经设置好,我只需要使用端点(api/v1/sse/document)。预期的情况是:当我进行扫描时,扫描结果应该同时出现在我的测试客户端和我的主应用程序中。问题是,只有在我刷新页面之后,结果才会出现在我的测试客户端上。我已经写了一些代码,但仍然无法让测试客户端中的事件自动更新。

这是我写的代码

sse.service.ts

[document-list.component.ts]

/**
 * ngOnInit: load the initial document list, then subscribe to the server-sent event stream.
 */
public ngOnInit(): void {
        this.getDocuments();
        this.registerServerSentEvent();
    }

    /**
     * ngOnDestroy: release the server-sent event subscription when the component is destroyed.
     */
    public ngOnDestroy(): void {
        this.closeServerSentEvent();
    }

    /**
     * getDocuments: fetch the document list from the api service and store it on the component.
     */
    public getDocuments(): void {
        const documents$ = this.aService.getDocuments();

        documents$.subscribe({
            next: (fetched: Documents[]) => {
                this.documents = fetched;
            },
        });
    }

    /**
     * markDocumentAsProcessed: ask the api service to mark the document as processed.
     * On success the document is dropped from the local list and the dialog is closed;
     * on failure the error is only logged to the console.
     */
    public markDocumentAsProcessed(document: Documents): void {
        const processedId = document.document.id;

        const onSuccess = (): void => {
            // Keep every document except the one that was just processed
            this.documents = this.documents.filter((candidate) => candidate.document.id !== processedId);
            this.closeDialog();
        };

        const onFailure = (error: unknown): void => {
            // No recovery is attempted here; the failure is just reported
            console.log("markDocumentProcessed Error:", error);
        };

        this.aService.markDocumentProcessed(processedId).subscribe({next: onSuccess, error: onFailure});
    }

    /**
     * showDetails: fetch the document image from the api service and open the detail dialog with it.
     */
    public showDetails(document: Documents): void {
        const image$ = this.aService.getDocumentImage(document.document.id);

        image$.subscribe((blob: Blob) => {
            // NOTE(review): nothing in the visible code revokes this object URL - confirm it is released elsewhere.
            const objectUrl = window.URL.createObjectURL(blob);
            const imageDataURL: SafeUrl = this.sanitizer.bypassSecurityTrustUrl(objectUrl);

            // Copy the document and attach the sanitized image url for the dialog
            this.selectedDocument = {...document, imageDataURL};
            this.displayDialog = true;
        });
    }

    /**
     * closeDialog: hide the detail dialog and clear the currently selected document.
     * Called once a document has been successfully marked as processed.
     */
    private closeDialog(): void {
        this.displayDialog = false;
        this.selectedDocument = null;
    }

    /**
     * registerServerSentEvent: subscribe to the document SSE stream and dispatch each event by its type.
     *
     * Each message is expected to carry a JSON string of the form `{type, data}`. Messages that
     * are not valid JSON are logged and skipped, and stream errors are logged to the console.
     */
    private registerServerSentEvent(): void {
        // Normalise the base url the same way the api service does, so a configured url
        // without a trailing slash still yields a valid endpoint url.
        const configuredUrl = this.aService.config.url;
        const baseUrl = configuredUrl.endsWith("/") ? configuredUrl : `${configuredUrl}/`;
        const sseUrl = `${baseUrl}api/v1/sse/document`;

        this.sseSubscription = this.sseService.getServerSentEvent(sseUrl).subscribe({
            next: (event: MessageEvent) => {
                let documentEvent: {type?: string; data?: {documentId?: string}} | null;
                try {
                    documentEvent = JSON.parse(event.data);
                } catch (error) {
                    // A single malformed payload is reported instead of throwing out of the handler
                    console.log("registerServerSentEvent: skipping malformed event:", error);
                    return;
                }

                // JSON.parse may legitimately return null; there is nothing to dispatch then
                if (!documentEvent) {
                    return;
                }

                const eventType = documentEvent.type;
                const eventData = documentEvent.data;

                switch (eventType) {
                    case "NewDocument": {
                        // Process new document event
                        break;
                    }

                    case "ViewDocument": {
                        // Process view document event
                        break;
                    }

                    case "WatchlistStatusUpdate": {
                        // Process watchlist status update event
                        break;
                    }

                    case "DocumentProcessed": {
                        // Process document processed event; skip it when the id is missing
                        const processedDocumentId = eventData ? eventData.documentId : undefined;
                        if (processedDocumentId) {
                            this.updateProcessedDocument(processedDocumentId);
                        }
                        break;
                    }
                }
            },
            error: (error) => {
                console.log("registerServerSentEvent Error:", error);
            },
        });
    }

    /**
     * updateProcessedDocument: remove the document with the given id from the local list.
     *
     * The list is replaced with a new array rather than spliced in place, matching
     * markDocumentAsProcessed, so bindings that compare the array by reference
     * (for example OnPush components or pure pipes) also observe the change.
     *
     * @param processedDocumentId id of the document reported as processed.
     */
    private updateProcessedDocument(processedDocumentId: string): void {
        const remaining = this.documents.filter((doc) => doc.document.id !== processedDocumentId);

        // Keep the existing array when the id is unknown so no needless update is triggered
        if (remaining.length !== this.documents.length) {
            this.documents = remaining;
        }
    }

    /**
     * closeServerSentEvent: unsubscribe from the SSE stream and ask the sse service to close its event source.
     * Does nothing when no subscription was registered.
     */
    private closeServerSentEvent(): void {
        if (!this.sseSubscription) {
            return;
        }

        this.sseSubscription.unsubscribe();
        this.sseService.closeEventSource();
    }
}

这是我用来获取文档等数据的服务

[a.service.ts]

/**
 * getDocuments: calls the api service to get the list of documents.
 *
 * @returns an Observable emitting the documents returned by `api/v1/document/`;
 *          http errors reach the subscriber unchanged.
 */
public getDocuments(): Observable<Documents[]> {
        // Make sure the base url ends with a slash before the endpoint path is appended
        const baseUrl = this.config.url.endsWith("/") ? this.config.url : `${this.config.url}/`;

        // No pipe is needed: values and errors are forwarded to the subscriber as-is
        return this.http.get<Documents[]>(`${baseUrl}api/v1/document/`, {
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

    /**
     * markDocumentProcessed: calls the api service to mark the document as processed.
     *
     * @param documentId id of the document to mark; sent in the request body as `DocumentId`.
     * @returns the Observable of the POST to `api/v1/document/processed`;
     *          http errors reach the subscriber unchanged.
     */
    public markDocumentProcessed(documentId: string): Observable<Documents[]> {
        // Make sure the base url ends with a slash before the endpoint path is appended
        const baseUrl = this.config.url.endsWith("/") ? this.config.url : `${this.config.url}/`;

        const requestBody = {
            DocumentId: documentId,
        };

        // No pipe is needed: values and errors are forwarded to the subscriber as-is
        return this.http.post<Documents[]>(`${baseUrl}api/v1/document/processed`, requestBody, {
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

    /**
     * getDocumentImage: calls the api service to get the document's `Photo` image as a Blob.
     *
     * @param documentId id of the document whose image is requested.
     */
    public getDocumentImage(documentId: string): Observable<Blob> {
        const base = this.config.url;
        // Append the missing trailing slash, if any, before building the endpoint url
        const normalisedBase = base.endsWith("/") ? base : base + "/";
        const imageUrl = `${normalisedBase}api/v1/document/${documentId}/image/Photo`;

        return this.http.get(imageUrl, {
            headers: {
                authorization: this.config.authorization,
            },
            responseType: "blob",
        });
    }

我在控制台中没有收到任何错误,我在端点上收到 200,但我的测试客户端没有自动接收事件,我必须刷新才能看到更改。

angular typescript events event-handling server-sent-events
2个回答
2
投票

问题在于,您把 SSE 端点当作普通的 GET 端点来使用。这样一来,只有在您主动发起请求时(在您的情况下,就是重新加载页面时)才会获得更新。

这就是您正在做的事情(取自您的 SSE 服务屏幕截图):

/**
 * Illustration of the problematic approach: the SSE endpoint is requested with a regular
 * HttpClient GET wrapped in an Observable, so no EventSource is ever opened.
 */
public getServerSentEvent(url: string): Observable<MessageEvent> {
  const token = this.aService.config.authorization;
  // WARNING - the native EventSource API has no way to set request headers, so this
  // Authorization header cannot be carried over as-is to a proper SSE connection
  const headers = new HttpHeaders({
    Authorization: token,
  });

  return new Observable<MessageEvent>((observer) => {
    // HERE - this performs a plain HTTP GET against the SSE url instead of creating an EventSource
    this.http.get(url, {headers, responseType: "text"}).subscribe({
      //...
    })
  })
}

因此,您在 SSE 端点上调用 HTTP GET,然后将其伪装成 SSE 事件。如您所见,这不起作用。

您应该打开一个新的 EventSource,这样才能真正按照 SSE 的预期方式工作:

/**
 * Creates the SSE event source and wires up its handlers.
 *
 * Any event source already held by this service instance is closed first. Every incoming
 * event is handed to `processSseEvent` inside the Angular zone so change detection runs.
 */
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }

  // Open new channel, create new EventSource - provide your own URL
  const source = new EventSource(this.yourSSEurl);
  this.eventSource = source;

  // Process default (unnamed) events
  source.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Events sent with an explicit `event:` name are not delivered to onmessage,
  // so each named event needs its own listener. Add your own EVENTS handling...
  /*
    enum SSE_EVENTS {
      NewDocument = 'NewDocument',
      //...
    }
  */
  Object.values(SSE_EVENTS).forEach((eventName) => {
    source.addEventListener(eventName, (event) => {
      this.zone.run(() => this.processSseEvent(event));
    });
  });

  // Process connection opened
  source.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  source.onerror = () => {
    this.reconnectOnError();
  };
}

请参阅我的旧答案:处理 SSE 错误重新连接。在那里你可以找到几乎可用的 SSE 服务以及你需要的一切。


0
投票
import { Injectable, NgZone } from '@angular/core';
import { Observable, ReplaySubject } from 'rxjs';
import {
  EventSourceController,
  EventSourcePlus,
  SseMessage,
} from 'event-source-plus';
import { CompanyService } from './company.service';

/**
 * Root-provided service that opens a single SSE connection through `EventSourcePlus`
 * and exposes the incoming messages as an RxJS Observable.
 */
@Injectable({
  providedIn: 'root',
})
export class SseService {
  private eventSource: EventSourcePlus | null = null;
  companyId!: string;
  // Set by connect() and reset only by close()
  private isConnected = false;
  private controller!: EventSourceController;

  constructor(private zone: NgZone, companyService: CompanyService) {
    // Track the selected company, falling back to the id stored in localStorage
    companyService.selectedCompany$.subscribe((company) => {
      this.companyId =
        company?.companyProfileId || localStorage.getItem('companyId') || '';
    });
  }

  /**
   * Connects to an SSE endpoint and provides an observable for events.
   * Any connection previously opened through this service is closed first.
   * @param url - The URL for the SSE endpoint.
   * @param headers - Optional headers for the request; they are merged over the
   *   default headers, so a caller-supplied key replaces the default of the same name.
   * @returns An Observable emitting MessageEvent data.
   */
  connect(
    url: string,
    headers?: Record<string, string>
  ): Observable<MessageEvent> {
    const subject = new ReplaySubject<MessageEvent>(1); // Buffer the last emitted event

    // Close any existing connection
    this.close();

    // Initialize EventSourcePlus with the default headers plus any custom headers provided
    this.eventSource = new EventSourcePlus(url, {
      retryStrategy: 'always',
      maxRetryCount: 4,
      method: 'get',
      headers: {
        'Content-Type': 'application/json',
        'x-company-id': `${
          this.companyId || localStorage.getItem('companyId') || ''
        }`,
        Authorization: `Bearer ${localStorage.getItem('token')}`,
        refreshToken: `Bearer ${localStorage.getItem('refresh')}`,
        ...headers,
      },
    });

    this.isConnected = true;
    this.eventSource.retryInterval = 3000;
    // Listen for messages
    this.controller = this.eventSource.listen({
      onMessage: (res: SseMessage) => {
        try {
          // The payload is forwarded untouched; no parsing happens here, subscribers do it
          const message = res.data;
          // Emit inside the Angular zone so subscribers trigger change detection
          this.zone.run(() => {
            subject.next(new MessageEvent('message', { data: message }));
          });
        } catch (error) {
          console.error('Failed to parse message:', res.data, error); // Log the raw data and error
        }
      },

      // Log request errors
      onRequestError: ({ request, error }) => {
        console.error('[Request Error]', request, error);
        this.reconnect(url, headers, subject);
      },

      // Handle response errors (e.g., server-side issues)
      onResponseError: ({ response }) => {
        console.error('[Response Error]', response.status, response.body);
        if (response.status >= 400) {
          this.reconnect(url, headers, subject);
        }
      },
    });

    return subject.asObservable();
  }

  /**
   * Aborts the current connection, if any, and resets the connection state.
   */
  close(): void {
    if (this.eventSource) {
      this.controller.abort();

      this.eventSource = null;
      this.isConnected = false;
      console.log('SSE Connection closed');
    }
  }

  /**
   * Reconnects to the SSE endpoint after a delay.
   *
   * NOTE(review): `isConnected` is set to true by connect() and only reset by close(),
   * so while a connection is open this method returns immediately. Retries during an
   * open connection are presumably left to EventSourcePlus itself - confirm this is intended.
   * @param url - The URL for the SSE endpoint.
   * @param headers - Optional headers for the request.
   * @param subject - The subject to continue emitting events.
   */
  private reconnect(
    url: string,
    headers: Record<string, string> | undefined,
    subject: ReplaySubject<MessageEvent>
  ): void {
    if (this.isConnected) return; // Prevent duplicate connections

    console.log('Reconnecting to SSE in 1 second...');
    setTimeout(() => {
      this.zone.run(() => {
        // Feed the new connection's events into the subject the original caller subscribed to
        this.connect(url, headers).subscribe(subject);
      });
    }, 1000);
  }
}

   // To implement it,

    const url = `https://example.com/v1/notification/subscribe`;

    // Subscribe to the notification stream and show each message as a toast
    this.sseSubscription = this.sseService.connect(url).subscribe({
      next: (event) => {
        let data;
        try {
          data = JSON.parse(event.data);
        } catch (err) {
          // A payload that is not valid JSON is reported instead of throwing out of the handler
          console.error('SSE Error:', err);
          return;
        }
        // TOAST SERVICE
        this.toast.success(data.data, data.timestamp);
      },
      error: (err) => {
        console.error('SSE Error:', err);
      },
    });
© www.soinside.com 2019 - 2024. All rights reserved.