Under what circumstances do virtual threads expand the number of underlying carrier threads?

Question  Votes: 0  Answers: 2

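In JEP 444 I read: "The implementations of these blocking operations compensate for the capture of the OS thread by temporarily expanding the parallelism of the scheduler." Under what circumstances do virtual threads expand the number of underlying carrier threads?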

Can you give me some code that triggers this?

java java-21 virtual-threads
2 Answers
1
vote
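import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CarrierThreadExpandTest {

    public static void main(String[] args) throws InterruptedException {
        // One virtual thread per submitted task.
        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
        File file = new File("/Users/info.log");
        for (int i = 0; i < 32; i++) {
            executorService.execute(() -> {
                // Re-read the file forever so every virtual thread keeps doing blocking file I/O.
                while (true) {
                    try (FileInputStream fis = new FileInputStream(file)) {
                        int content;
                        // Blocking read: the operation that captures the OS thread.
                        while ((content = fis.read()) != -1) {
                            System.out.print((char) content);
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                }
            });
        }
        // Keep the JVM alive long enough to take a thread dump.
        Thread.sleep(1000 * 60 * 60);
    }
}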

$ jps
$ jstack pid

Look at the "ForkJoinPool-1-worker-xx" threads in the dump.
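If you would rather see the carrier names from inside the program than from a thread dump, here is a minimal sketch. It relies on an implementation detail of JDK 21: the `toString()` of a mounted virtual thread ends with `@` followed by the carrier thread's name. The class `CarrierNames` and its two helper methods are hypothetical names made up for this illustration, not part of any JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration only; requires JDK 21 or later.
final class CarrierNames {

    // Returns the text after the last '@' in a virtual thread description such as
    // "VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1", or "" if there is no '@'.
    // Assumption: this toString() layout is a JDK implementation detail and may change.
    public static String carrierOf(String threadDescription) {
        int at = threadDescription.lastIndexOf('@');
        return at < 0 ? "" : threadDescription.substring(at + 1);
    }

    // Starts `tasks` virtual threads and collects the distinct carrier names they report.
    public static Set<String> observeCarriers(int tasks) {
        Set<String> carriers = ConcurrentHashMap.newKeySet();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            threads.add(Thread.ofVirtual().start(
                    () -> carriers.add(carrierOf(Thread.currentThread().toString()))));
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return carriers;
    }

    public static void main(String[] args) {
        // Prints the set of carrier names seen by 32 short-lived virtual threads.
        System.out.println(observeCarriers(32));
    }
}
```

Calling `carrierOf(Thread.currentThread().toString())` inside the blocking tasks and comparing the number of distinct names against the configured parallelism should give a rough picture of whether extra carriers were created.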

0
votes

Another option is the Object#wait method, or any other code that uses jdk.internal.misc.Blocker.

Run the example method below, switching the commented-out lines to pick a worker:

public static void workersTest() {
    Instant start = Instant.now();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        int x = i;
        Thread thread = Thread.ofVirtual().start(() -> waitingWorker(x, 15));
        //Thread thread = Thread.ofVirtual().start(() -> busyWorker(x, 15));
        //Thread thread = Thread.ofVirtual().start(() -> busyWorkerWithBlock(x, 15));
        threads.add(thread);
    }
    for (Thread thread : threads) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    log.info("{}: all finished. Elapsed {} seconds", Thread.currentThread(), Duration.between(start, Instant.now()).toSeconds());
}

Example 1:

private static long waitingWorker(int taskNo, int workTimeSeconds) {
    log.info("{}: task-{} started", Thread.currentThread(), taskNo);
    Instant currentTime = Instant.now();
    Object o = new Object();
    synchronized(o) {
        try {
            o.wait(workTimeSeconds * 1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    Instant stopAt = Instant.now();
    log.info("{}: task-{} finished", Thread.currentThread(), taskNo);
    return stopAt.toEpochMilli() - currentTime.toEpochMilli();
}

Example 2:

Version 1 (plain CPU-bound code, does not trigger compensation):

private static long busyWorker(int taskNo, int workTimeSeconds) {
    log.info("{}: task-{} started", Thread.currentThread(), taskNo);
    long i = 0;
    Instant currentTime = Instant.now();
    Instant stopAt = currentTime.plus(workTimeSeconds, ChronoUnit.SECONDS);
    while (currentTime.isBefore(stopAt)) {
        //just some cpu work
        if (currentTime.toEpochMilli() % 2 == 0) {
            i += 1;
        } else {
            i -= 1;
        }
        currentTime = Instant.now();
    }
    log.info("{}: task-{} finished", Thread.currentThread(), taskNo);
    return i;
}

Assume:

  • jdk.virtualThreadScheduler.parallelism = 2
  • tasks = 10
  • work time = 15 seconds
  • This method will then finish in about 75 seconds (10 tasks * 15 seconds / 2 carrier threads).
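The arithmetic behind that estimate can be written down as a small sketch. It assumes CPU-bound tasks that never yield, so they run in "waves" of at most `parallelism` tasks at a time; the class and method names are hypothetical and exist only for this illustration.

```java
// Hypothetical back-of-the-envelope helper; not part of any JDK API.
final class CompletionEstimate {

    // Tasks that never unmount run in waves of at most `parallelism` at once,
    // so the wall-clock estimate is ceil(tasks / parallelism) * workSeconds.
    public static long estimateSeconds(int tasks, int workSeconds, int parallelism) {
        long waves = (tasks + parallelism - 1) / parallelism; // ceiling division
        return waves * workSeconds;
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(10, 15, 2));  // 75: two carriers, five waves
        System.out.println(estimateSeconds(10, 15, 10)); // 15: one carrier per task
    }
}
```

The second call models the case where every task effectively gets its own carrier, which is what compensation amounts to here, provided the machine has enough cores to actually run them in parallel.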

Version 2 (using Blocker):

private static long busyWorkerWithBlock(int taskNo, int workTimeSeconds) {
    log.info("{}: task-{} started", Thread.currentThread(), taskNo);
    long i = 0;
    long comp = Blocker.begin();
    try {
        Instant currentTime = Instant.now();
        Instant stopAt = currentTime.plus(workTimeSeconds, ChronoUnit.SECONDS);
        while (currentTime.isBefore(stopAt)) {
            //just some cpu work
            if (currentTime.toEpochMilli() % 2 == 0) {
                i += 1;
            } else {
                i -= 1;
            }
            currentTime = Instant.now();
        }
    } finally {
        Blocker.end(comp);
    }
    log.info("{}: task-{} finished", Thread.currentThread(), taskNo);
    return i;
}

Exactly the same code wrapped in Blocker finishes in about 15 seconds, because the scheduler creates additional compensating carrier threads.
P.S. jdk.internal.misc.Blocker is a JDK-internal class, so using it directly requires the following:

add the "--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED" compiler option

add "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED" to the JVM execution options