使用案例:我有消息有messageId,多个消息可以有相同的消息ID,这些消息存在于由messageId分区的流管道(如kafka)中,所以我确保所有具有相同messageId的消息都将进入同一个分区。
所以我需要编写一个应该缓冲消息一段时间(比如说1分钟)的作业,然后将具有相同messageId的所有消息组合到单个大消息中。
我认为可以使用spark Datasets和spark sql(或其他东西?)来完成。但我找不到任何关于如何为给定的消息ID存储消息一段时间的示例/文档,然后对这些消息进行聚合。
我想你要找的是Spark Streaming。 Spark有一个Kafka Connector,可以链接到Spark Streaming Context。
这是一个非常基本的示例,它将在1分钟的间隔内为给定主题集中的所有消息创建RDD,然后通过消息ID字段对它们进行分组(当然,您的值序列化器必须公开这样的getMessageId
方法)。
SparkConf conf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(1));
Map<String, Object> params = new HashMap<String, Object>() {{
put("bootstrap.servers", kafkaServers);
put("key.deserializer", kafkaKeyDeserializer);
put("value.deserializer", kafkaValueDeserializer);
}};
List<String> topics = new ArrayList<String>() {{
// Add Topics
}};
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, params)
);
stream.foreachRDD(rdd -> rdd.groupBy(record -> record.value().getMessageId()));
ssc.start();
ssc.awaitTermination();
还有其他几种方法可以在流API中对消息进行分组。查看文档以获取更多示例。