哪个是读取慢速更改查找并丰富流输入集合的最佳方法?

问题描述 投票:0回答:1

我正在使用Apache Beam,具有1.5GB的流媒体集合。我的查询表是JDBCio mysql响应。

当我在没有侧面输入的情况下运行管道时,我的工作将在大约2分钟内完成。当我通过侧面输入来运行我的工作时,我的工作将永远不会完成,卡死。

这是我用于存储查询的代码(〜1M记录)

  PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
       "com.mysql.jdbc.Driver", "jdbc:mysql://ip")
      .withUsername("username")
      .withPassword("password"))
      .withQuery("select a_number from cell")
      .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
      public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getString(1), resultSet.getString(1));
      }
})).apply(View.asMap());

这是我的流媒体收藏集的代码

pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60),  Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))

这是我的parDo的代码,用于在(1000万条记录的)每个事件行上进行迭代

  .apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String,Integer> i = c.element();
    String sideInputData = c.sideInput(sideData).get(i.getKey());
    if (sideInputData == null) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));

我正在使用flink集群,但是使用直接运行器输出相同。

集群:

2 cpu6芯24GB内存

我在做什么错?I've followed this

我正在使用Apache Beam,具有1.5GB的流媒体集合。我的查询表是JDBCio mysql响应。当我在没有侧面输入的情况下运行管道时,我的工作将在大约2分钟内完成。当我...

java parallel-processing apache-flink apache-beam flink-streaming
1个回答
0
投票

如果它运行的数据少得多,我怀疑程序正在用完Java进程的所有内存。您可以通过JVisualVM或JConsole进行监视。有很多文章解决了这个问题,我只是用一个快速的Google搜索偶然发现了this one

© www.soinside.com 2019 - 2024. All rights reserved.