订阅具有异步功能的observable是否安全

问题描述 投票:0回答:2

我有一个事件发射器,以50Hz发送事件。我想用异步方法订阅这个发射器。代码如下所示:

this.emitter = fromEventPattern(this.addHandler, this.removeHandler, (err, char) => [err, char]);
this.rxSubscription = this.emitter.subscribe(this.handleUpdatedValuesComingFromSensor);

       handleUpdatedValuesComingFromSensor = async (arr: any[]): Promise<void> => {
   ...
   await someMethodAsync();
   ...
}

我可能错了,但我的印象是等待在那里使发射器立即调用onNext(),因为我已退出该方法。

由于事件发生率,使用控制台调用很难调试。

我是对还是错?

谢谢你的帮助。

编辑1:

我正在使用打字机目标ES2015,因此为异步/等待生成状态机。

如果我是对的,我怎样才能确保呼叫不重叠?我需要计算收到的值的平均值。

typescript async-await rxjs
2个回答
0
投票

在那里等待使发射器立即调用onNext(),因为我已退出该方法

你是对的。 Rx忽略了它的订阅函数的返回类型,因此它忽略了async函数在遇到它的第一个await时返回的promise。这意味着:

  1. 只要另一个项目到达observable,Rx将再次调用您的订阅功能。它忽略了返回的promise,因此它不知道旧的调用仍在进行中。
  2. 你的async函数的例外将被忽略,因为承诺被忽略了。一些promise库有一个全局的“unobserved promise error”事件可以处理这个问题。

0
投票

我不太清楚你的顾虑是什么。它是有效的,您的方法将为每个元素调用一次。它不会跳过任何东西或者在中途切断你的方法但是:

  1. 它不会等到handleUpdatedValuesComingFromSensor的一次迭代在开始另一次之前完成(假设handleUpdatedValuesComingFromSensor真的做了异步的事情)所以你可以在飞行中同时有多个handleUpdatedValuesComingFromSensor实例。
  2. 同样,如果您有多个订阅者,那么在将事件发送给下一个订阅者之前,它不会等待handleUpdatedValuesComingFromSensor完成。
© www.soinside.com 2019 - 2024. All rights reserved.