我正在开发一个流 API,我收到的是
'Content-Type': 'text/event-stream'
这是我的stream.service.ts
connectToSse(): Observable<any> {
return new Observable((observer) => {
this.eventSource = new EventSource(`${this.url}/getStreamingData`);
this.eventSource.onmessage = (event) => {
// code to manipulate string data to convert it into [object, object] so that once HTML component receives, it can be shown in browser immediately
observer.next(ArrayObj)
};
this.eventSource.onerror = (error) => {
observer.error(error);
this.closeConnection();
};
});
}
html.component.ts
getStreamData() {
this.sseSubscription = this._stream.connectToSse().subscribe(
(event: any) => {
// I am able to receive newly parsed array object here one by one(or maybe in pair of 2/3 sometimes depending on the chunk sent from server) as array object
this.accumulatedData$ = of(event);
},
(error: any) => {
console.error('SSE error:', error);
}
);
}
HTML 模板
<div class="json-container">
<div *ngFor="let row of accumulatedData$ | async;">
<ng-container *ngFor="let item of row | keyvalue">
<div class="">
{{item.key}} || {{item.value}}
</div>
</ng-container>
<hr />
</div>
</div>
收到所有数组对象后,我就可以在 HTML 中显示数据。但我在这里面临一个问题......
当前代码场景 -
问题 - 我想在 component.ts 文件中收到数据后立即在浏览器中显示数据,但现在我无法做到这一点。我也使用了 ngZone、DetectionStrategies 和其他可观察方法,但没有任何区别到当前的代码场景
我认为你需要一个像
scan
这样的累加器运算符(你可以查看文档hear)
这是文档中的示例
// RxJS v6+
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
const source = of(1, 2, 3);
// basic scan example, sum over time starting with zero
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
// log accumulated values
// output: 1,3,6
const subscribe = example.subscribe(val => console.log(val));
你可以添加你的xxx功能
connectToSse(): Observable<any> {
return new Observable((observer) => {
this.eventSource = new EventSource(`${this.url}/getStreamingData`);
this.eventSource.onmessage = (event) => {
// code to manipulate string data to convert it into [object, object] so that once HTML component receives, it can be shown in browser immediately
observer.next(ArrayObj)
};
this.eventSource.onerror = (error) => {
observer.error(error);
this.closeConnection();
};
})
.pipe(
scan((acc, curr) => { your implementation })
);
}