Flink / Kafka应用程序的等待时间图上的选项

问题描述 投票:0回答:1

我有一个应用程序,它从Kafka主题接收推文,有一个窗口,然后通过AssyncIO操作将这些推文保存在Cassandra上,该操作允许最多打开100个线程(AsyncIO运算符的最后一个参数),而无需执行任何操作对数据进行预处理:只需保存一条带有时间的时间戳即可保存一条推文。

然后,我强调Flink应用程序发送了300万条推文,并在Grafana中绘制了一张图表,显示了数据库中保存了多少条推文,但是此图显示了一些选择,不是连续的,我不能了解原因。

因此,您可以看到,每隔一分钟它会节省7k,然后降到5k,然后降到2k。如果能找出原因,我将不胜感激!非常感谢!

enter image description here

twitter apache-kafka apache-flink flink-streaming
1个回答
1
投票

首先,如果要写卡桑德拉,我会用connector。如果不是不可能的话,很难正确地手动执行一次准确的操作。

其次,AsyncIO没有启动100个线程。实际上,它并没有为用户启动任何线程。您需要通过任何方式自己启动它们。通常,它使用库具有自己的连接池的外部系统的回调机制。

如果要进行同步调用,则需要管理自己的线程池。我建议使用Executors.newCachedThreadPool()并向其提交异步任务。 AsyncIO仅会帮助将异步结果合并回同步流中。

第三,100个线程可能很多,具体取决于您的设置。另请注意,如果您使用Flink的放大(每个任务管理器使用多个插槽),则会使使用的线程成倍增加。

© www.soinside.com 2019 - 2024. All rights reserved.