我正在使用Apache Beam,具有1.5GB的流媒体集合。我的查询表是JDBCio mysql响应。
当我在没有侧面输入的情况下运行管道时,我的工作将在大约2分钟内完成。当我通过侧面输入来运行我的工作时,我的工作将永远不会完成,卡死。
这是我用于存储查询的代码(〜1M记录)
PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create( "com.mysql.jdbc.Driver", "jdbc:mysql://ip") .withUsername("username") .withPassword("password")) .withQuery("select a_number from cell") .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() { public KV<String, String> mapRow(ResultSet resultSet) throws Exception { return KV.of(resultSet.getString(1), resultSet.getString(1)); } })).apply(View.asMap());
这是我的流媒体收藏集的代码
pipeline .apply("ReadMyFile", TextIO.read().from("/home/data/**") .watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never())) .apply(Window.<String>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30)))) .accumulatingFiredPanes() .withAllowedLateness(ONE_DAY))
这是我的parDo的代码,用于在(1000万条记录的)每个事件行上进行迭代
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() { @ProcessElement public void processElement(ProcessContext c) { KV<String,Integer> i = c.element(); String sideInputData = c.sideInput(sideData).get(i.getKey()); if (sideInputData == null) { c.output(i); } } }).withSideInputs(sideData));
我正在使用flink集群,但是使用直接运行器输出相同。
集群:
2 cpu6芯24GB内存
我在做什么错?I've followed this
我正在使用Apache Beam,具有1.5GB的流媒体集合。我的查询表是JDBCio mysql响应。当我在没有侧面输入的情况下运行管道时,我的工作将在大约2分钟内完成。当我...
如果它运行的数据少得多,我怀疑程序正在用完Java进程的所有内存。您可以通过JVisualVM或JConsole进行监视。有很多文章解决了这个问题,我只是用一个快速的Google搜索偶然发现了this one。