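I'm trying to solve the following problem: I have a list of strings, and each string must be processed individually. Processing one string takes about 40-50 ms, so a list of 200 strings already takes roughly 8-10 seconds when processed sequentially, which is a lot.
I'm looking for a way to speed this up with parallelization. At first I tried CompletableFuture, but in another question an expert said that this is not the best approach. I tried other approaches and wrote a benchmark. According to my results, the best option is CompletableFuture combined with a FixedThreadPool.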
Results:
Benchmark                                              Mode  Cnt  Score   Error  Units
TaskBenchmark.benchmarkCompletableFuture              thrpt    9  0,430 ± 0,001  ops/s
TaskBenchmark.benchmarkCompletableFutureWithExecutor  thrpt    9  3,848 ± 0,010  ops/s
TaskBenchmark.benchmarkForkJoin                       thrpt    9  0,437 ± 0,024  ops/s
TaskBenchmark.benchmarkParallel                       thrpt    9  0,423 ± 0,008  ops/s
TaskBenchmark.benchmarkCompletableFuture               avgt    9  2,326 ± 0,010   s/op
TaskBenchmark.benchmarkCompletableFutureWithExecutor   avgt    9  0,260 ± 0,001   s/op
TaskBenchmark.benchmarkForkJoin                        avgt    9  2,286 ± 0,122   s/op
TaskBenchmark.benchmarkParallel                        avgt    9  2,360 ± 0,018   s/op
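In the CompletableFutureWithExecutor solution, I don't like configuring 100 threads, because some tasks still wait in the queue. But increasing the pool to 500 threads doesn't seem like a good idea either.
Maybe I'm doing something wrong? Or is there a more optimal approach?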
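As a rough sanity check on the numbers above, the time for purely blocking tasks can be estimated as the number of "waves" of tasks times the per-task latency. The sketch below is a simplified model, not a measurement: it assumes every task blocks for exactly 50 ms and that the pool is the only bottleneck. The class name PoolEstimate and the worker count of 11 are hypothetical; 11 is simply a value that reproduces the roughly 2.3 s results, since the default pools size themselves from the CPU core count of the machine.

```java
// Simplified model: identical blocking tasks, thread pool as the only bottleneck.
final class PoolEstimate {

    /** Estimated wall-clock time in ms for `tasks` blocking calls run on `threads` workers. */
    static long estimateMillis(int tasks, int threads, long taskMillis) {
        long waves = (tasks + threads - 1) / threads; // ceiling division
        return waves * taskMillis;
    }

    public static void main(String[] args) {
        // 500 records on the 100-thread fixed pool from the question
        System.out.println(estimateMillis(500, 100, 50)); // prints 250
        // 500 records on a hypothetical 11 workers (core-count-sized pool)
        System.out.println(estimateMillis(500, 11, 50));  // prints 2300
        // 500 records with one thread per record
        System.out.println(estimateMillis(500, 500, 50)); // prints 50
    }
}
```

Under these assumptions, 250 ms is close to the measured 0,260 s/op for the fixed pool, which suggests the three slow variants are limited by the number of worker threads rather than by the API used.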
PS: The doWork method simulates the string processing. It cannot be optimized, because it makes a blocking call to a third-party service.
CompletableFuture:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ComputableFutureExample implements AutoCloseable {
    private final ExecutorService executor = Executors.newFixedThreadPool(100);

    // Runs the tasks on the default async executor (normally ForkJoinPool.commonPool()).
    public List<String> handle(List<String> records) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String record : records) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> WorkUtil.doWork(record));
            futures.add(future);
        }
        // Wait for all futures, then collect their results in submission order.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    // Same as handle(), but runs the tasks on the dedicated 100-thread pool.
    public List<String> handleWithExecutor(List<String> records) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String record : records) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> WorkUtil.doWork(record), executor);
            futures.add(future);
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
RecursiveTask:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class RecordTask extends RecursiveTask<List<String>> {
    private final List<String> records;

    public RecordTask(List<String> records) {
        this.records = records;
    }

    @Override
    protected List<String> compute() {
        if (records.size() > 1) {
            // Split in half until each subtask holds a single record.
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .map(ForkJoinTask::join)
                    .flatMap(List::stream)
                    .toList();
        } else {
            return processing(records);
        }
    }

    private Collection<RecordTask> createSubtasks() {
        List<RecordTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new RecordTask(
                records.subList(0, records.size() / 2)));
        dividedTasks.add(new RecordTask(
                records.subList(records.size() / 2, records.size())));
        return dividedTasks;
    }

    private List<String> processing(List<String> records) {
        return records.stream().map(WorkUtil::doWork).toList();
    }
}
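import java.util.concurrent.TimeUnit;

public class WorkUtil {
    // Simulates the blocking third-party call (about 50 ms per record).
    public static String doWork(String record) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return record + " completed";
    }
}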
Benchmark:
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;

@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 3, time = 5)
@Measurement(iterations = 3, time = 5)
@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
@Fork(value = 3)
public class TaskBenchmark {

    @State(Scope.Benchmark)
    public static class DataProvider {
        private List<String> records;

        // Builds a fresh list of 500 records before every benchmark invocation.
        @Setup(Level.Invocation)
        public void setup() {
            this.records = IntStream.range(0, 500).mapToObj(i -> UUID.randomUUID().toString() + i).toList();
        }
    }

    @Benchmark
    public void benchmarkForkJoin(DataProvider dataProvider) {
        RecordTask recordTask = new RecordTask(dataProvider.records);
        try (ForkJoinPool forkJoinPool = new ForkJoinPool()) {
            forkJoinPool.invoke(recordTask);
        }
    }

    @Benchmark
    public void benchmarkCompletableFuture(DataProvider dataProvider) {
        try (ComputableFutureExample example = new ComputableFutureExample()) {
            example.handle(dataProvider.records);
        }
    }

    @Benchmark
    public void benchmarkCompletableFutureWithExecutor(DataProvider dataProvider) {
        try (ComputableFutureExample example = new ComputableFutureExample()) {
            example.handleWithExecutor(dataProvider.records);
        }
    }

    @Benchmark
    public void benchmarkParallel(DataProvider dataProvider) {
        dataProvider.records.parallelStream().map(WorkUtil::doWork).toList();
    }
}
You can use virtual threads. They are lightweight and disposable. Since your tasks are not CPU-bound, they work well here.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Workem {
    // Simulates a blocking call that takes 50 ms.
    static String work(String s) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return s;
    }

    public static void main(String[] args) throws Exception {
        List<String> strings = new ArrayList<>();
        for (int i = 0; i < 1000000; i++) {
            strings.add("id: " + i);
        }
        long start = System.nanoTime();
        List<Callable<String>> tasks = strings.stream()
                .map(str -> (Callable<String>) () -> work(str))
                .toList();
        // One virtual thread per task; invokeAll blocks until every task has finished.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            executor.invokeAll(tasks);
        }
        System.out.println("finished: " + ((System.nanoTime() - start) * 1e-9));
    }
}
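The example above only measures the elapsed time and discards the results. A minimal sketch of how the same idea could return the processed strings, in input order, might look like the following. It assumes Java 21 or later; the class name VirtualThreadHandler is hypothetical, and doWork here is a local stand-in for the blocking call from the question, not the real third-party service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class VirtualThreadHandler {

    // Stand-in for the blocking third-party call (an assumption for this sketch).
    static String doWork(String record) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing " + record, e);
        }
        return record + " completed";
    }

    // Processes every record on its own virtual thread and returns results in input order.
    static List<String> handle(List<String> records) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String record : records) {
            tasks.add(() -> doWork(record));
        }
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<String> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the submitted tasks.
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(List.of("a", "b", "c"))); // [a completed, b completed, c completed]
    }
}
```

Because each task gets its own virtual thread, there is no pool size to tune. Whether the third-party service tolerates that many concurrent calls is a separate question that this sketch does not address.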