使用公制的事件闪烁计数

问题描述 投票:1回答:1

我在kafka有一个主题,在那里我得到了json格式的多种类型的事件。我已经创建了一个filestreamsink,将这些事件写到S3与bucketing。

FlinkKafkaConsumer errorTopicConsumer = new FlinkKafkaConsumer(ERROR_KAFKA_TOPICS,
                new SimpleStringSchema(),
                properties);
        final StreamingFileSink<Object> errorSink = StreamingFileSink
                .forRowFormat(new Path(outputPath + "/error"), new SimpleStringEncoder<>("UTF-8"))
                .withBucketAssigner(new EventTimeBucketAssignerJson())
                .build();

        env.addSource(errorTopicConsumer)
                .name("error_source")
                .setParallelism(1)
                .addSink(errorSink)
                .name("error_sink").setParallelism(1);
public class EventTimeBucketAssignerJson implements BucketAssigner<Object, String> {

    @Override
    public String getBucketId(Object record, Context context) {
        StringBuffer partitionString = new StringBuffer();
        Tuple3<String, Long, String> tuple3 = (Tuple3<String, Long, String>) record;
        try {
            partitionString.append("event_name=")
                    .append(tuple3.f0).append("/");

            String timePartition = TimeUtils.getEventTimeDayPartition(tuple3.f1);
            partitionString.append(timePartition);
        } catch (Exception e) {
            partitionString.append("year=").append(Constants.DEFAULT_YEAR).append("/")
                    .append("month=").append(Constants.DEFAULT_MONTH).append("/")
                    .append("day=").append(Constants.DEFAULT_DAY);
        }
        return partitionString.toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

现在,我想发布每小时计数的每个事件的指标,普罗米修斯和发布一个Grafana仪表板。

所以,请帮助我如何实现每小时计数的每个事件使用flink指标和发布到普罗米修斯。

谢谢你的帮助

stream apache-flink flink-streaming
1个回答
0
投票

通常情况下,这是由简单的创建一个计数器的请求,然后使用 rate() 函数,这将给你在给定时间内的请求率。

然而,如果你出于某种原因想自己做这件事,那么你可以做一些类似于在以下函数中所做的事情。org.apache.kafka.common.metrics.stats.Rate. 因此,在这种情况下,你需要收集的样品列表,他们在收集的时间,以及窗口大小,你要使用的速率计算,然后你可以简单地进行计算,即删除样品的范围外,已经过期,然后简单地计算有多少样品在窗口。

然后你可以设置 Gauge 到计算值。

© www.soinside.com 2019 - 2024. All rights reserved.