我正在尝试实现一个Java程序,为数据库准备一堆可运行程序(有数百个,我想为每个数据库分配一个线程),并使用
ScheduledExecutorService
运行每个可运行程序,初始延迟和执行之间的一些延迟。这是我到目前为止所做到的:
public static void main(String[] args) throws Exception {
// Get db name and info as key-value pairs
Map<String, DbInfo> dbInfoMap = getDbInfoMap();;
// Initialize the scheduled executor service.
final ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
// For each database, get a list of runnable and process accordingly.
dbInfoMap.forEach((db, dbInfo) -> {
// Get a list of runnable(s).
List<Runnable> runnableList = new MigrationTaskManager(db, dbInfo).getRunnableTasksForDB();
// Schedule all the runnable(s) using the scheduled executor service, and a delay
runnableList.forEach(runnable -> executorService
.scheduleWithFixedDelay(runnable, INITIAL_DELAY, DELAY, TimeUnit.MILLISECONDS));
});
}
}
这符合我的意图,尽管有一些警告(如果列表太长,则需要很长时间才能开始处理某些数据库)。如何实现它以便每个数据库的所有可运行对象同时被触发?例如,如果我有 100 个数据库可供读取,如何确保在程序启动时触发 100 个线程,每个线程中都有一堆可运行对象?我真的很感激一些帮助。
你的标题说:
确保所有可运行对象同时被触发
如果您的意思是希望所有
Runnable
任务同时执行,则使用 Java 线程是不可能的。
在Java中,您要求执行一个线程,但是执行何时开始以及执行持续多长时间取决于主机操作系统线程调度程序的突发奇想。当今 Java 中的每个线程都映射到一个主机操作系统线程。 (即将推出的 Java 21+ 中的虚拟线程将与这些“平台”线程有很大不同。)Java 程序员无法控制线程何时运行。
您可以收集一堆任务一起提交给执行器服务。但这并不意味着他们将被一起处决。它们可以按任何顺序执行。 这是一个示例应用程序,演示将
Callable
对象的集合传递给
invokeAll
的 ExecutorService
方法。package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;
public class ManyDatabases
{
public static void main ( String[] args )
{
ManyDatabases app = new ManyDatabases ( );
app.demo ( );
}
private void demo ( )
{
System.out.println ( "INFO - Demo done. " + Instant.now ( ) );
// Dummy data
List < DatabaseInfo > databaseInfos =
IntStream
.rangeClosed ( 0 , 6 )
.mapToObj ( ( int index ) ->
new DatabaseInfo (
ENGINE.values ( )[ ThreadLocalRandom.current ( ).nextInt ( ENGINE.values ( ).length ) ] ,
List.of ( "A" , "B" , "C" , "D" , "E" , "F" , "G" ).get ( index )
)
)
.toList ( );
System.out.println ( "databaseInfos = " + databaseInfos );
// Make tasks.
List < DbTask > tasks = databaseInfos.stream ( ).map ( DbTask :: new ).toList ( );
// Execute tasks.
try (
ExecutorService executorService = Executors.newFixedThreadPool ( 3 ) ;
)
{
try
{
List < Future < Boolean > > futures = executorService.invokeAll ( tasks );
}
catch ( InterruptedException e ) { throw new RuntimeException ( e ); }
}
System.out.println ( "INFO - Demo done. " + Instant.now ( ) );
}
}
enum ENGINE { POSTGRES, MS_SQL_SERVER, MYSQL }
record DatabaseInfo( ENGINE engine , String address ) { }
class DbTask implements Callable < Boolean >
{
final private DatabaseInfo databaseInfo;
public DbTask ( final DatabaseInfo databaseInfo )
{
this.databaseInfo = databaseInfo;
}
@Override
public Boolean call ( )
{
System.out.println ( Instant.now ( ) + " TASK Simulating update to database: " + this.databaseInfo );
try { Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 500 ) ) ); /* Simulate real work being done. */ } catch ( InterruptedException e ) { throw new RuntimeException ( e ); }
return true;
}
}