如何在Flink中使用多个计数器

问题描述 投票:0回答:1

(与How to create dynamic metric in Flink有关的种类]

我有一个events(someid:String, name:String)流,并且出于监视原因,我需要一个计数器per事件ID。在所有Flink文档和示例中,我都可以看到该计数器是,例如,以map函数的open中的名称初始化。

但是在我的情况下,我无法初始化计数器,因为每个eventId需要一个计数器,并且我不预先知道该值。另外,我知道每当在MapFunction的map()方法中一次传递一个新计数器时,创建一个新计数器将是多么昂贵。最后,我不能保留计数器的“缓存”,因为它太大了。

理想情况下,我想要这样的东西:

class Event(id: String, name: String)

class ExampleMapFunction extends RichMapFunction[Event, Event] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = new Counter()
  }

  override def map(event: Event): Event = {
    counter.inc(event.id)
    event
  }
}

或者基本上我可以实现自己的计数器以允许我传递尺寸?如果是,如何?

关于这种用例的任何建议或最佳做法吗?

scala apache-flink metrics
1个回答
0
投票

如果保留计数器的缓存太大,那么我认为使用指标不会以满足您的需求的方式扩展。

一些替代方法:

  • 使用侧输出在某些外部的可查询/可视化数据存储中收集有意义的事件,例如influxdb。] >>

  • 将信息保持为键控状态,并根据需要使用广播消息触发其相关部分的输出(再次使用侧面输出)。

  • 将信息保持在键控状态,并获取定期保存点,然后使用状态处理器API通过查询进行分析。

© www.soinside.com 2019 - 2024. All rights reserved.