我想重复一个返回 Promise 的 API 调用,有条件地使用 rxjs。
API 方法接收一个 id,该 id 将在每次调用时通过添加计数器前缀来更改。调用将被重复,直到数据满足某些条件或计数器达到特定数字 X。如何使用 rxjs 完成?
API方法:
fetchData(id):Promise<data>
尝试1:fetchData(id)
尝试2:fetchData(id_1)
尝试3:fetchData(id_2)
IMO,最好通过 Promises 或 RxJS 处理轮询,而不是混合使用它们。我会使用 RxJS 进行说明。
尝试以下方法
from
函数将 Promise 转换为可观察对象。timer
或 interval
等 RxJS 函数以固定间隔定期发出值。switchMap
等高阶映射运算符从外部发射映射到 API 调用。有关不同类型的高阶映射运算符的简要说明,请参阅此处。takeWhile
运算符(分别对应您的每种情况)来完成订阅。filter
运算符仅转发通过条件的排放。import { from } from 'rxjs';
fetchData(id: any): Observable<any> { // <-- return an observable
return from(apiCall); // <-- use `from` to convert Promise to Observable
}
import { timer } from 'rxjs';
import { filter, switchMap, takeWhile } from 'rxjs/operators';
timer(0, 5000).pipe( // <-- poll every 5 seconds
takeWhile((index: number) => index < 20) // <-- stop polling after 20 attempts
switchMap((index: number) =>
this.someService.apiCall(index+1) // <-- first emission from `timer` is 0
),
takeWhile( // <-- stop polling when a condition from the response is unmet
(response: any) => response.someValue !== someOtherValue,
true // <-- emit the response that failed the test
),
filter((response: any) =>
response.someValue === someOtherValue // <-- forward only emissions that pass the condition
)
).subscribe({
next: (response: any) => {
// handle response
},
error: (error: any) => {
// handle error
}
});
编辑:第二个
takeWhile
中的条件与要求相反。我调整了条件并包含了 inclusive=true
参数。感谢评论中的@Siddhant。
您可以使用 concatMap 来确保一次只尝试一个调用。
range
给出最大调用次数,因为如果满足/不满足条件,takeWhile
将提前取消订阅(在范围完成之前)。
可能看起来像这样:
// the data met some condition
function metCondition(data){
if(data/*something*/){
return true;
} else {
return false
}
}
// the counter reach to a specific number X
const x = 30;
range(0, x).pipe(
concatMap(v => fetchData(`id_${v === 0 ? '' : v}`)),
takeWhile(v => !metCondition(v))
).subscribe(datum => {
/* Do something with your data? */
});
您可以尝试重试时间:
let counter=0;
const example = of(1).pipe(
switchMap(x => of(counter)), // Replace of() with from(fetchData('id_'+counter))
map(val => {
if (val < 5) {
counter++;
// error will be picked up by retryWhen
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
// log error message
tap(val => console.log(`Response was missing something`)),
)
)
);
这并不理想,因为它需要在外部范围内有一个计数器,但在有更好的解决方案(特别是没有基于时间的重试)之前,这应该可行。
我知道您已经指定使用 rxjs,但是您还指定
fetchData()
返回 promise
而不是 observable
。在这种情况下,我建议使用 async
和 await
而不是 rxjs。
async retryFetch() {
let counter = 0;
while (counter++ < 20 && !this.data) {
this.data = await this.fetchData(counter);
}
}
您可以在条件中输入任何内容。
即使你的 api 调用返回了一个 observable,我仍然建议将其包装在一个 Promise 中并使用这个非常可读的解决方案。
下面的 stackblitz 用 Promise 包装了标准
http.get
并实现了上述功能。 Promise 将随机返回数据或未定义。
https://stackblitz.com/edit/angular-ivy-rflclt?file=src/app/app.component.ts
我改编了Mrk Sef的答案,我将其与在其他地方承诺基于分页的提取一起使用。
import {
takeWhile,
from,
map
} from "rxjs";
const fetchData = async (v) => {
console.log("fetched " + v);
return Promise.resolve(v);
};
const looper = {
*[Symbol.iterator]() {
while (true) {
yield 1;
}
}
};
function keepLooping(data) {
return data <= 5;
}
let count = 0;
from(looper)
.pipe(
map((v) => count++),
takeWhile((v) => keepLooping(count))
)
.subscribe({
next: async (v) => {
await fetchData(count);
console.log(v, count);
}
});
let count = 0
const timerId = setTimout( () =>{
if(count){
fetchData(`id_${count}`)
}else{
fetchData('id')
}
count = count + 1
,60000}) //runs every 60000 milliseconds
const stopTimer = () =>{ //call this to stop timer
clearTimeout(timerId);
}