我试图一次处理两个实体,因此我想使用信号量。 我将首先展示我以前的工作代码。即使线程池只有两个线程,这也不能保证一次执行两个。
CompletableFuture<Void> taskFuture = idList.stream()
.map(id -> (Runnable) () -> processEntity(startTime, id))
.map(task -> CompletableFuture.runAsync(task, executorService))
.collect(Collectors.collectingAndThen(Collectors.toList(),
tasks -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))));
taskFuture.get();
使用Semaphore修改后的逻辑如下:
var semaphore = new StreamSemaphore(MAX_CONCURRENT_EXECUTIONS);
var futureTasks = itemList.stream()
.map(semaphore::acquire)
.map(itemId -> (Supplier<Void>) () -> processItem(startTime, itemId))
.map(task -> CompletableFuture.supplyAsync(task, taskExecutor)
.thenAccept(semaphore::release)
.exceptionally(e -> {
log.error("Exception occurred while processing item", e);
System.exit(1);
return null;
}))
.toList();
futureTasks.forEach(TaskProcessor::safeGet);
StreamSemaphore 在哪里
public class StreamSemaphore {
private final Semaphore sema;
public StreamSemaphore(int slots) {
sema = new Semaphore(slots);
}
@SneakyThrows
public <T> T acquire(T obj) {
sema.acquire();
return obj;
}
@SneakyThrows
public <T> T release(T obj) {
sema.release();
return obj;
}
}
您需要在处理线程(以有限并发执行任务的线程)上获取/释放信号量许可,因此在任务中或在任务周围的可运行中,但不能在此可运行之外。
使用线程时要养成的几个习惯:
这是一个工作示例。我必须填补空白...... 我添加了一些好东西。
请注意,将抛出异常的线程结构与不能抛出异常的函数和供应商混合在一起是多么烦人。这是一种气味,因为您没有正确使用溪流,也没有在圆孔中安装方钉。
尽管如此,这里还是一个乏味的版本。 我怀疑有一种更简单的方法可以让 2 个线程执行 N 个任务。特别是一个带有 2 个线程和一个队列的简单 ExecutorService!我仍然不明白为什么需要信号量,但如果线程池大于允许的并发量,那么它就有意义了。
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
public class X {
static void p(Object msg) {
System.out.println("[" + Thread.currentThread().getName() + "]:\t" + msg);
}
static void f(String pattern, Object... args) {
System.out.println("[" + Thread.currentThread().getName() + "]:\t" + String.format(pattern, args));
}
public static void main(String[] args) throws Exception {
workIt(IntStream.range(0, 20).mapToObj(i->i).toList());
}
static final int MAX_CONCURRENT_EXECUTIONS = 2;
static void workIt(List<Integer> itemList) {
AtomicInteger nextId = new AtomicInteger();
ExecutorService taskExecutorSvc = Executors.newFixedThreadPool(10, r -> new Thread(r, "mythread-"+nextId.incrementAndGet()));
Semaphore semaphore = new Semaphore(MAX_CONCURRENT_EXECUTIONS);
Function<Integer,String> processWithPermit = itemId -> {
try {
semaphore.acquire();
p("acquired");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
return processItemIntoSring(itemId);
} finally {
p(" releasing");
semaphore.release();
}
};
List<CompletableFuture<String>> futures = itemList.stream()
.map(itemId -> CompletableFuture.supplyAsync( () -> processWithPermit.apply(itemId), taskExecutorSvc)
.exceptionally(e -> {
logError("Exception occurred while processing item", e);
throw new RuntimeException(e);
})
)
.toList();
futures.forEach(f -> {
try {
p(f.get());
} catch (InterruptedException | ExecutionException e) {
logError("failed get", e);
}
});
taskExecutorSvc.shutdownNow();
}
static String processItemIntoSring(Integer itemId) {
p(" doing "+itemId);
return "something"+itemId;
}
static void logError(String msg, Throwable e) {
System.err.println(msg + e);
e.printStackTrace();
}
}