我试图理解重复订阅一个Mono并延迟1秒并根据某个条件停止订阅的效果,我有这行代码
Mono.just(UUID.randomUUID())
.map(uuid -> uuid.toString())
.doOnNext(uuid -> System.out.println(uuid))
.repeatWhen(completed -> completed.delayElements(Duration.ofMillis(1000)))
.takeUntil(uuid -> uuid.startsWith("1234"))
.subscribe();
我对这一行的期望是,每隔一秒打印一个随机的 UUID,直到生成一个以 1234 开头的 UUID,然后停止。 但它只是打印 UUID,然后退出。我如何使用这条线实现这一点?
问题在于
Mono.just(UUID.randomUUID())
的行为,它仅发出一个项目(单个随机 UUID),并且在发出该项目后立即完成。因此,repeatWhen
运算符没有机会重新订阅它,因为源Mono
在发出第一个UUID后完成。
为了实现预期的行为,您需要修改您的方法,并且必须使用
Flux
无限期地生成随机 UUID,然后应用重复逻辑。
你可以用 Flux 试试这个:
Flux<UUID> uuidFlux = Flux.generate(sink -> sink.next(UUID.randomUUID()));
Mono<Long> delay = Mono.delay(Duration.ofSeconds(1));
uuidFlux
.map(uuid -> uuid.toString())
.doOnNext(uuid -> System.out.println(uuid))
.repeatWhen(completed ->
completed
.zipWith(delay)
.filter(tuple -> !tuple.getT1().toString().startsWith("1234"))
.map(Tuple2::getT2)
)
.takeUntil(uuid -> uuid.startsWith("1234"))
.subscribe();