我是新手尝试rxjs和nestjs。我目前正在努力实现的用例是出于教育目的。所以我想使用“fs”模块读取一个json文件(如果文件为空或无法读取则抛出一个可观察的错误)。现在我通过异步读取文件来创建一个observable,在主题中设置观察者,然后在控制器中订阅主题。这是我服务中的代码
@Injectable()
export class NewProviderService {
private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
// this is the variable that should be exposed. make the subject as private
// this allows the service to be the sole propertier to modify the stream and
// not the controller or components
serviceSubject$: Observable<HttpResponseModel[]>;
private serviceErrorSubject: BehaviorSubject<any>;
serviceErrorSubject$: Observable<any>;
filePath: string;
httpResponseObjectArray: HttpResponseModel[];
constructor() {
this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
this.serviceSubject$ = this.serviceSubject.asObservable();
this.serviceErrorSubject = new BehaviorSubject<any>(null);
this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();
this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
}
readFileFromJson() {
return new Promise((resolve, reject) => {
fs.exists(this.filePath.toString(), exists => {
if (exists) {
fs.readFile(this.filePath.toString(), 'utf-8' , (err, data) => {
if (err) {
logger.info('error in reading file', err);
return reject('Error in reading the file' + err.message);
}
logger.info('file read without parsing fg', data.length);
if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
// this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
// logger.info('array obj is:', this.httpResponseObjectArray);
logger.info('file read after parsing new', JSON.parse(data));
return resolve(JSON.parse(data).HttpTestResponse);
} else {
return reject(new FileExceptionHandler('no data in file'));
}
});
} else {
return reject(new FileExceptionHandler('file cannot be read at the moment'));
}
});
});
}
getData() {
from(this.readFileFromJson()).pipe(map(data => {
logger.info('data in obs', data);
this.httpResponseObjectArray = data as HttpResponseModel[];
return this.httpResponseObjectArray;
}), catchError(error => {
return Observable.throw(error);
}))
.subscribe(actualData => {
this.serviceSubject.next(actualData);
}, err => {
logger.info('err in sub', typeof err, err);
this.serviceErrorSubject.next(err);
});
}
现在这是控制器类
@Get('/getJsonData')
public async getJsonData(@Req() requestAnimationFrame,@Req() req, @Res() res) {
await this.newService.getData();
this.newService.serviceSubject$.subscribe(data => {
logger.info('data subscribed', data, _.isEmpty(data));
if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
logger.info('coming in');
res.status(HttpStatus.OK).send(data);
res.end();
}
});
}
我面临的问题是我可以第一次获取文件详细信息,订阅被调用一次>其工作正常。在随后的请求
Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
at ServerResponse.setHeader (_http_outgoing.js:470:11)
at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
at Ser
并且endpoint / getJsonData导致错误。有人可以帮助我吗?我相信第一次通话后订阅没有正常,但不知道如何结束,以及如何解决
问题是你在控制器中订阅了serviceSubject
。每次发出新值时,它都会尝试发送响应。这是第一次工作,但第二次它会告诉你它不能再发送相同的响应;请求已经处理完毕。
您可以使用可管道化的first()
运算符在第一个值之后完成Observable:
@Get('/getJsonData')
public async getJsonData() {
await this.newService.getData();
return this.newService.serviceSubject$.pipe(first())
}
您希望共享(热)Observable
,以便每个订阅者始终获得相同的最新值。这正是BehaviourSubject
所做的。因此,当您公开公开它时,不应该将Subject
转换为Observable
,因为您将失去这种期望的行为。相反,你可以将你的Subject
转换为Observable
,这样它在内部仍然是一个主题,但它不会公开next()
方法公开发布新值:
private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
return this.serviceSubject;
}
我认为尝试将冷可观察对象(我创建的对象)转换为热/热可观察对象可能有助于插入单个源并发出并完成其执行并将最后发出的数据维护到任何克隆值。所以我使用publishLast(),refCount()运算符使冷可观察到一个温暖的observable,我可以实现单一订阅和observable的执行完成。以下是我工作的变化。
这是我所做的服务类更改
getData() {
return from(this.readFileFromJson()).pipe(map(data => {
logger.info('data in obs', data);
this.httpResponseObjectArray = data as HttpResponseModel[];
return this.httpResponseObjectArray;
}), publishLast(), refCount()
, catchError(error => {
return Observable.throw(error);
}));
// .subscribe(actualData => {
// this.serviceSubject.next(actualData);
// }, err => {
// logger.info('err in sub', typeof err, err);
// this.serviceErrorSubject.next(err);
// });
}
这是我在控制器中所做的改变
public async getJsonData(@Req() req, @Res() res) {
let jsonData: HttpResponseModel[];
await this.newService.getData().subscribe(data => {
logger.info('dddd', data);
res.send(data);
});
}
任何允许观察者首先订阅主题然后在控制器中订阅该主题的答案也是受欢迎的。
我找到了关于热与冷可观测量的一篇很棒的文章,以及如何制作一个可观察的订阅单一来源并将冷转换为热/温可观察 - https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html
我建议将Promise
直接返回控制器。在这里,你不需要Observable
。对于订阅者,您还会向Promise
发出serviceSubject
的值。
async getData() {
try {
const data = await this.readFileFromJson();
this.serviceSubject.next(data as HttpResponseModel[]);
return data;
} catch (error) {
// handle error
}
}
在您的控制器中,您只需返回Promise
:
@Get('/getJsonData')
public async getJsonData() {
return this.newService.getData();
}