如何加入流和数据集?我有一个流,并且文件中有一个静态数据。我想使用文件中的数据来丰富流的数据。
示例:在流中,我获得机场代码,在文件中,我在文件中包含机场名称和代码。现在,我想将流数据加入文件中以形成带有机场名称的新流。请提供有关如何实现此目标的步骤。
根据确切的要求,有很多方法可以使用Flink进行流富集。 https://www.youtube.com/watch?v=cJS18iKLUIY是Konstantin Knauf的精彩演讲,涵盖了许多不同的方法,以及它们之间的权衡。
在简单的情况下,浓缩数据是不变的并且相当小,我只需要使用RichFlatMap
并以open()
方法加载整个文件。看起来像这样:
public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {
private Map<Long, SensorReferenceData> referenceData;
@Override
public void open(final Configuration parameters) throws Exception {
super.open(parameters);
referenceData = loadReferenceData();
}
@Override
public void flatMap(
final Event event,
final Collector<EnrichedEvent> collector) throws Exception {
SensorReferenceData sensorReferenceData =
referenceData.get(sensorMeasurement.getSensorId());
collector.collect(new EnrichedEvent(event, sensorReferenceData));
}
}
您会在https://github.com/knaufk/enrichments-with-flink中找到其他方法的更多代码示例。
更新:
如果您希望预先加载一些较大的,分区的参考数据以与流结合,则有几种方法可以解决此问题,其中一些方法已在我上面共享的视频和回购中介绍。对于那些特定的要求,我建议使用自定义分区程序。在同一个github存储库中有一个示例here。想法是将扩充数据分片,并且将每个流事件与相关参考数据一起导向实例。
我认为,这比尝试获取Table API来作为连接来进行这种特定的丰富化要简单。