Dynamically updating a Flink streaming job


I am receiving a stream of Avro-format events on different topics. I want to consume these and write them to S3 in Parquet format. I wrote the job below, which creates a separate stream for each event type and fetches its schema from the Confluent Schema Registry to create a Parquet sink for that event. This works fine, but the one problem I'm facing is that whenever a new event type starts arriving, I have to change the YAML configuration and restart the job every time. Is there any way to avoid restarting the job so that it starts consuming the new set of events automatically?

YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();

List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();

CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    // One filtered stream and one Parquet sink per configured event type.
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}

YAML file:

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"


checkPointInterval: 1200000

topics: ["search_list_keyless","bike_search_details","analytics-keyless"]

Thanks.

java apache-flink flink-streaming
1 Answer

I think you want to use a custom BucketAssigner that uses the genericRecord.get(EVENT_NAME).toString() value as the bucket id, combined with whatever event-time bucketing the EventTimeBucketAssigner is currently doing.

Then you don't need to create multiple streams, and it should be dynamic: whenever a new event name value shows up in the records being written, you get a new output bucket.
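A minimal sketch of such an assigner, assuming the event-name field is literally called "event_name" (as the question's EVENT_NAME constant and filter suggest) and substituting simple hourly UTC bucketing for whatever EventTimeBucketAssigner does:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Buckets each record by its event name first, then by time, so a new
// event type gets its own output directory without a job restart.
public class EventNameBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord record, Context context) {
        // "event_name" is an assumption; use whatever field EVENT_NAME refers to.
        String eventName = record.get("event_name").toString();
        // Fall back to processing time when the record carries no event timestamp.
        long ts = context.timestamp() != null ? context.timestamp() : context.currentProcessingTime();
        return eventName + "/dt=" + HOUR.format(Instant.ofEpochMilli(ts));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

With that in place, the per-event loop collapses to a single sink built with .withBucketAssigner(new EventNameBucketAssigner()), and a directory such as .../search_list_keyless/dt=2020-01-01--10/ appears as soon as the first record with that event name arrives.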
