我正在收到有关不同主题的一系列Avro格式的事件。我想使用这些并以拼花形式写入s3。我写了下面的工作,为每个事件创建一个不同的流,并从合计架构注册表中获取其架构,以创建事件的拼花槽。这工作正常,但是我面临的唯一问题是,每当一个新事件开始时,我就必须更改YAML配置并每次都重新启动作业。有什么办法可以让我不必重新启动作业,它开始消耗新的事件集。
YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();
List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);
DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
try {
for (EventConfig eventConfig : eventTypesList) {
LOG.info("creating a stream for ", eventConfig.getEvent_name());
final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat
(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
.withBucketAssigner(new EventTimeBucketAssigner())
.build();
DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord -> {
if (genericRecord != null && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name())) {
return true;
}
return false;
});
outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
}
} catch (Exception e) {
e.printStackTrace();
}
Yaml文件:
!com.bounce.config.EventTopologyConfig
eventsType:
- !com.bounce.config.EventConfig
event_name: "search_list_keyless"
schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
topic: "search_list_keyless"
- !com.bounce.config.EventConfig
event_name: "bike_search_details"
schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
topic: "bike_search_details"
- !com.bounce.config.EventConfig
event_name: "keyless_bike_lock"
schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
topic: "analytics-keyless"
- !com.bounce.config.EventConfig
event_name: "keyless_bike_unlock"
schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
topic: "analytics-keyless"
checkPointInterval: 1200000
topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
谢谢。
[我认为您想使用自定义的BucketAssigner,它将genericRecord.get(EVENT_NAME).toString()
值用作存储区ID,以及EventTimeBucketAssigner
正在执行的所有事件时间存储区。
然后,您不需要创建多个流,它应该是动态的(每当写入的记录中出现新的事件名称值时,您都会获得一个新的输出接收器。)