TL; DR:当前最佳的用于保证Flink中事件的事件时间顺序的解决方案是什么?
我将Flink 1.8.0与Kafka 2.2.1结合使用。我需要通过事件时间戳来保证事件的正确顺序。我每1秒产生一次定期水印。我将FlinkKafkaConsumer与AscendingTimestampExtractor一起使用:
val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
override def extractAscendingTimestamp(element: T): Long =
timestampExtractor(element)
})
.addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)
然后处理:
myStream
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)
我意识到,对于在同一毫秒或几毫秒之后出现的无序事件,Flink不会更正顺序。我在文档中找到的内容:
水印触发最大时间戳(即结束时间戳-1)小于新水印的所有窗口的计算
所以我准备了额外的处理步骤来保证事件时间顺序:
myStream
.timeWindowAll(Time.milliseconds(100))
.apply((window, input, out: Collector[MyEvent]) => input
.toList.sortBy(_.getTimestamp)
.foreach(out.collect) // this windowing guarantee correct order by event time
)(TypeInformation.of(classOf[MyEvent]))
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)
但是,我发现此解决方案很难看,看起来像是一种解决方法。我也担心per-partition watermarks of KafkaSource
理想情况下,我想将顺序保证放到KafkaSource中,并为每个kafka分区保留它,就像每个分区的水印一样。有可能这样做吗? 保证Flink中事件的事件时间顺序的当前最佳解决方案是什么?
Flink不保证按事件时间顺序处理记录。分区中的记录将按其原始顺序进行处理,但是当两个或多个分区合并到新分区中(由于重新分区或流的并集)时,Flink会将那些分区的记录随机合并到新分区中。其他所有东西都会效率低下,并导致更高的延迟。
例如,如果您的工作有一个从两个Kafka分区读取的源任务,则两个分区的记录将以某种随机的锯齿形模式合并。
但是,Flink保证针对生成的水印正确处理所有事件。这意味着水印永远不会超过记录。例如,如果您的Kafka源生成每个分区的水印,则即使合并了多个分区的记录,水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它可以确保输入数据的完整性。
这是按其时间戳排序记录的先决条件。您可以使用全部滚动窗口来完成此操作。但是,您应该知道
KeyedProcessFunction
,这会更有效率。这很重要。在KafkaSource中保证顺序实际上包括两部分。
https://issues.apache.org/jira/browse/FLINK-12675中的第一部分已经在进行中。第二部分需要在子任务之间共享状态的支持,这可能需要在社区中进行更多的讨论和详细的计划。
回到您的问题,我认为通过设置一个缓冲数据的窗口来保持事件的顺序是目前的最佳解决方案。