在JEP444中,我看到“这些阻止操作的实现通过临时扩展调度程序的并行性来弥补OS线程的捕获”。 在什么情况下,虚拟线程将扩大基础载体线程的数量?
你能给我一些代码来达到这个吗?
public class CarrierThreadExpandTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
File file = new File("/Users/info.log");
for (int i = 0; i < 32; i++) {
executorService.execute(() -> {
while (true) {
try (FileInputStream fis = new FileInputStream(file)) {
int content;
while ((content = fis.read()) != -1) {
System.out.print((char) content);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
Thread.sleep(1000 * 60 * 60);
}
}
$ jps
$ jstack pid
看“ forkjoinpool-1-worker-xx”线程。
动物选项是对象#等待方法。或使用jdk.internal.misc.blocker.
的任何其他代码在下面运行示例方法:
public static void workersTest() {
Instant start = Instant.now();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int x = i;
Thread thread = Thread.ofVirtual().start(() -> waitingWorker(x, 15));
//Thread thread = Thread.ofVirtual().start(() -> busyWorker(x, 15));
//Thread thread = Thread.ofVirtual().start(() -> busyWorkerWithBlock(x, 15));
threads.add(thread);
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.info("{}: all finished. Elapsed {} seconds", Thread.currentThread(), Duration.between(start, Instant.now()).toSeconds());
}
示例1:
private static long waitingWorker(int taskNo, int workTimeSeconds) {
log.info("{}: task-{} started", Thread.currentThread(), taskNo);
Instant currentTime = Instant.now();
Object o = new Object();
synchronized(o) {
try {
o.wait(workTimeSeconds * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Instant stopAt = Instant.now();
log.info("{}: task-{} finished", Thread.currentThread(), taskNo);
return stopAt.toEpochMilli() - currentTime.toEpochMilli();
}
示例2.
version 1(普通CPU绑定的代码,不会触发赔偿):
private static long busyWorker(int taskNo, int workTimeSeconds) {
log.info("{}: task-{} started", Thread.currentThread(), taskNo);
long i = 0;
Instant currentTime = Instant.now();
Instant stopAt = currentTime.plus(workTimeSeconds, ChronoUnit.SECONDS);
while (currentTime.isBefore(stopAt)) {
//just some cpu work
if (currentTime.toEpochMilli() % 2 == 0) {
i += 1;
} else {
i -= 1;
}
currentTime = Instant.now();
}
log.info("{}: task-{} finished", Thread.currentThread(), taskNo);
return i;
}
说:
private static long busyWorkerWithBlock(int taskNo, int workTimeSeconds) {
log.info("{}: task-{} started", Thread.currentThread(), taskNo);
long i = 0;
long comp = Blocker.begin();
try {
Instant currentTime = Instant.now();
Instant stopAt = currentTime.plus(workTimeSeconds, ChronoUnit.SECONDS);
while (currentTime.isBefore(stopAt)) {
//just some cpu work
if (currentTime.toEpochMilli() % 2 == 0) {
i += 1;
} else {
i -= 1;
}
currentTime = Instant.now();
}
} finally {
Blocker.end(comp);
}
log.info("{}: task-{} finished", Thread.currentThread(), taskNo);
return i;
}
完全相同的代码被阻止器包围将在15秒内完成执行,因为调度程序将创建额外的补偿载体线程。
P.S。要使用jdk.internal.misc.blocker,需要直接使用JDK.Internal类):
add“ - add-exports = java.base/jdk.internal.misc = all-Unnumed”编译器选项
add“ - add-opens = java.base/jdk.internal.misc = all-named” to jmv excution options