I have a method that takes the path of a file containing INSERT queries and inserts them into the database in batches. Here is the sample code:
    public void setupDB(String sqlFilePath, int batchSize, int threadCount) {
        // Each call creates its own fixed-size thread pool for this file.
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        log.info("Started Processing File:{} at {}", sqlFilePath, LocalDateTime.now());
        try (var reader = new BufferedReader(new FileReader(Path.of(sqlFilePath).toFile()))) {
            var batch = new ArrayList<String>();
            reader.lines().forEach(line -> {
                batch.add(line);
                if (batch.size() == batchSize) {
                    // Submit an immutable snapshot so that clearing the buffer cannot affect the running task.
                    var batchCopy = List.copyOf(batch);
                    futures.add(CompletableFuture.runAsync(() -> repository.executeBatch(batchCopy), executorService));
                    batch.clear();
                }
            });
            // Submit whatever is left over as the final, smaller batch.
            if (!batch.isEmpty()) {
                var future = CompletableFuture.runAsync(() -> repository.executeBatch(batch), executorService);
                futures.add(future);
            }
            log.info("Batches size: {}", futures.size());
            // Block until every submitted batch has completed.
            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
            log.info("Finished Processing SQL File at {}", LocalDateTime.now());
        } catch (Exception ex) {
            log.error("Error seeding db", ex);
        } finally {
            executorService.shutdown();
        }
    }
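The batching step in the method above can be looked at in isolation. The following is only a minimal sketch, not the code from the question: `LineBatcher` and `readBatches` are hypothetical names, the helper collects all batches in memory instead of submitting them as it reads, and it skips blank lines, which the original method does not do.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper (not part of the original code): groups lines into fixed-size batches.
class LineBatcher {

    /** Returns the non-blank lines of the reader as immutable batches of at most batchSize lines. */
    static List<List<String>> readBatches(BufferedReader reader, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>(batchSize);
        Iterator<String> lines = reader.lines().iterator();
        while (lines.hasNext()) {
            String line = lines.next();
            if (line.isBlank()) {
                continue; // assumption: blank lines should not be sent as statements
            }
            current.add(line);
            if (current.size() == batchSize) {
                batches.add(List.copyOf(current)); // immutable snapshot of the full batch
                current.clear();
            }
        }
        if (!current.isEmpty()) {
            batches.add(List.copyOf(current)); // final, smaller batch
        }
        return batches;
    }

    public static void main(String[] args) throws IOException {
        String sql = "INSERT 1\nINSERT 2\n\nINSERT 3\n";
        try (BufferedReader reader = new BufferedReader(new StringReader(sql))) {
            System.out.println(readBatches(reader, 2));
        }
    }
}
```

Separating the batching from the submission like this makes the chunking easy to test on its own, at the cost of holding the whole file in memory, so it may not suit very large SQL files.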
The code that inserts the data into the database:
    // Retried only when a connection cannot be obtained; the wait starts at 15 s, doubles each time, and is capped at 5 min.
    @Retryable(backoff = @Backoff(delay = 15000, multiplier = 2, maxDelay = 300000), retryFor = CannotGetJdbcConnectionException.class)
    public void executeBatch(List<String> queries) {
        jdbcTemplate.execute((Connection connection) -> executeBatch(connection, queries));
    }

    private int[] executeBatch(Connection connection, List<String> insertQueries) throws SQLException {
        // Run the whole batch as a single transaction.
        connection.setAutoCommit(false);
        int[] results = new int[0];
        try (Statement statement = connection.createStatement()) {
            for (String query : insertQueries) {
                statement.addBatch(query);
            }
            results = statement.executeBatch();
            connection.commit();
        } catch (Exception ex) {
            // The exception is logged and swallowed, so a failed batch is rolled back but not retried.
            connection.rollback();
            log.error("Error executing sql batch", ex);
        } finally {
            // Restore the default before the connection goes back to the pool.
            connection.setAutoCommit(true);
        }
        return results;
    }
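To make the retry settings above concrete, here is a small sketch that works out the waits implied by `delay = 15000`, `multiplier = 2`, and `maxDelay = 300000`. It is plain arithmetic, not Spring Retry's own code: `BackoffSchedule` and `delays` are hypothetical names, and the attempt count of 3 is an assumption based on Spring Retry's default `maxAttempts`, since the annotation in the question does not set it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of an exponential backoff schedule; not Spring Retry's implementation.
class BackoffSchedule {

    /** Wait before each retry: delay, delay * multiplier, ... with every value capped at maxDelay. */
    static List<Long> delays(long delay, double multiplier, long maxDelay, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long current = delay;
        // maxAttempts includes the first call, so there are maxAttempts - 1 waits.
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min(current, maxDelay));
            current = (long) Math.min(current * multiplier, (double) maxDelay);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed default of 3 attempts: two waits.
        System.out.println(delays(15000, 2, 300000, 3));
        // With a hypothetical 7 attempts the cap of 300000 ms is reached on the last wait.
        System.out.println(delays(15000, 2, 300000, 7));
    }
}
```

Under that assumption a batch that cannot get a connection waits 15 s and then 30 s before giving up, so the 5-minute cap only matters if the number of attempts is raised.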
The code that calls the method:
    var setOneTasks = CompletableFuture.runAsync(() -> {
        preparedLayerDataService.setupDB(sqlPath1, 500, 30);
    });
    var setTwoTasks = CompletableFuture.runAsync(() -> {
        preparedLayerDataService.setupDB(sqlPath2, 500, 30);
    });
    CompletableFuture.allOf(setOneTasks, setTwoTasks).join();
    log.info("Completed loading data");
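Because each `setupDB` call creates its own pool of 30 threads, the two calls above can have up to 60 batches in flight at once. Whether that matters depends on the connection pool size, which the question does not state. As a hedged sketch of one alternative, the batches of both files could share a single pool so that total concurrency is bounded by one number. `SharedPoolLoader`, `submitAll`, and `loadBoth` are hypothetical names, and the `fakeExecuteBatch` consumer is only a stand-in for `repository.executeBatch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: both files submit their batches to one shared, bounded pool.
class SharedPoolLoader {

    /** Submits every batch to the given pool and returns the futures so the caller can wait on them. */
    static List<CompletableFuture<Void>> submitAll(List<List<String>> batches,
                                                   Consumer<List<String>> batchExecutor,
                                                   ExecutorService pool) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<String> batch : batches) {
            futures.add(CompletableFuture.runAsync(() -> batchExecutor.accept(batch), pool));
        }
        return futures;
    }

    /** Runs the batches of both files on one pool and returns the highest concurrency observed. */
    static int loadBoth(List<List<String>> fileOne, List<List<String>> fileTwo, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Stand-in for repository.executeBatch: records concurrency and simulates some work.
        Consumer<List<String>> fakeExecuteBatch = batch -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            futures.addAll(submitAll(fileOne, fakeExecuteBatch, pool));
            futures.addAll(submitAll(fileTwo, fakeExecuteBatch, pool));
            // Wait for the batches of both files together.
            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        List<List<String>> one = List.of(List.of("INSERT a1"), List.of("INSERT a2"), List.of("INSERT a3"));
        List<List<String>> two = List.of(List.of("INSERT b1"), List.of("INSERT b2"));
        System.out.println("peak concurrency: " + loadBoth(one, two, 2));
    }
}
```

With a shared pool the observed peak can never exceed `maxConcurrency`, regardless of how many files are loaded, which makes it easier to match the thread count to the number of available database connections.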
Statistics:
Question:
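If you are using Spring Boot, you can do it like this:
Add a data.sql file to your resources directory, and it will be run when the application is initialized.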