调用Hazelcast Sources.jdbc时如何使用DataSource bean

问题描述 投票:0回答:1

我正在尝试基于 SQL 查询为 Hazelcast Jet 创建源。

我不想直接传递 jdbc url,而是想使用在 Spring 上下文中注册的

DataSource

然后,我想通过调用创建一个源 Sources.jdbc(newConnectionFn, newConnectionFn, createOutputFn)

所以,我认为我的

SupplierEx<Connection>
应该类似于下面的类,并且当 Hazelcast 实例收到此函数时,所有内容都将被正确反序列化:

@SpringAware
public class DataSourceBeanHolder implements SupplierEx<Connection>, ApplicationContextAware {
    @Autowired
    private transient DataSource dataSource;

    public DataSourceBeanHolder(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.dataSource = applicationContext.getBean(DataSource.class);
    }

    @Override
    public Connection getEx() throws Exception {
        return dataSource.getConnection();
    }
}
BatchSource<Long> source = Sources.jdbc(
            new DataSourceBeanHolder(dataSource),
            (con, parallelism, index) -> {
                PreparedStatement stmt = con.prepareStatement("SELECT ID FROM MYTABLE WHERE  MOD(ID, ?) = ?)");
                stmt.setInt(1, parallelism);
                stmt.setInt(2, index);
                return stmt.executeQuery();
            },
            resultSet -> resultSet.getLong(1));

但是,当我尝试运行此程序时,

DataSourceBeanHolder
被反序列化,并且
@Autowired
字段均未设置,也未调用
setApplicationContext
。 Hazelcast 实例因
DataSourceBeanHolder.getEx()

中的 NullPointerException 崩溃

我错过了什么? 为什么

DataSourceBeanHolder
的反序列化方式与其他类不同?


我没有忘记配置

SpringManagedContext

我已经配置了

SpringManagedContext
并且在其他地方也能正常工作。即使在 Jet 中,我也可以毫无问题地使用
com.hazelcast.spring.jet.JetSpringServiceFactories#bean(java.lang.String, java.lang.Class<T>)

config.setManagedContext(new SpringManagedContext(springContext));

这是一个仅关于

Sources.jdbc
和参数
newConnectionFn

的问题

Hazelcast 版本:5.4.0


spring hazelcast hazelcast-jet
1个回答
0
投票

此答案假设您正在使用嵌入式 hazelcast。

将数据源存储在单例类中

@Component
public class DataSourceProvider implements ApplicationContextAware {

  private static DataSource dataSource;

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) {
    dataSource = applicationContext.getBean(DataSource.class);
  }

  public static DataSource getDataSource() {
    return dataSource;
  }
}

然后使用来自SupplierEx 的单例。我们这样做的原因是,SupplierEx 被序列化,然后在提交作业时反序列化。当它被反序列化时,我们需要能够再次访问数据源

public class NewConnectionProvider implements SupplierEx<Connection> {

  @Override
  public Connection getEx() throws Exception {
    DataSource dataSource = DataSourceProvider.getDataSource();
    return dataSource.getConnection();
  }
}

然后在Sources.jdbc中使用

@Component
@RequiredArgsConstructor
public class JdbcSourceCommandLineRunner implements CommandLineRunner {

  private final HazelcastInstance hazelcastInstance;

  @Override
  public void run(String... args) {
    BatchSource<Long> source = Sources.jdbc(
      new NewConnectionProvider(),
      (con, parallelism, index) -> {
        PreparedStatement stmt = con
          .prepareStatement("SELECT ID FROM MYTABLE WHERE  MOD(ID, ?) = ?)");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
      },
      resultSet -> resultSet.getLong(1));

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(source)
      .writeTo(Sinks.logger());

    JetService jetService = hazelcastInstance.getJet();
    Job job = jetService.newJob(pipeline);
    job.join();
  }
}

我希望这有帮助

© www.soinside.com 2019 - 2024. All rights reserved.