StreamEx 组并行运行未按预期工作

问题描述 投票:0回答:1

我们有一个用例:

将流分成更小的块,其大小小于 阈值并并行处理块。

我们当前的实现是这样的,其中 counter 是

ForkJoinPool
中所有线程的共享变量。当前实现的问题是,我们观察到某些块的大小超过了阈值,我们怀疑这是否是由于公共共享变量造成的。

AtomicLong counter = new AtomicLong(0L);
StreamEx.of(stream).parallel(FORK_JOIN_POOL).groupRuns((prev, next) -> counter.incrementAndGet() % batchSize != 0)
                .forEach(entities -> {
      // business logic.
});

因此我们尝试将条件修改为以下。但我们观察到一些块仍然超过了批量大小值。任何线索都会有帮助。蒂亚!

ThreadLocal<AtomicLong> threadLocalCounter = ThreadLocal.withInitial(() -> new AtomicLong(0L));
StreamEx.of(stream).parallel(FORK_JOIN_POOL).groupRuns((prev, next) -> 
{
   AtomicLong counter = threadLocalCounter.get();
   return counter.incrementAndGet() % batchSize != 0;
}.forEach(entities -> {
          // business logic.
});
java-stream forkjoinpool streamex
1个回答
0
投票

你可以用整数压缩它并进行诚实的分区。

    StreamEx.of(stream).zipWith(Stream.iterate(0, a->a++))
        .map(x->x)
        .parallel().groupRuns((prev, next) -> next.getValue() % batchSize != 0)
        .forEach(entries -> {
            var entities = entries.stream().map(x->x.getKey()).toList();    
            // business logic.
        });
© www.soinside.com 2019 - 2024. All rights reserved.