flatMap如何深入工作?

问题描述 投票:3回答:1

我感兴趣的是flatMap如何控制其“子”线程,例如以下代码可以正常工作:

 private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .parallel()
            .runOn(Schedulers.computation())
            .flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> mDataPackageFlowable)
            .sequential();
}

并且此代码在被调用128次后停止(这是默认的maxConcurent for flowable):

  private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
    return mPlcIntervalFlowable.onBackpressureLatest()
            .subscribeOn(Schedulers.single())
            .publish();
}

订阅:

addDisposable(mGetPlcUpdatesChanelUseCase.execute()
                              .observeOn(AndroidSchedulers.mainThread())
                              .subscribe(plcDto -> Timber.d("plcReceiver"),
                                         Timber::e));

用例:

public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {

    private final PlcRepository mPlcRepository;

    public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
        mPlcRepository = plcRepository;
    }

    @Override
    public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
        return mPlcRepository.getUpdatesChannel();
    }

    @Override
    public boolean isParamsRequired() {
        return false;
    }
}

回购方法

@Override
    public Flowable<PlcDto> getUpdatesChannel() {
        return mPlcCore.getPlcConnectableFlowable()
                .map(mPlcInfoTopPlcDtoTransformer::transform);
    }

PlcCore方法

public ConnectableFlowable<PlcDataPackage> getPlcConnectableFlowable() {
    return mConnectableFlowable;
}

而mConnectableFlowable是:

mConnectableFlowable = createConnectablePlcFlowable();
        mConnectableFlowable.connect();

据我所知,mDataPackageFlowable被创建一次,然后执行它,每次为它的子进程创建新的“线程”,并且在128执行之后它只会阻止所有后续的执行。

所以有三个主要问题:

1)flatMap控制子线程吗?

2)为什么它在新线程上执行每个新的“请求”?(也许不,请告诉我)

3)在什么情况下我们可以失去对子线程的控制。

免责声明:英语是我的第二语言,如果有什么不明确的问我,我会尝试补充说明。


 private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .parallel()
            .runOn(Schedulers.computation())
            .sequental()

这种组合不起作用,它实际上删除了128次flatMap调用限制,但不清除导致内存泄漏和OOM异常的旧的innersubscription。请改用某种地图。

android rx-java rx-java2 rx-android
1个回答
3
投票

需要订阅观察链才能正常工作。当您使用interval()生成数据时,您将提供一个“热”可观察量,它可以自己发出值。 “冷”可观察量仅在订阅发生时才会发出值。

128是flatMap()在停止之前缓冲的条目数。如果有订阅,那么flatMap()将发出内部可观察产生的下游值,并且它不会停止。

根据javadoc,flatMap()本身不能在特定的调度程序上运行。这意味着它不会在特定线程上操纵它的订阅。如果要控制flatMap()调用的observable中正在完成的工作,则使用显式调度:

observable
  .flatMap( value -> fun(value).subscribeOn( myScheduler ) )
  .subscribe();

例如,myScheduler可能是一个Schedulers.io(),它在需要时创建线程。或者,它可能是您提供固定数量的线程的Executor。我经常使用Executors,它只分配了一个或两个或48个线程来控制flatMap()的扇出。

您还可以向flatMap()提供并行性参数,该参数告诉它将维护的最大订阅数。当flatMap()达到最大值时,它将缓冲请求,直到它订阅的观察链完成为止。

parallel()运算符执行类似的操作,但它会分离传入的事件,并在不同的线程上发出它们。再次,javadoc具有出色的描述,以及良好的图片。

始终可能失去对线程的控制。使用RxJava运算符时,请阅读它的文档。您需要了解两个方面。第一个区域是运营商处理的调度程序。如果它说它不在特定的调度程序上运行,那么它不会直接影响线程的选择或线程的使用方式。如果它声明它使用特定的调度程序,那么您需要了解该调度程序的工作原理;将始终有另一个版本的运算符,允许您提供自己选择的调度程序。

你必须了解的第二个方面是背压。您需要了解背压的含义以及应用方式。每当您跨越线程边界时,这一点尤其重要,例如使用observeOn()subscribeOn()

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.