我想从EventSource(服务器发送事件)创建一个RxJs Observable。
我尝试了以下方法:
import {Component, OnInit} from 'angular2/core';
import {Subject, Observable} from 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
someStrings:string[] = [];
ngOnInit() {
let eventSource = new EventSource('/interval-sse-observable');
let observable = Observable.create(eventSource);
observable.subscribe({
next: aString => this.someStrings.push(aString.data),
error: err => console.error('something wrong occurred: ' + err)
});
}
}
但是我遇到以下异常:
EXCEPTION: Error: Uncaught (in promise): EXCEPTION: TypeError: this._subscribe is not a function in [null]
ORIGINAL EXCEPTION: TypeError: this._subscribe is not a function
ORIGINAL STACKTRACE:
TypeError: this._subscribe is not a function
at Observable.subscribe (https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js:11210:29)
at AppComponent.ngOnInit (http://localhost:8080/scripts/app.component.ts!transpiled:30:28)
at AbstractChangeDetector.ChangeDetector_HostAppComponent_0.detectChangesInRecordsInternal (viewFactory_HostAppComponent:21:99)
at AbstractChangeDetector.detectChangesInRecords (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9689:14)
at AbstractChangeDetector.runDetectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9672:12)
at AbstractChangeDetector.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9661:12)
at ChangeDetectorRef_.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:5280:16)
at https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13048:27
at Array.forEach (native)
at ApplicationRef_.tick (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13047:34)
为了完整起见,这是我的index.html的内容:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>sse demo</title>
<!-- 1. Load libraries -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/angular2-polyfills.js"></script>
<script src="https://code.angularjs.org/tools/system.js"></script>
<script src="https://code.angularjs.org/tools/typescript.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js"></script>
<script src="https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/http.dev.js"></script>
<!-- 2. Configure SystemJS -->
<script>
System.config({
transpiler: 'typescript',
typescriptOptions: { emitDecoratorMetadata: true }
});
System.import('./scripts/app.ts')
.then(null, console.error.bind(console));
</script>
</head>
<body>
<my-app>Loading...</my-app>
</body>
</html>
有人可以帮忙吗?
编辑1:按照Yurzui的建议,我修改了我的代码如下:
ngOnInit() {
let observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(console.log(x));
eventSource.onerror = x => observer.error(console.log('EventSource failed'));
return () => {
eventSource.close();
};
});
observable.subscribe({
next: aString => this.someStrings.push(aString.data),
error: err => console.error('something wrong occurred: ' + err)
});
}
它确实在控制台中记录了第一条消息,如下所示:
MessageEvent {isTrusted: true, data: "c374a15b-b37d-498e-8ab0-49643b79c1bb", origin: "http://localhost:8080", lastEventId: "", source: null…}bubbles: falsecancelBubble: falsecancelable: falsecurrentTarget: EventSourcedata: "c374a15b-b37d-498e-8ab0-49643b79c1bb"defaultPrevented: falseeventPhase: 0isTrusted: trueisTrusted: truelastEventId: ""origin: "http://localhost:8080"path: Array[0]ports: nullreturnValue: truesource: nullsrcElement: EventSourcetarget: EventSourcetimeStamp: 6257.125type: "message"__proto__: MessageEvent
Rx.js:10982 Uncaught TypeError: Cannot read property 'data' of undefinedSystem.register.exports_1.execute.AppComponent.ngOnInit.observable.subscribe.next @ app.component.ts:29SafeSubscriber.__tryOrUnsub @ Rx.js:10979SafeSubscriber.next @ Rx.js:10934Subscriber._next @ Rx.js:10894Subscriber.next @ Rx.js:10871System.register.exports_1.execute.AppComponent.ngOnInit.Rx_1.Observable.create.eventSource.onmessage @ app.component.ts:21
现在,如果不是在控制台中记录
x
变量,我只需将其传递给下一个方法,如下所示:
eventSource.onmessage = x => observer.next(x);
服务器发送的事件由客户端检索(我在 chrome 开发工具中看到它们),但模板中没有显示任何内容,表明字符串数组未填充...
顺便说一下,我必须删除
JSON.parse(x.data)
,因为它导致了错误。
您可以使用以下代码为EventSource流手动创建Observable:
export class AppComponent implements OnInit {
someStrings:string[] = [];
constructor(private zone: NgZone) {}
ngOnInit(){
const observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(x.data);
eventSource.onerror = x => observer.error(x);
return () => {
eventSource.close();
};
});
this.subscription = observable.subscribe({
next: guid => {
this.zone.run(() => this.someStrings.push(guid));
},
error: err => console.error('something wrong occurred: ' + err)
});
}
}
// somewhere
// this.subscription.unsubscribe()
不要忘记导入 NgZone 类:
import {Component, OnInit, NgZone} from '@angular/core';
我应该完成 Yurzui 的回答:
就我而言,在使用 Angular 6 将函数分配给
onmessage
时,我遇到了一些奇怪的行为。因此,我添加了一个事件侦听器,它的工作方式就像一个魅力:
const observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.addEventListener("message", (event: MessageEvent) => observer.next(event.data));
eventSource.addEventListener("error", (event: MessageEvent) => observer.error(event));
return () => {
eventSource.close();
};
});
这会将 EventSource 转换为可观察对象:
new Observable((observer) => {
const eventSource = new EventSource(`sseEndpoint`);
eventSource.onerror = (error) => {
observer.error(error);
eventSource.close();
};
eventSource.onmessage = ({ data }) => {
observer.next(data);
};
return eventSource.close();
});