使用 CompletableFuture 和 Semaphore 限制流中的并发性

问题描述 投票:0回答:1

我试图一次处理两个实体,因此我想使用信号量。 我将首先展示我以前的工作代码。即使线程池只有两个线程,这也不能保证一次执行两个。

CompletableFuture<Void> taskFuture = idList.stream()
    .map(id -> (Runnable) () -> processEntity(startTime, id))
    .map(task -> CompletableFuture.runAsync(task, executorService))
    .collect(Collectors.collectingAndThen(Collectors.toList(),
        tasks -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))));

taskFuture.get();

使用Semaphore修改后的逻辑如下:

var semaphore = new StreamSemaphore(MAX_CONCURRENT_EXECUTIONS);
var futureTasks = itemList.stream()
    .map(semaphore::acquire)
    .map(itemId -> (Supplier<Void>) () -> processItem(startTime, itemId))
    .map(task -> CompletableFuture.supplyAsync(task, taskExecutor)
        .thenAccept(semaphore::release)
        .exceptionally(e -> {
            log.error("Exception occurred while processing item", e);
            System.exit(1);
            return null;
        }))
    .toList();
futureTasks.forEach(TaskProcessor::safeGet);

StreamSemaphore 在哪里

public class StreamSemaphore {
    private final Semaphore sema;

    public StreamSemaphore(int slots) {
        sema = new Semaphore(slots);
    }

    @SneakyThrows
    public <T> T acquire(T obj) {
        sema.acquire();
        return obj;
    }

    @SneakyThrows
    public <T> T release(T obj) {
        sema.release();
        return obj;
    }
}

处理前两个实体后,抛出错误 enter image description here

java multithreading
1个回答
0
投票

您需要在处理线程(以有限并发执行任务的线程)上获取/释放信号量许可,因此在任务中或在任务周围的可运行中,但不能在此可运行之外。

使用线程时要养成的几个习惯:

  • 给线程工厂起个好名字,这样你就知道当你堆栈转储或分析它们时(或当它们登录到文件时)它们可能会做什么
  • 这可能只是为了测试,但通常要避免使用 System.exit();尽可能实现正常关闭。
  • 由于存在大量匿名类型,这种编码风格更难遵循。因此,拥有非常精确的变量名称和方法名称非常重要。

这是一个工作示例。我必须填补空白...... 我添加了一些好东西。

请注意,将抛出异常的线程结构与不能抛出异常的函数和供应商混合在一起是多么烦人。这是一种气味,因为您没有正确使用溪流,也没有在圆孔中安装方钉。

尽管如此,这里还是一个乏味的版本。 我怀疑有一种更简单的方法可以让 2 个线程执行 N 个任务。特别是一个带有 2 个线程和一个队列的简单 ExecutorService!我仍然不明白为什么需要信号量,但如果线程池大于允许的并发量,那么它就有意义了。

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;

public class X {
    static void p(Object msg) {
        System.out.println("[" + Thread.currentThread().getName() + "]:\t" + msg);
    }
    static void f(String pattern, Object... args) {
        System.out.println("[" + Thread.currentThread().getName() + "]:\t" + String.format(pattern, args));
    }
    
    public static void main(String[] args) throws Exception {
        workIt(IntStream.range(0, 20).mapToObj(i->i).toList());
    }
    
    static final int MAX_CONCURRENT_EXECUTIONS = 2;
    
    static void workIt(List<Integer> itemList) {
        AtomicInteger nextId = new AtomicInteger();
        ExecutorService taskExecutorSvc = Executors.newFixedThreadPool(10, r -> new Thread(r, "mythread-"+nextId.incrementAndGet()));
        Semaphore semaphore = new Semaphore(MAX_CONCURRENT_EXECUTIONS);
        
        Function<Integer,String> processWithPermit = itemId -> {
            try {
                semaphore.acquire();
                p("acquired");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            try {
                return processItemIntoSring(itemId);
            } finally {
                p("          releasing");
                semaphore.release();
            }
        };
        
        List<CompletableFuture<String>> futures = itemList.stream()
            .map(itemId -> CompletableFuture.supplyAsync( () -> processWithPermit.apply(itemId), taskExecutorSvc)
                .exceptionally(e -> {
                    logError("Exception occurred while processing item", e);
                    throw new RuntimeException(e);
                })
            )
            .toList();
        
        futures.forEach(f -> {
            try {
                p(f.get());
            } catch (InterruptedException | ExecutionException e) {
                logError("failed get", e);
            }
        });
        
        taskExecutorSvc.shutdownNow();
    }
    
    static String processItemIntoSring(Integer itemId)  {
        p("     doing "+itemId);
        return "something"+itemId;
    }
    
    static void logError(String msg, Throwable e) {
        System.err.println(msg + e);
        e.printStackTrace();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.