我这是从卡夫卡读取数据,进行一定的汇总和结果写入到elasticsearch索引的弗林克工作。我看到在源上高背压。在数据的高背压的结果被从卡夫卡慢慢读,我看到看到数据在网络堆栈排队(netstat的RecvQ示出了卡在源卡夫卡连接的数据的字节几万,所述数据最终被读出),其在导通的原因数据以一个滞后之后sinked成elasticsearch和这种滞后不断增加。
源是生产〜17500个记录每分钟的弗林克作业分配(事件)时间戳给每个来电记录,确实12种不同类型keyBy的,适合的事件,在1分钟翻滚窗口,在此键合的窗户流执行聚合操作和最后写入结果,以12个不同的elasticsearch索引(每个写是插入)。
问题是,数据被写入到elasticsearch开始落后使仪表盘的结果(建立在elasticsearch的顶部)不再是实时的。我的理解是,这是因为背压积聚发生。不知道如何解决这个问题。集群本身是一个基于VM单个节点的独立簇,具有64GB RAM(任务管理器被配置为使用20GB)和16个vCPU。没有被限制的CPU或存储器的证据(如从HTOP看到的)。只有一个任务管理器,这是此群集上唯一弗林克工作。
我不知道如果这个问题是由于在群集或由于一些地方的资源问题写信给elasticsearch正在缓慢。我已经设置了setBulkFlushMaxActions为1(如在所有的代码示例我已经见过的任何地方完成的),做我需要设置setBulkFlushInterval和/或setBulkFlushMaxSizeinMB也?
我已经通过https://www.da-platform.com/flink-forward-berlin/resources/improving-throughput-and-latency-with-flinks-network-stack走了,但还没有试过的幻灯片19,不知道是什么值,为这些参数设置列出的调节选项。
最后,我不认为我看到了同样的问题,从IDE的IntelliJ内运行相同的工作时。
我要排除掉所有的聚合和重新添加它们一个接一个,看看是否有造成此问题的特定聚集?
任何具体的指针将不胜感激,也将尝试setBulkFlushInterval和setBulkFlushMaxSizeinMB。
更新1,2019年1月29日好像两个节点都在非常高的堆使用运行,以便GC不断尝试运行在JVM清除空间。会得到物理内存从16到32GB的增加,然后重新启动的节点。这应该有希望解决的proble,就会知道在另一个24小时。
典型地,在这样的情况下的问题是在连接到所述外部数据存储器 - 无论是足够的带宽,或同步写入针对每个记录,而不是分批的写入。
一个简单的方法来验证elasticsearch下沉的问题(而不是,说,网络堆栈配置)将与丢弃接收器(一个简单的什么都不做)来取代它,看看是否能解决问题。就像是
public static class NullSink<OUT> implements SinkFunction<OUT> {
@Override
public void invoke(OUT value, Context context) throws Exception {
}
}
更新:
问题是,你已经设置bulk.flush.max.actions 1,防止在连接到服务器elasticsearch任何缓冲。
这个问题得到了通过增加(加倍)的elasticearch群集节点上的RAM和设置指数刷新间隔(在所有elasticsearch指数)到30秒(默认为1秒)解决。做了这些改变在弗林克背压报告为确定后,没有数据的滞后,一切看起来桃色的。