如何在基于Executor的库中使用Project Reactor的Scheduler?

问题描述 投票:0回答:1

Project Reactor通过定义Scheduler提供了一种很好的方式来定义要在哪个线程池上运行代码。它还为通过CompletableFuture使用Mono.fromFuture(..)的库提供了桥梁。

AWS的async client for DyanmoDB,执行从C [0]上的API调用返回的CompletableFuture。默认情况下,它创建一个java.util.concurrent.Executor,该线程由它同时创建的线程池支持。结果是,即使具有定义的Executor的流(例如Scheduler)在库创建的池中的线程上执行,而不是在Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())中创建的线程上执行。因此,我们看到的是诸如Schedulers.boundedElastic()之类的线程名称,而不是诸如sdk-async-response-0-2之类的名称。

幸运的是,该库允许我们以boundedElastic-1的形式提供自己的Executor,所以我的问题是:

您如何在运行时使用shown here 在流的那部分上定义的]中的线程来构建Executor

用例

我们有一个存储库类,它具有Scheduler方法,我们需要调用方能够控制要在哪个findById上运行,因为它在以下截然不同的上下文中使用:

  1. Scheduler调度程序上运行的API响应。
  2. 处理已定义的调度程序中按顺序在每个分区的线程上按顺序执行的Kafka消息,如Schedulers.boundedElastic()中所示。
  3. 尝试

我们已经尝试使用Reactor Kafka docsExecutor来定义Schedulers.immediate(),如下所示,但是两者都导致在Netty事件循环线程(示例名称:Runnable::run)上执行,而不是在已定义的线程上执行aws-java-sdk-NettyEventLoop-0-2

Scheduler
DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();

Project Reactor通过定义调度程序,提供了一种很好的方式来定义要在哪个线程池上运行代码。它还为使用CompletableFuture的库提供了桥梁,尽管Mono.fromFuture(...

java apache-kafka amazon-dynamodb project-reactor aws-sdk-java-2.0
1个回答
0
投票

观察与订阅

© www.soinside.com 2019 - 2024. All rights reserved.