效果中可取消的NGRX服务轮询

问题描述 投票:1回答:3

如何使用停止轮询的效果轮询服务,直到返回的服务值满足条件或总持续时间超过超时阈值?

例如:后端系统正在生成资源,前端应用程序可以通过调用返回布尔值的REST api调用来检查该资源是否可用。在我的NGRX应用程序中,我想每隔200毫秒轮询一次这个api调用,直到这个api调用返回布尔值为真或者总轮询持续时间超过10000毫秒的阈值。

以下代码示例显示了轮询机制,但是,此轮询无法取消也不会超时。这是怎么做到的?

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';

@Effect()
pollEffect$: Observable<Action> = this.actions$
  .ofType(tasksActions.ActionTypes.START_POLLING)
  .switchMap(() => Observable.timer(0, 200)
    .switchMap(() => this.myBackendService.getAvailability().map(response => 
       return taskActions.ActionTypes.UpdateValue(response))
    )
  );
angular ngrx ngrx-store ngrx-effects
3个回答
2
投票

你可以尝试这样的事情:

@Effect()
pollEffect$: Observable<Action> = this.actions$
  .ofType(tasksActions.ActionTypes.START_POLLING)
  .switchMap(() => Observable.timer(0, 200)
    // stop the polling after a 10s timeout
    .takeUntil(Observable.of(true).delay(10000))
    .switchMap(() => this.myBackendService.getAvailability()
      .takeWhile(response => /* do your condition here */)
      .map(response => taskActions.ActionTypes.UpdateValue(response))
    )
  );

这样,轮询将停止: - 超时10秒后 要么 - 如果基于反应的条件为真

编辑1:

根据你的评论,我确实认为你应该这样做:

@Effect()
pollEffect$: Observable<Action> = this.actions$
  .ofType(tasksActions.ActionTypes.START_POLLING)
  .switchMap(() => Observable.timer(0, 200)
    // stop the polling after a 10s timeout
    .takeUntil(Observable.of(true).delay(10000))
    .switchMap(() => this.myBackendService.getAvailability())
    .takeWhile(response => /* do your condition here */)
    .map(response => taskActions.ActionTypes.UpdateValue(response))
  );

0
投票

您可以使用过滤器运算符根据需要过滤数据。当条件为真时,filter运算符将发出数据。

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/filter';

@Effect()
pollEffect$: Observable<Action> = this.actions$
  .ofType(tasksActions.ActionTypes.START_POLLING)
  .switchMap(() => Observable.timer(0, 200)
    .switchMap(() => this.myBackendService.getAvailability().filter(response => //condition).map(response => 
       return taskActions.ActionTypes.UpdateValue(response))
    )
  );

0
投票

有完全相同的问题,我无法从互联网上找到答案。我花了一些时间才能让它发挥作用。这就是我做的。我注意到我把'takeUntil(this.pollingUntil $)'放在哪里很重要,因为我需要用最新数据更新商店,所以我把它放在首位。如果我把它放在底部(注释行),则轮询被取消,我无法用最新数据更新商店。

private readonly pollingIntervalMs = 5000;
private readonly maxPollingMs = 600000; // 10 sec

private pollingUntil$: Subject<boolean> = new Subject<boolean>();

@Effect()
pollDb$: Observable<Action> = this.actions$.pipe(
  ofType(infraActions.InfraActionTypes.PollDb),
  switchMap(pollAction => interval(this.pollingIntervalMs).pipe(
    takeUntil(timer(this.maxPollingMs)),
    takeUntil(this.pollingUntil$),
    mapTo(pollAction),
    switchMap(
      (action: infraActions.PollDb) => this.dbService.get().pipe(
        map((dbInfo: DbInfo) => {
          if (meet condition) {
            this.pollingUntil$.next(true);
          }
          return dbInfo;
        })
      )
    ),
//    takeUntil(this.pollingUntil$),
    map((dbInfo: DbInfo) => {
      return new infraActions.PollDbSuccess(dbInfo);
    }),
    catchError(err => of(new infraActions.LoadDbFail(err)))
  )),
);
© www.soinside.com 2019 - 2024. All rights reserved.