我在kafka有一个主题,在那里我得到了json格式的多种类型的事件。我已经创建了一个filestreamsink,将这些事件写到S3与bucketing。
FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
new SimpleStringSchema(),
properties);
final StreamingFileSink<Object> errorSink = StreamingFileSink
.forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
.withBucketAssigner(new EventTimeBucketAssignerJson())
.build();
env.addSource(errorTopicConsumer)
.name("error_source")
.setParallelism(1)
.addSink(errorSink)
.name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {
@Override
public String getBucketId(Object record, Context context) {
StringBuffer partitionString = new StringBuffer();
Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
try {
partitionString.append("event_name=")
.append(tuple3.f0).append("/");
String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
partitionString.append(timePartition);
} catch (Exception e) {
partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
.append("month=").append(Constants.DEFAULT_MONTH).append("/")
.append("day=").append(Constants.DEFAULT_DAY);
}
return partitionString.toString();
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}
现在,我想发布每小时计数的每个事件的指标,普罗米修斯和发布一个Grafana仪表板。
所以,请帮助我如何实现每小时计数的每个事件使用flink指标和发布到普罗米修斯。
谢谢你的帮助
通常情况下,这是由简单的创建一个计数器的请求,然后使用 rate()
函数,这将给你在给定时间内的请求率。
然而,如果你出于某种原因想自己做这件事,那么你可以做一些类似于在以下函数中所做的事情。org.apache.kafka.common.metrics.stats.Rate
. 因此,在这种情况下,你需要收集的样品列表,他们在收集的时间,以及窗口大小,你要使用的速率计算,然后你可以简单地进行计算,即删除样品的范围外,已经过期,然后简单地计算有多少样品在窗口。
然后你可以设置 Gauge
到计算值。