如何加入流和数据集?

问题描述 投票:0回答:1

如何加入流和数据集?我有一个流,并且文件中有一个静态数据。我想使用文件中的数据来丰富流的数据。

示例:在流中,我获得机场代码,在文件中,我在文件中包含机场名称和代码。现在,我想将流数据加入文件中以形成带有机场名称的新流。请提供有关如何实现此目标的步骤。

apache-flink flink-streaming
1个回答
0
投票

根据确切的要求,有很多方法可以使用Flink进行流富集。 https://www.youtube.com/watch?v=cJS18iKLUIY是Konstantin Knauf的精彩演讲,涵盖了许多不同的方法,以及它们之间的权衡。

在简单的情况下,浓缩数据是不变的并且相当小,我只需要使用RichFlatMap并以open()方法加载整个文件。看起来像这样:

public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {

    private Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
      super.open(parameters);
      referenceData = loadReferenceData();
    }

    @Override
    public void flatMap(
        final Event event,
        final Collector<EnrichedEvent> collector) throws Exception {

      SensorReferenceData sensorReferenceData = 
        referenceData.get(sensorMeasurement.getSensorId());
      collector.collect(new EnrichedEvent(event, sensorReferenceData));
    }

}

您会在https://github.com/knaufk/enrichments-with-flink中找到其他方法的更多代码示例。

更新:

如果您希望预先加载一些较大的,分区的参考数据以与流结合,则有几种方法可以解决此问题,其中一些方法已在我上面共享的视频和回购中介绍。对于那些特定的要求,我建议使用自定义分区程序。在同一个github存储库中有一个示例here。想法是将扩充数据分片,并且将每个流事件与相关参考数据一起导向实例。

我认为,这比尝试获取Table API来作为连接来进行这种特定的丰富化要简单。

© www.soinside.com 2019 - 2024. All rights reserved.