如何将函数应用于 rx-java Flowable 的每个元素?

问题描述 投票:0回答:1

如何将函数应用于 Flowable 中的每个元素?我试图将一个函数应用于可流动发布者中的每个项目并收集结果,但该函数永远不会被调用。

CompletionStage<Void> batchUpdate(Page<AClass> page); //Given

Completion<Void> updateItems(){
    final var futures = new ArrayList<CompletionStage<Void>>();
    final Flowable<Page<AClass>> flowable = getFlowablePublisher();
    flowable.map(this::batchUpdate).map(futures::add); //batchUpdate never gets called
    return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
}
java rx-java
1个回答
0
投票

在您当前的实现中,batchUpdate 函数没有被调用,因为您需要触发 Flowable 的处理。 Flowable 是一种反应式流,遵循惰性评估方法。在您订阅之前,它不会开始处理数据。

要将batchUpdate函数应用到Flowable中的每个项目并收集结果,您需要订阅Flowable。您可以通过以下方式修改代码来实现此目的:

import io.reactivex.Flowable;

// ...

Completion<Void> updateItems() {
    final var futures = new ArrayList<CompletionStage<Void>>();
    final Flowable<Page<AClass>> flowable = getFlowablePublisher();

    flowable.flatMap(page -> {
        // Apply the batchUpdate function to each item and collect the results.
        CompletionStage<Void> future = batchUpdate(page);
        futures.add(future);
        return future;
    }).subscribe();

    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

在此更新的代码中,我们使用 flatMap 运算符将batchUpdate 函数应用于 Flowable 中的每个项目,并将结果收集到 futures 列表中。 flatMap 运算符返回一个新的 Flowable,该 Flowable 发出由 batchUpdate 函数返回的 CompletionStage 的结果。

但是,您可能会注意到 Completion 的返回类型不是有效的 Java 语法。应该是完成阶段。因此,请确保在实际代码中相应地更新返回类型。

此外,请确保 getFlowablePublisher() 方法正确地向 Flowable 提供要处理的数据。

请注意,在反应式编程中,您通常使用异步操作,因此使用 CompletionStage 或类似类型很常见。请务必在实际实施中适当处理异常和错误场景。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.