Flowable rxjava3 中的无界背压

问题描述 投票:0回答:1

Reactive Streams 是围绕背压构建的,这很棒,我想更好地了解如何使用 RxJava3 API。删除 GUI 事件溢出数据的方法非常好,但如果我处理文件/Kafka 主题,那么通常它会受到将数据插入数据库的速度的限制,并且希望确保我使用正确的 RxJava 模式来不会丢失数据并减慢生产者的速度。

我在 RxJava 小组发布了一个问题,并被要求将其发布在这里。

Flowable#subscribe 没有指定订阅请求大小的版本,但都使用 FlowableInternalHelper.RequestMax.INSTANCE,这是什么原因。

默认订阅大小与默认缓冲区大小(128 - 因此订阅默认为 64)不相关的背景是什么?

我是否正确地阅读了 API,为了应用背压而不丢失数据/因错误而失败,我需要使用

subscribeWith(new DisposableSubscriber..)
,如下所示,并且 API 中没有快捷方式?

.subscribeWith(
    new DisposableSubscriber<List<Buffer>>() {
      @Override
      public void onStart() {
        request(1);
      }

      @Override
      public void onNext(List<Buffer> buffers) {
        // ...
        request(1);
      }

      @Override
      public void onError(Throwable throwable) {
        // ...
      }

      @Override
      public void onComplete() {
        // ...
      }
    });

// or similar example with vertx promise
.subscribeWith(new RequestSub<>(promise::fail, promise::complete));

@RequiredArgsConstructor
public class RequestSub<T> extends DisposableSubscriber<T> {

  private final Consumer<? super Throwable> onError;
  private final Runnable onComplete;

  @Override
  public void onStart() {
    request(1);
  }

  @Override
  public void onNext(T next) {
    request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    onError.accept(throwable);
  }

  @Override
  public void onComplete() {
    onComplete.run();
  }
}

我写了一个小rxjava-agent,让我在使用无限订阅时捕获更容易的情况 - API 已记录,但我更容易看到实际发生的情况。

rx-java backpressure rx-java3
1个回答
0
投票

没有捷径,因为没有好的请求模式可以默认。因此,我们不想进一步扩大 API 界面。

如果您需要自定义,请使用您已经通过实现

Subscriber
或扩展内置方法之一发现的方法。

此外,除非您在订阅者中执行异步工作,否则请求模式相对于无界请求模式没有任何好处,因为它总是同步消耗其上游。也就是说,如果您预先请求 1 个,然后每次在 onNext 中请求 1 个,上游将看到它可以立即继续下一个项目(如果可用)。

© www.soinside.com 2019 - 2024. All rights reserved.