我正在尝试基于 SQL 查询为 Hazelcast Jet 创建源。
我不想直接传递 jdbc url,而是想使用在 Spring 上下文中注册的
DataSource
。
然后,我想通过调用创建一个源 Sources.jdbc(newConnectionFn, newConnectionFn, createOutputFn)
所以,我认为我的
SupplierEx<Connection>
应该类似于下面的类,并且当 Hazelcast 实例收到此函数时,所有内容都将被正确反序列化:
@SpringAware
public class DataSourceBeanHolder implements SupplierEx<Connection>, ApplicationContextAware {
@Autowired
private transient DataSource dataSource;
public DataSourceBeanHolder(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.dataSource = applicationContext.getBean(DataSource.class);
}
@Override
public Connection getEx() throws Exception {
return dataSource.getConnection();
}
}
BatchSource<Long> source = Sources.jdbc(
new DataSourceBeanHolder(dataSource),
(con, parallelism, index) -> {
PreparedStatement stmt = con.prepareStatement("SELECT ID FROM MYTABLE WHERE MOD(ID, ?) = ?)");
stmt.setInt(1, parallelism);
stmt.setInt(2, index);
return stmt.executeQuery();
},
resultSet -> resultSet.getLong(1));
但是,当我尝试运行此程序时,
DataSourceBeanHolder
被反序列化,并且 @Autowired
字段均未设置,也未调用 setApplicationContext
。 Hazelcast 实例因 DataSourceBeanHolder.getEx()
中的 NullPointerException 崩溃
我错过了什么? 为什么
DataSourceBeanHolder
的反序列化方式与其他类不同?
我没有忘记配置
SpringManagedContext
。
我已经配置了
SpringManagedContext
并且在其他地方也能正常工作。即使在 Jet 中,我也可以毫无问题地使用 com.hazelcast.spring.jet.JetSpringServiceFactories#bean(java.lang.String, java.lang.Class<T>)
。
config.setManagedContext(new SpringManagedContext(springContext));
这是一个仅关于
Sources.jdbc
和参数newConnectionFn
的问题
Hazelcast 版本:5.4.0
此答案假设您正在使用嵌入式 hazelcast。
将数据源存储在单例类中
@Component
public class DataSourceProvider implements ApplicationContextAware {
private static DataSource dataSource;
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
dataSource = applicationContext.getBean(DataSource.class);
}
public static DataSource getDataSource() {
return dataSource;
}
}
然后使用来自SupplierEx 的单例。我们这样做的原因是,SupplierEx 被序列化,然后在提交作业时反序列化。当它被反序列化时,我们需要能够再次访问数据源
public class NewConnectionProvider implements SupplierEx<Connection> {
@Override
public Connection getEx() throws Exception {
DataSource dataSource = DataSourceProvider.getDataSource();
return dataSource.getConnection();
}
}
然后在Sources.jdbc中使用
@Component
@RequiredArgsConstructor
public class JdbcSourceCommandLineRunner implements CommandLineRunner {
private final HazelcastInstance hazelcastInstance;
@Override
public void run(String... args) {
BatchSource<Long> source = Sources.jdbc(
new NewConnectionProvider(),
(con, parallelism, index) -> {
PreparedStatement stmt = con
.prepareStatement("SELECT ID FROM MYTABLE WHERE MOD(ID, ?) = ?)");
stmt.setInt(1, parallelism);
stmt.setInt(2, index);
return stmt.executeQuery();
},
resultSet -> resultSet.getLong(1));
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(source)
.writeTo(Sinks.logger());
JetService jetService = hazelcastInstance.getJet();
Job job = jetService.newJob(pipeline);
job.join();
}
}
我希望这有帮助