我们有一个用例:
将流分成更小的块,其大小小于 阈值并并行处理块。
我们当前的实现是这样的,其中 counter 是
ForkJoinPool
中所有线程的共享变量。当前实现的问题是,我们观察到某些块的大小超过了阈值,我们怀疑这是否是由于公共共享变量造成的。
AtomicLong counter = new AtomicLong(0L);
StreamEx.of(stream).parallel(FORK_JOIN_POOL).groupRuns((prev, next) -> counter.incrementAndGet() % batchSize != 0)
.forEach(entities -> {
// business logic.
});
因此我们尝试将条件修改为以下。但我们观察到一些块仍然超过了批量大小值。任何线索都会有帮助。蒂亚!
ThreadLocal<AtomicLong> threadLocalCounter = ThreadLocal.withInitial(() -> new AtomicLong(0L));
StreamEx.of(stream).parallel(FORK_JOIN_POOL).groupRuns((prev, next) ->
{
AtomicLong counter = threadLocalCounter.get();
return counter.incrementAndGet() % batchSize != 0;
}.forEach(entities -> {
// business logic.
});
你可以用整数压缩它并进行诚实的分区。
StreamEx.of(stream).zipWith(Stream.iterate(0, a->a++))
.map(x->x)
.parallel().groupRuns((prev, next) -> next.getValue() % batchSize != 0)
.forEach(entries -> {
var entities = entries.stream().map(x->x.getKey()).toList();
// business logic.
});