Reactive Streams 是围绕背压构建的,这很棒,我想更好地了解如何使用 RxJava3 API。删除 GUI 事件溢出数据的方法非常好,但如果我处理文件/Kafka 主题,那么通常它会受到将数据插入数据库的速度的限制,并且希望确保我使用正确的 RxJava 模式来不会丢失数据并减慢生产者的速度。
我在 RxJava 小组发布了一个问题,并被要求将其发布在这里。
Flowable#subscribe 没有指定订阅请求大小的版本,但都使用 FlowableInternalHelper.RequestMax.INSTANCE,这是什么原因。
默认订阅大小与默认缓冲区大小(128 - 因此订阅默认为 64)不相关的背景是什么?
我是否正确地阅读了 API,为了应用背压而不丢失数据/因错误而失败,我需要使用
subscribeWith(new DisposableSubscriber..)
,如下所示,并且 API 中没有快捷方式?
.subscribeWith(
new DisposableSubscriber<List<Buffer>>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onNext(List<Buffer> buffers) {
// ...
request(1);
}
@Override
public void onError(Throwable throwable) {
// ...
}
@Override
public void onComplete() {
// ...
}
});
// or similar example with vertx promise
.subscribeWith(new RequestSub<>(promise::fail, promise::complete));
@RequiredArgsConstructor
public class RequestSub<T> extends DisposableSubscriber<T> {
private final Consumer<? super Throwable> onError;
private final Runnable onComplete;
@Override
public void onStart() {
request(1);
}
@Override
public void onNext(T next) {
request(1);
}
@Override
public void onError(Throwable throwable) {
onError.accept(throwable);
}
@Override
public void onComplete() {
onComplete.run();
}
}
我写了一个小rxjava-agent,让我在使用无限订阅时捕获更容易的情况 - API 已记录,但我更容易看到实际发生的情况。
没有捷径,因为没有好的请求模式可以默认。因此,我们不想进一步扩大 API 界面。
如果您需要自定义,请使用您已经通过实现
Subscriber
或扩展内置方法之一发现的方法。
此外,除非您在订阅者中执行异步工作,否则请求模式相对于无界请求模式没有任何好处,因为它总是同步消耗其上游。也就是说,如果您预先请求 1 个,然后每次在 onNext 中请求 1 个,上游将看到它可以立即继续下一个项目(如果可用)。