使用repeatWhen()和takeUntil()重复订阅Mono

问题描述 投票:0回答:1

我试图理解重复订阅一个Mono并延迟1秒并根据某个条件停止订阅的效果,我有这行代码

     Mono.just(UUID.randomUUID())
            .map(uuid -> uuid.toString())
            .doOnNext(uuid -> System.out.println(uuid))
            .repeatWhen(completed -> completed.delayElements(Duration.ofMillis(1000)))
            .takeUntil(uuid -> uuid.startsWith("1234"))
            .subscribe();

我对这一行的期望是,每隔一秒打印一个随机的 UUID,直到生成一个以 1234 开头的 UUID,然后停止。 但它只是打印 UUID,然后退出。我如何使用这条线实现这一点?

java spring-webflux reactive-programming project-reactor
1个回答
0
投票

问题在于

Mono.just(UUID.randomUUID())
的行为,它仅发出一个项目(单个随机 UUID),并且在发出该项目后立即完成。因此,
repeatWhen
运算符没有机会重新订阅它,因为源
Mono
在发出第一个UUID后完成。

为了实现预期的行为,您需要修改您的方法,并且必须使用

Flux
无限期地生成随机 UUID,然后应用重复逻辑。

你可以用 Flux 试试这个:

Flux<UUID> uuidFlux = Flux.generate(sink -> sink.next(UUID.randomUUID()));
Mono<Long> delay = Mono.delay(Duration.ofSeconds(1));

    uuidFlux
        .map(uuid -> uuid.toString())
        .doOnNext(uuid -> System.out.println(uuid))
        .repeatWhen(completed ->
                completed
                    .zipWith(delay)
                    .filter(tuple -> !tuple.getT1().toString().startsWith("1234"))
                    .map(Tuple2::getT2)
        )
        .takeUntil(uuid -> uuid.startsWith("1234"))
        .subscribe();
© www.soinside.com 2019 - 2024. All rights reserved.