I have 5,000,000 entities in my database, which I read through the reactive driver (R2DBC). I want to fetch 100,000 entities at a time, split them into bundles of 1,000, and send each bundle on for processing. Only once the current 100,000 have been processed should the next 100,000 be fetched; if I pull more than 100,000 at once, I get an OOM.
BIG_DATA -get 100,000-> make batches -1,000 (parallel)-> process. If successful, fetch more.
But when I try to simulate this, all of the source data is pulled first, and only afterwards is it split into batches:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DataMigrationServiceApplication {

    private static final Logger log = LoggerFactory.getLogger(DataMigrationServiceApplication.class);

    public static void main(String[] args) throws InterruptedException {
        Flux.range(0, 12)
            .limitRate(6)                         // request from upstream in chunks of 6
            .doOnNext(in -> log.info("NEW"))
            .parallel(2)                          // distribute elements round-robin over 2 rails
            .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(2)))
            .collect(() -> new ArrayList<Integer>(), List::add) // collect per rail
            .flatMap(DataMigrationServiceApplication::check)
            .subscribe();
        Thread.sleep(10000);                      // keep the JVM alive while the pipeline runs
    }

    private static Mono<List<Integer>> check(List<Integer> input) {
        return Mono.defer(() -> Mono.just(input))
            .doOnNext(in -> log.info("IN {}", in))
            .subscribeOn(Schedulers.parallel());
    }
}
Log output:
16:53:57.871 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.881 [pool-1-thread-2] INFO DataMigrationServiceApplication -- IN [1, 3, 5, 7, 9, 11]
16:53:57.881 [pool-1-thread-1] INFO DataMigrationServiceApplication -- IN [0, 2, 4, 6, 8, 10]
The log shows all twelve NEW lines on the main thread before either IN line: the entire source is drained before a single batch is processed. I tried limitRate, buffer, and window, but could not change this.
Please help.
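For the record, here is why the whole source is drained first: ParallelFlux.collect(...) reduces each rail over the entire stream, so the two lists cannot be emitted until the source completes, and limitRate(6) only splits the upstream requests into chunks of 6 rather than pausing the source. Forming bounded batches before fanning out avoids this. A minimal sketch of that idea on the same 12-element simulation, reusing the check helper above (the prefetch of 1 is what makes the next batch wait for the current one):

Flux.range(0, 12)
    .doOnNext(in -> log.info("NEW"))
    .buffer(6)                                             // bounded batches of 6
    .concatMap(DataMigrationServiceApplication::check, 1)  // prefetch 1: request the next batch only after the current one completes
    .subscribe();

With this shape the NEW and IN lines should interleave batch by batch instead of all NEW lines appearing first.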
Well, I ended up doing this:
generator
    .window(20)
    .flatMap(inFlux -> inFlux
        .parallel(4)
        .runOn(/*scheduler*/)
        .collect(() -> new ArrayList<Integer>(), List::add)
        .flatMap(/*process*/))
    .subscribe();
This emits windows of 20 elements; within each window, the 4 rails (parallel) take elements round-robin, so each rail collects 5 elements and processes them. Unlike the first attempt, collect now runs per 20-element window rather than over the entire stream, so batches are emitted as soon as each window completes instead of only when the source is exhausted.
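Scaled up to the original numbers, the same shape might look like the sketch below. repository.fetchAll() and process(...) are hypothetical placeholders (an R2DBC repository method streaming Flux<Entity> and a bundle-processing step returning a Mono); error handling and retries are omitted:

repository.fetchAll()                         // hypothetical: Flux<Entity> streamed row by row over R2DBC
    .window(100_000)                          // one "page" of 100,000 entities at a time
    .concatMap(page -> page
            .buffer(1_000)                    // bundles of 1,000 within the page
            .flatMap(bundle -> process(bundle)               // hypothetical processing step
                    .subscribeOn(Schedulers.boundedElastic()),
                4),                           // up to 4 bundles in flight concurrently
        1)                                    // prefetch 1: do not open the next page early
    .subscribe();

The outer concatMap with prefetch 1 only subscribes to the next 100,000-row window after the previous one has been fully processed, and the inner flatMap concurrency caps memory at roughly 4 × 1,000 entities plus operator prefetch buffers, which is what keeps the heap bounded.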