如何加快并行列表处理速度

问题描述 投票:0回答:1

我正在尝试解决一个问题:有一个字符串列表,每个字符串必须单独处理。每个字符串的处理时间大约为 40-50 毫秒,如果列表中有 200 个字符串,则已经大约一秒了,这是很多。

我正在尝试找到一种使用并行化来加速处理的方法。一开始我尝试通过

CompletableFuture
来做,但是在另一个问题中专家说这不是最佳方式。我尝试了其他方法并写了一个基准。根据我的结果,最好的选择是
CompletableFuture
FixedThreadPool

结果:

Benchmark                                              Mode  Cnt  Score   Error  Units
TaskBenchmark.benchmarkCompletableFuture              thrpt    9  0,430 ± 0,001  ops/s
TaskBenchmark.benchmarkCompletableFutureWithExecutor  thrpt    9  3,848 ± 0,010  ops/s
TaskBenchmark.benchmarkForkJoin                       thrpt    9  0,437 ± 0,024  ops/s
TaskBenchmark.benchmarkParallel                       thrpt    9  0,423 ± 0,008  ops/s
TaskBenchmark.benchmarkCompletableFuture               avgt    9  2,326 ± 0,010   s/op
TaskBenchmark.benchmarkCompletableFutureWithExecutor   avgt    9  0,260 ± 0,001   s/op
TaskBenchmark.benchmarkForkJoin                        avgt    9  2,286 ± 0,122   s/op
TaskBenchmark.benchmarkParallel                        avgt    9  2,360 ± 0,018   s/op

在解决方案中

CompletableFutureWithExecutor
,我不喜欢安装100个线程,因为有些任务正在队列中等待。但似乎增加到 500 个线程并不是一个好主意。

也许我做错了什么?或者还有其他更优化的方法吗?

PS:

doWork
方法模拟字符串处理。该方法无法优化,因为它会阻塞调用第三方服务..

CompletableFuture

public class ComputableFutureExample implements AutoCloseable {
   private ExecutorService executor = Executors.newFixedThreadPool(100);

    public List<String> handle(List<String> records) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String record : records) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> WorkUtil.doWork(record));
            futures.add(future);
        }

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public List<String> handleWithExecutor(List<String> records) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String record : records) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> WorkUtil.doWork(record), executor);
            futures.add(future);
        }

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

RecursiveTask

public class RecordTask extends RecursiveTask<List<String>> {
    private final List<String> records;

    public RecordTask(List<String> records) {
        this.records = records;
    }

    @Override
    protected List<String> compute() {
        if (records.size() > 1) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .map(ForkJoinTask::join)
                    .flatMap(List::stream)
                    .toList();
        } else {
            return processing(records);
        }
    }

    private Collection<RecordTask> createSubtasks() {
        List<RecordTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new RecordTask(
                records.subList(0, records.size() / 2)));
        dividedTasks.add(new RecordTask(
                records.subList(records.size() / 2, records.size())));
        return dividedTasks;
    }

    private List<String> processing(List<String> records) {
        return records.stream().map(WorkUtil::doWork).toList();
    }

}
public class WorkUtil {
    public static String doWork(String record) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return record + " completed";
    }
}

基准:

@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 3, time = 5)
@Measurement(iterations = 3, time = 5)
@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
@Fork(value = 3)
public class TaskBenchmark {
    @State(Scope.Benchmark)
    public static class DataProvider {
        private List<String> records;

        @Setup(Level.Invocation)
        public void setup() {
            this.records = IntStream.range(0, 500).mapToObj(i -> UUID.randomUUID().toString() + i).toList();
        }
    }

    @Benchmark
    public void benchmarkForkJoin(DataProvider dataProvider) {
        RecordTask recordTask = new RecordTask(dataProvider.records);
        try (ForkJoinPool forkJoinPool = new ForkJoinPool()) {
            forkJoinPool.invoke(recordTask);
        }
    }

    @Benchmark
    public void benchmarkCompletableFuture(DataProvider dataProvider) {
        try (ComputableFutureExample example = new ComputableFutureExample();) {
            example.handle(dataProvider.records);
        }
    }

    @Benchmark
    public void benchmarkCompletableFutureWithExecutor(DataProvider dataProvider) {
        try (ComputableFutureExample example = new ComputableFutureExample();) {
            example.handleWithExecutor(dataProvider.records);
        }
    }

    @Benchmark
    public void benchmarkParallel(DataProvider dataProvider) {
        dataProvider.records.parallelStream().map(WorkUtil::doWork).toList();
    }
}
java parallel-processing completable-future
1个回答
0
投票

您可以使用虚拟线程。它们重量轻且一次性。由于您的任务不受 CPU 限制,因此运行良好。

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
public class Workem{

    static String work(String s){
        try{
            Thread.sleep(50);
        } catch( Exception e){
            throw new RuntimeException(e);
        }
        return s;
    }
    public static void main(String[] args) throws Exception{
        List<String> strings = new ArrayList<>();
        for(int i = 0; i<1000000; i++){
            strings.add("id: " + i);
        }
        long start = System.nanoTime();
        List<Callable<String>> r = strings.stream().map( str ->(Callable<String>)() -> work(str) ).toList();
        Executors.newVirtualThreadPerTaskExecutor().invokeAll(r);
        System.out.println("finished: " + ((System.nanoTime() - start)*1e-9) );
        
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.