如何使用 rxjs 有条件地重复承诺

问题描述 投票:0回答:6

我想重复一个返回 Promise 的 API 调用,有条件地使用 rxjs。

API 方法接收一个 id,该 id 将在每次调用时通过添加计数器前缀来更改。调用将被重复,直到数据满足某些条件或计数器达到特定数字 X。如何使用 rxjs 完成?

API方法:

fetchData(id):Promise<data>

尝试1:fetchData(id)

尝试2:fetchData(id_1)

尝试3:fetchData(id_2)

javascript angular promise rxjs rxjs-observables
6个回答
3
投票

IMO,最好通过 Promises 或 RxJS 处理轮询,而不是混合使用它们。我会使用 RxJS 进行说明。

尝试以下方法

  1. 使用 RxJS
    from
    函数将 Promise 转换为可观察对象。
  2. 使用
    timer
    interval
    等 RxJS 函数以固定间隔定期发出值。
  3. 使用
    switchMap
    等高阶映射运算符从外部发射映射到 API 调用。有关不同类型的高阶映射运算符的简要说明,请参阅此处
  4. 使用两个
    takeWhile
    运算符(分别对应您的每种情况)来完成订阅。
  5. 使用
    filter
    运算符仅转发通过条件的排放。
import { from } from 'rxjs';

fetchData(id: any): Observable<any> {  // <-- return an observable
  return from(apiCall);                // <-- use `from` to convert Promise to Observable
}
import { timer } from 'rxjs';
import { filter, switchMap, takeWhile } from 'rxjs/operators';

timer(0, 5000).pipe(                        // <-- poll every 5 seconds
  takeWhile((index: number) => index < 20)  // <-- stop polling after 20 attempts
  switchMap((index: number) => 
    this.someService.apiCall(index+1)       // <-- first emission from `timer` is 0
  ),
  takeWhile(                                // <-- stop polling when a condition from the response is unmet
    (response: any) => response.someValue !== someOtherValue,
    true                                    // <-- emit the response that failed the test
  ),
  filter((response: any) => 
    response.someValue === someOtherValue   // <-- forward only emissions that pass the condition
  )
).subscribe({
  next: (response: any) => {
    // handle response
  },
  error: (error: any) => {
    // handle error
  }
});

编辑:第二个

takeWhile
中的条件与要求相反。我调整了条件并包含了
inclusive=true
参数。感谢评论中的@Siddhant。


1
投票

您可以使用 concatMap 来确保一次只尝试一个调用。

range
给出最大调用次数,因为如果满足/不满足条件,
takeWhile
将提前取消订阅(在范围完成之前)。

可能看起来像这样:

// the data met some condition
function metCondition(data){
  if(data/*something*/){
    return true;
  } else {
    return false
  }
}

// the counter reach to a specific number X
const x = 30;

range(0, x).pipe(
  concatMap(v => fetchData(`id_${v === 0 ? '' : v}`)),
  takeWhile(v => !metCondition(v))
).subscribe(datum => {
  /* Do something with your data? */
});

0
投票

您可以尝试重试时间:

let counter=0;

const example = of(1).pipe(
  switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
  map(val => {
    if (val < 5) {
      counter++;
      // error will be picked up by retryWhen
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      // log error message
      tap(val => console.log(`Response was missing something`)),
    )
  )
);

这并不理想,因为它需要在外部范围内有一个计数器,但在有更好的解决方案(特别是没有基于时间的重试)之前,这应该可行。


0
投票

我知道您已经指定使用 rxjs,但是您还指定

fetchData()
返回
promise
而不是
observable
。在这种情况下,我建议使用
async
await
而不是 rxjs。

  async retryFetch() {
    let counter = 0;
    while (counter++ < 20 && !this.data) {
      this.data = await this.fetchData(counter);
    }
  }

您可以在条件中输入任何内容。

即使你的 api 调用返回了一个 observable,我仍然建议将其包装在一个 Promise 中并使用这个非常可读的解决方案。

下面的 stackblitz 用 Promise 包装了标准

http.get
并实现了上述功能。 Promise 将随机返回数据或未定义。

https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts


0
投票

我改编了Mrk Sef的答案,我将其与在其他地方承诺基于分页的提取一起使用。

 import {
  takeWhile,
  from,
  map
} from "rxjs";

const fetchData = async (v) => {
  console.log("fetched " + v);
  return Promise.resolve(v);
};

const looper = {
  *[Symbol.iterator]() {
    while (true) {
      yield 1;
    }
  }
};

function keepLooping(data) {
  return data <= 5;
}

let count = 0;

from(looper)
  .pipe(
    map((v) => count++),
    takeWhile((v) => keepLooping(count))
  )
  .subscribe({
    next: async (v) => {
      await fetchData(count);
      console.log(v, count);
    }
  });

-3
投票
let count = 0
const timerId = setTimout( () =>{
   if(count){
      fetchData(`id_${count}`)
   }else{
      fetchData('id')
   }
   count = count + 1
,60000}) //runs every 60000 milliseconds

const stopTimer = () =>{ //call this to stop timer
    clearTimeout(timerId);
}
© www.soinside.com 2019 - 2024. All rights reserved.