反应式编程是一种围绕数据流和变化传播的编程范例。
我正在寻找一个数据结构(由泛型类表示)的现有实现(最好是 C#),该实现存储一个基值,一个可以按顺序存储的委托列表(根据
可以在Webflux中做到这一点吗?并行调用 2 个服务,如果第一个服务满足条件,则不要等待第二个服务
我想调用2个服务“a”和“b”,并根据a的响应,等待或不等待b。 像这样的东西: 公共 Mono 测试(){ 单声道 我想调用 2 个服务“a”和“b”,并根据 a 的响应,等待或不等待 b。 类似这样的: public Mono<String> test() { Mono<String> monoA = Mono.fromCallable(() -> { try { log.info("A Started"); Thread.sleep(2000); log.info("A Ended"); return Math.random() > 0.5 ? "A" : "a"; } catch (InterruptedException e) { throw new RuntimeException(e); } }).subscribeOn(Schedulers.boundedElastic()); Mono<String> monoB = Mono.fromCallable(() -> { try { log.info("B Started"); Thread.sleep(8000); log.info("B Ended"); return "B"; } catch (InterruptedException e) { throw new RuntimeException(e); } }).subscribeOn(Schedulers.boundedElastic()); long startTime = System.currentTimeMillis(); return monoA.flatMap(a -> { Mono<String> result; if(a.equals("A")) { result = Mono.just(a); } else { result = monoB; } return result; }).map(response -> { long totalTime = System.currentTimeMillis() - startTime; return "Returning '" + response + "' in: " + totalTime + " ms"; }); } 如果 monoA = "A" 打印: A Started A Ended Returning 'A' in: 2032 ms 如果 monoA = "a" 打印: A Started A Ended B Started B Ended Returning 'B' in: 10007 ms 如果我使用: Mono.zip(monoA, monoB) 最好和最坏的情况总是 8000ms 我想要 if monoA = "A" response in 2000ms if monoA = "a" response in 8000ms 我想要做的事情可能甚至是正确的吗? 我相信要达到你想要的目的,你需要无条件地开始执行monoB。 使用 @Test 将完整代码封装在 StepVerifier 带注释的方法中。 import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import static graphql.Assert.assertTrue; import static org.assertj.core.api.Fail.fail; public class MonoTest { public static final Logger log = LoggerFactory.getLogger(MonoTest.class); @Test void test() { Mono<String> monoA = Mono.fromCallable(() -> { try { log.info("A Started"); Thread.sleep(2000); log.info("A Ended"); return Math.random() > 0.5 ? "A" : "a"; } catch (InterruptedException e) { throw new RuntimeException(e); } }).subscribeOn(Schedulers.boundedElastic()); Mono<String> monoB = Mono.fromCallable(() -> { try { log.info("B Started"); Thread.sleep(8000); log.info("B Ended"); return "B"; } catch (InterruptedException e) { throw new RuntimeException(e); } }).subscribeOn(Schedulers.boundedElastic()).cache(); monoB.subscribe(); // start execution of monoB long startTime = System.currentTimeMillis(); Mono<String> result = monoA.flatMap(monoResult -> "A".equals(monoResult) ? Mono.just(monoResult) : monoB); StepVerifier.create(result).assertNext(response -> { long elapsedTime = System.currentTimeMillis() - startTime; if ("A".equals(response)) { assertTrue(elapsedTime <= 2100, "Execution time exceeded 2100 ms for result 'A'"); } else if ("B".equals(response)) { assertTrue(elapsedTime <= 8100, "Execution time exceeded 8100 ms for result 'B'"); } else { fail("Unexpected response: " + response); } }) .verifyComplete(); } }
从 REST API(JSON 响应)收集一系列分页结果(776 页)并将最终结果插入 DB SQL 或创建 csv 文件
我正在使用 Rest API,它从 SQL Server 数据库公开 6212514 行。 REST API 的响应是 JSON,表示数据库行。 要调用此 API REST 并获取孔数据,...
我在测试中经常使用 Awaitility 来处理异步操作。但是,每个 Awaitility 调用都会生成线程,这会导致运行大型测试时内存消耗较高。我正在寻找...
我了解信号的基础及其用途,并且自己采用了它们。只是想完全了解何时使用它们。 我对何时使用信号感到困惑,我已经看到了......
在 Svelte 中,我们有一个非常漂亮的方法来及时加载一些数据并使用 @const 渲染它: 让a=假; a = !a}> 在 Svelte 中,我们有一种非常漂亮的方式来及时加载一些数据并使用 @const 渲染它: <script> let a = false; </script> <button on:click={() => a = !a}></button> {#if a} {@const table = axios.get(...)} {#await table} <Spinner /> {:then table} ... {/await} {/if} 这样,我们就不必在主体中声明任何内容。但有时我们可能需要有一些逻辑在里面: <script> let a = false; </script> <button on:click={() => a = !a}></button> {#if a} {@const table = axios.get(...)} {#await table} <Spinner /> {:then table} {#each table as x} <button on:click={() => x.open = true}> Open row! </button> {#if x.open === true} ... {/if} {/each} {/await} {/if} 它不会工作,因为表是一个@const。我想找到一种聪明而简单的方法来解决这个问题。我可以在这里使用任何好的做法来代替 @const 来不使用状态变量重载组件主体吗? 这里的关键是能够仅在需要时(呈现时)发送请求,而我的问题是它不能拥有自己的状态。我试图想象一些奇怪的或无状态的解决方案(而不是“x.open”),但没有想出任何有效的方法。事情是这样的: <script> let a = false; </script> <button on:click={() => a = !a}></button> {#if a} {@const table = axios.get(...)} {#await table} <Spinner /> {:then table} {#each table as x} {@const open = new class { state = false; toggle = function () { return this.state = !this.state}}} <button on:click={() => open.toggle()}> Open row! </button> {#if open.state} ... {/if} {/each} {/await} {/if} 这当然是行不通的,即使行得通——它离任何神圣的东西都太远了。 尽管如此。我有点明白了。如果任何实体应该有状态(如开放状态),那么它应该有自己的组件。它对于简单结构来说并不理想,但这是我想出的唯一合乎逻辑的解决方案。
在 Exchange() 之后调用 bodyToMono 时,block()/blockFirst()/blockLast() 会出现阻塞错误
我正在尝试使用 Webflux 将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,API 将返回成功,但会使用 DTO 详细说明错误,同时
我是 System.Reactive 的新手,对主题及其使用方法有些困惑。 我在这里有一个主题的基本实现: 公共类 MessageService :IMessageService { 私人
避免 Flux<>Input 的函数方法中嵌套 Flux<Mono<>>
> 的情况,如下所述 方法如下...
如何在 Micronaut Reactive Controllers 中设置 cookie
我有这些返回 Monos 的服务方法,我正在尝试用它们设置 cookie。这是行不通的。有谁知道该怎么做? 我需要它来返回 HttpResponse 和 ...
我在 mac os (Apple M3 Pro) 上部署了 docker。我正在从本地主机运行 Spring Boot 应用程序。但是,无法连接到副本集。 2024-11-19T21:46:50.470+03:00 ...
反应流(monix)运算符组合,用于缓冲具有重叠元素的时间跨度
我有一个连续发射的 Observable[T] ,我想要一个 Observable[List[T]] ,它为源发射的每个元素在指定持续时间内发射最后一个元素。例子 可观察到的。范围...
当使用其输入来自的反应值时,通过 DT 包的 DataTables 界面显示的表格显得杂乱(例如无序的元素、奇怪的分页......)。
我正在尝试创建一个表(使用DT,请不要使用rhandsontable),该表几乎没有现有列,一个selectinput列(其中每一行都有可供选择的选项),最后是另一列...
我正在编写一个 Spring 项目,我想使用反应式编程。 我首先创建一个客户端 bean: @豆 公共 WebClient webClient() { 返回Web客户端 .builder() ...
在 Quarkus 服务层中使用 Mutiny 处理多个异常
我正在使用 Mutiny 进行反应式编程开发 Quarkus 应用程序,并且在服务层中遇到异常处理问题。我有一个保留用户的方法,我...
我有许多设备(目前有八个,但将来可能会更多),它们带有简单的阻塞 HTTP REST API。进行 GET 调用来给出一个值,API 会响应成功或失败,但是...
我有以下运动方程 移动 = 目标位置 - 位置 位置=位置+移动 其中 target_position 是一个流,position 初始化为零。我想要一个...
Kafka Streams 和 CompletableFuture(或异步 java api)
我正在研究 Apache Kafka Stream SPI。我想知道是否有一种方法可以在 mapValues 方法内部执行异步代码。例如从外部存储检索数据。有没有办法整合...
在更改单个模块中的输入时,如何更改多个 Shiny 模块的输入?
我正在使用一个相当大的 Shiny 应用程序,其中包含多个模块。应用程序中的每个选项卡都是其自己的模块。某些选项卡具有一些共享输入,以及各自选项卡的不同输入。 这...