How do I use a custom ThreadPoolExecutor in verticles?


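I recently started using Vert.x. Before that I was using the Play framework. In Play, I used the Actor system with a custom MessageDispatcherConfigurator so that it ran on a custom ThreadPoolExecutor, which propagates the thread context when switching executors or when switching threads within the same executor pool.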

Custom dispatcher configurator

public class CustomDispatcherConfigurator extends MessageDispatcherConfigurator {

    private final CustomDispatcher instance;

    public CustomDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
        super(config, prerequisites);
        Config threadPoolConfig = config.getConfig("thread-pool-executor");
        int fixedPoolSize = threadPoolConfig.getInt("fixed-pool-size");
        instance = new CustomDispatcher(
                this,
                config.getString("id"),
                config.getInt("throughput"),
                Duration.create(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
                (id, threadFactory) -> () -> new CustomThreadPoolExecutor(fixedPoolSize,
                        fixedPoolSize,
                        threadPoolConfig.getDuration("keep-alive-time", TimeUnit.MILLISECONDS),
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingDeque<>(),
                        new ThreadFactory() {
                            private int threadId = 1;

                            @Override
                            public Thread newThread(@NotNull Runnable r) {
                                Thread thread = new Thread(r);
                                thread.setName(config.getString("name") + "-" + threadId++);
                                return thread;
                            }
                        }),
                Duration.create(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
        );
    }

    @Override
    public MessageDispatcher dispatcher() {
        return instance;
    }

}

class CustomDispatcher extends Dispatcher {

    public CustomDispatcher(MessageDispatcherConfigurator _configurator,
            String id,
            int throughput,
            Duration throughputDeadlineTime,
            ExecutorServiceFactoryProvider executorServiceFactoryProvider,
            scala.concurrent.duration.FiniteDuration shutdownTimeout) {
        super(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout);
    }

}

Custom thread pool executor

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    
    public CustomThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            @NotNull TimeUnit unit,
            @NotNull BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    
    public CustomThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            @NotNull TimeUnit unit,
            @NotNull BlockingQueue<Runnable> workQueue,
            @NotNull ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CustomThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            @NotNull TimeUnit unit,
            @NotNull BlockingQueue<Runnable> workQueue,
            @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CustomThreadPoolExecutor(int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            @NotNull TimeUnit unit,
            @NotNull BlockingQueue<Runnable> workQueue,
            @NotNull ThreadFactory threadFactory,
            @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public <T> @NotNull Future<T> submit(@NotNull Callable<T> task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }

    @Override
    public <T> @NotNull Future<T> submit(@NotNull Runnable task, T result) {
        return super.submit(ContextUtility.wrapWithContext(task), result);
    }

    @Override
    public @NotNull Future<?> submit(@NotNull Runnable task) {
        return super.submit(ContextUtility.wrapWithContext(task));
    }

    @Override
    public void execute(@NotNull Runnable task) {
        super.execute(ContextUtility.wrapWithContext(task));
    }
}
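The question does not show ContextUtility, so the following is only a minimal sketch of what a helper like wrapWithContext might do. It assumes the "thread context" is a single String kept in a ThreadLocal; the field name CONTEXT and the restore helper are hypothetical. The idea is to read the context on the submitting thread, install it on the worker thread for the duration of the task, and then put back whatever the worker held before.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: the real ContextUtility is not shown in the question.
final class ContextUtility {

    // Assumption: the thread context is one String stored in a ThreadLocal.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private ContextUtility() {
    }

    // Captures the submitting thread's context and installs it around the task.
    static Runnable wrapWithContext(Runnable task) {
        String captured = CONTEXT.get(); // read on the submitting thread
        return () -> {
            String previous = CONTEXT.get(); // value the worker thread held before
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                restore(previous);
            }
        };
    }

    // Same capture-and-restore logic for tasks that return a value.
    static <T> Callable<T> wrapWithContext(Callable<T> task) {
        String captured = CONTEXT.get();
        return () -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                return task.call();
            } finally {
                restore(previous);
            }
        };
    }

    // Puts the worker thread back into the state it was in before the task ran.
    private static void restore(String previous) {
        if (previous == null) {
            CONTEXT.remove();
        } else {
            CONTEXT.set(previous);
        }
    }
}
```

Note that the submit methods of ThreadPoolExecutor delegate to execute, so with a helper like this, overriding execute alone would probably be enough; wrapping in both places, as the class above does, is harmless but wraps each submitted task twice.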

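I want to use the same CustomThreadPoolExecutor class in all my verticles instead of the default ThreadPoolExecutor. How can I do that? I explored the ExecutorServiceFactory SPI and implemented it by returning a CustomThreadPoolExecutor from its createExecutor method, but I don't know how to make Vert.x use this CustomExecutorServiceFactory when deploying verticles.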

Custom executor service factory

public class CustomExecutorServiceFactory implements ExecutorServiceFactory {
    @Override
    public void init(VertxBuilder builder) {
        ExecutorServiceFactory.super.init(builder);
    }

    @Override
    public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
        return new CustomThreadPoolExecutor(concurrency, maxConcurrency, 10L, TimeUnit.MICROSECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
    }
}
java vert.x executorservice vertx-verticle vertxoptions
1 answer

Vert.x already has the behavior you want built in:

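  • Vert.x guarantees that for standard verticles (not worker verticles) the executing thread always stays the same. From the execution of a verticle's start() method through to the execution of its stop() method, it is always the same thread. Other verticles may share that thread, however.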
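  • Vert.x also assigns each verticle a context, which holds a reference to that thread and can be used to share data between handlers running in the same context. Using plain fields of the verticle class is faster, however.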
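The point about plain fields can be illustrated without Vert.x. The sketch below is a plain-JDK analogy, not Vert.x API: a single-thread executor stands in for the event loop a standard verticle is pinned to, and the class name ConfinedCounter and its methods are made up for this example. Because every handler runs on the same thread, the plain int field needs no synchronization.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy (hypothetical names): a single-thread executor plays the event loop.
final class ConfinedCounter {

    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Records which threads ran handlers; concurrent only so callers can read it safely.
    private final Set<String> threadsSeen = ConcurrentHashMap.newKeySet();

    // Plain field: it is only ever touched from the single executor thread.
    private int count;

    // May be called from any thread; the actual mutation happens on the "event loop".
    void increment() {
        eventLoop.execute(() -> {
            threadsSeen.add(Thread.currentThread().getName());
            count++;
        });
    }

    // Reads the field on the same thread, after all previously queued handlers have run.
    int get() throws Exception {
        return eventLoop.submit(() -> count).get();
    }

    int distinctThreads() {
        return threadsSeen.size();
    }

    void close() {
        eventLoop.shutdown();
    }
}
```

Calling increment() from several threads and then get() returns the exact number of increments, and distinctThreads() stays at 1, which mirrors why state kept in a standard verticle's fields does not need locking.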