我最近开始使用 Vert.x。早些时候我使用的是 Play 框架。在游戏中,我使用带有自定义 MessageDispatcherConfigurator 的 Actor 系统来使用自定义 ThreadPoolExecutor,它在切换执行器或在同一执行器池中切换线程时传播线程上下文。
public class CustomDispatcherConfigurator extends MessageDispatcherConfigurator {
private final CustomDispatcher instance;
public CustomDispatcherConfigurator(Config config, DispatcherPrerequisites prerequisites) {
super(config, prerequisites);
Config threadPoolConfig = config.getConfig("thread-pool-executor");
int fixedPoolSize = threadPoolConfig.getInt("fixed-pool-size");
instance = new CustomDispatcher(
this,
config.getString("id"),
config.getInt("throughput"),
Duration.create(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
(id, threadFactory) -> () -> new CustomThreadPoolExecutor(fixedPoolSize,
fixedPoolSize,
threadPoolConfig.getDuration("keep-alive-time", TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(),
new ThreadFactory() {
private int threadId = 1;
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r);
thread.setName(config.getString("name") + "-" + threadId++);
return thread;
}
}),
Duration.create(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
);
}
@Override
public MessageDispatcher dispatcher() {
return instance;
}
}
class CustomDispatcher extends Dispatcher {
public CustomDispatcher(MessageDispatcherConfigurator _configurator,
String id,
int throughput,
Duration throughputDeadlineTime,
ExecutorServiceFactoryProvider executorServiceFactoryProvider,
scala.concurrent.duration.FiniteDuration shutdownTimeout) {
super(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout);
}
}
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue,
@NotNull ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue,
@NotNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue,
@NotNull ThreadFactory threadFactory,
@NotNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public <T> @NotNull Future<T> submit(@NotNull Callable<T> task) {
return super.submit(ContextUtility.wrapWithContext(task));
}
@Override
public <T> @NotNull Future<T> submit(@NotNull Runnable task, T result) {
return super.submit(ContextUtility.wrapWithContext(task), result);
}
@Override
public @NotNull Future<?> submit(@NotNull Runnable task) {
return super.submit(ContextUtility.wrapWithContext(task));
}
@Override
public void execute(@NotNull Runnable task) {
super.execute(ContextUtility.wrapWithContext(task));
}
}
我想在我的所有 Verticles 中使用相同的 CustomThreadPoolExecutor 类而不是默认的 ThreadPoolExecutor 。我该怎么做?我探索了 SPI ExecutorServiceFactory,并通过在其
createExecutor
方法中使用 CustomThreadPoolExecutor 来实现它,但我不知道如何在部署 Verticles 时使用此 CustomExecutorServiceFactory。
public class CustomExecutorServiceFactory implements ExecutorServiceFactory {
@Override
public void init(VertxBuilder builder) {
ExecutorServiceFactory.super.init(builder);
}
@Override
public ExecutorService createExecutor(ThreadFactory threadFactory, Integer concurrency, Integer maxConcurrency) {
return new CustomThreadPoolExecutor(concurrency, maxConcurrency, 10L, TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<>(), threadFactory);
}
}
Vert.x 已经内置了您想要的行为:
start()
方法到执行其 end()
方法,它将始终是同一个线程。然而,可能还有其他 Verticle 共享同一线程。context
,它保存对该线程的引用,并且可用于在同一上下文中的处理程序之间共享数据。然而,使用 Verticle 类的普通属性更快。