如何使用'merge'来限制可观察列表的并发性,但只有在所有可观察对象完成后才返回?

问题描述 投票:1回答:2

问题:

我有一个网址列表。我有一个Observable方法,它使用url来获取文件,下载并在本地存储它。我想并行启动这些请求,但一次只允许4个线程(我生成pdfs服务器端并希望减少负载)。此外,只有在下载了所有URL位置后,我才需要从此下载步骤返回。

当前解决方案

现在,我只是一次性启动请求并使用forkJoin。在搜索了几天后,我在这里遇到了一些解决方案,它们给了我一些想法,但它们并没有完全符合我的要求。我的主要来源是here

export function limitedParallelObservableExecution<T>(listOfItems: Array<T>, observableMethod: (item: T) => Observable<any>): Observable<any> {
  const MAX_CONCURRENCY = 4;
  if (listOfItems && listOfItems.length > 0) {
    let observableListOfItems: Observable<Observable<any>> = Observable.from(listOfItems).map(
      (item: T) => observableMethod(item)
    );
    return observableListOfItems.merge(MAX_CONCURRENCY);
  } else {
    return Observable.of({});
  }
}

我有另一个下载步骤是flatMapped,一旦完成此步骤就执行。但是,不是只执行一次,下一步就会对列表中的每个url执行一次(据我所知,这是因为它为每个完成的url发出一次)。

如何在我的所有下载完成后只返回一次时保持这种并发性?

此外,这似乎仍然会立即启动我的所有请求。有没有更好的方法来限制同时请求的数量?比如,并行启动n个请求,但只有在前n个完成后才启动n + 1个?

额外的代码示例

这是一段代码片段,展示了我只有在前一次下载步骤完成后才启动每个下载步骤:

).flatMap(
  (uploadFlightActualsSuccess) => {
    this.changeProgressValue(this.FLIGHT_ACTUALS_UPLOAD_END); 
    return this.syncDocuments();
  }
).flatMap(
  (syncDocumentsSuccess) => {
    this.changeProgressValue(this.OPERATOR_DOCUMENT_DOWNLOAD_END);
    return this.syncTripDocuments()
  },
  (error) => error
).flatMap(
  (syncTripDocumentsSuccess) => {
    this.changeProgressValue(this.TRIP_DOCUMENT_DOWNLOAD_END);      
    return this.expenseItemSyncProvider.syncPortalData();
  }
).flatMap(
  (expenseItemSyncSuccess) => {
    return this.flightPersonnelSyncProvider.syncFlightPersonnelByTrip();
  }
).flatMap(

'syncTripDocuments'是下载网址列表的请求。一旦完成,我只想进行下一步。

angular rxjs observable reactive-programming
2个回答
1
投票

这是一种方法,使用zip运算符来限制请求,就像这样。

从两个流开始,第一个是要下载的URL序列,第二个是4个对象的序列,所以像这样:

s1$ = Observable.from(list_of_urls);
s2$ = new ReplaySubject();
for(let i = 0; i < 4 ; i++) s2$.next(i);

然后将这两个压缩在一起,然后mergeMap下载文件。每次下载完成后,在s2$发出一个新事件,以便它可以继续,如下所示:

s3$ = s1$.pipe(
    zip(s2$),
    mergeMap(([a, b]) => download_url(a).pipe(tap(c => s2$.next(c)))))

所以现在,每次文件完成下载时,s2$都会发出一个新元素,允许处理下一个压缩对。

编辑

或者我们可以使用一个简单的Subject而不是ReplaySubject并移动for循环发出前四个值,直到订阅s3$之后:

s1$ = Observable.from(list_of_urls);
s2$ = new Subject();
s3$ = s1$.pipe(
    zip(s2$),
    mergeMap(([a, b]) => download_url(a).pipe(tap(c => s2$.next(c)))))
s3$.subscribe(...);
for(let i = 0; i < 4 ; i++) s2$.next(i);

编辑2

而不是使用for循环创建前4个元素,我们可以使用from([1,2,3,4]).pipe(concat(s2$))而不是简单地用s2$压缩

我没有运行任何这个,但你得到了一般的想法。


0
投票

发布的解决方案的问题(虽然给我并发灵活性)是他们不满足要求整个操作只发出一次,每个项目完成后的条件。

工作方案如下:

export function limitedParallelObservableExecution<T>(listOfItems: Array<T>, observableMethod: (item: T) => Observable<any>, maxConcurrency: number = 4): Observable<any> {
  if (listOfItems && listOfItems.length > 0) {
    let observableListOfItems: Observable<T> = Observable.from(listOfItems);
    return observableListOfItems.mergeMap(observableMethod, maxConcurrency).toArray();
  } else {
    return Observable.of({});
  }
}

这里的策略是:

1)从项列表中创建一个可观察的流

2)将可观察的方法与maxConcurrency一起传递给mergeMap

3)使用toArray()确保在返回之前完成所有可观察对象

© www.soinside.com 2019 - 2024. All rights reserved.