我有以下形式的Booking元素流:
Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)
我需要按位置查找最近15分钟内的预订数量。但是,应该针对该位置的任何新预订评估窗口。
大致像:
Assuming `time` field is set as timestamp of record
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)
除了trigger function should not be evaluated
在15分钟结束时,而是whenever any booking arrives at a location
和emit the count of booking in last 15min from timestamp of newly arrived booking
。
方法:
使用RichMap函数,将位置预订的优先级队列维护为托管状态(ValueState),并将时间戳记作为预订的优先级。对于到达的每个元素,首先将其添加到状态,并从当前到达的元素中删除15分钟之前的元素。将优先级队列中剩余元素的数量发送给收集器。
这是正确的方法,还是可以通过使用其他更好的flink构造来实现。
如果您在基于堆的状态后端上运行,那么您建议的行为应表现得相当好。但是对于RocksDB,您将不得不为每次访问都对优先级队列进行序列化/反序列化,这可能会很痛苦。
在RocksDB上可能会更好地执行的一种方法是将当前计数与ValueState中的最早时间戳记以及ListState中的预订集保持在一起。 RocksDB状态后端可以不通过ser / de附加到ListState,因此,当最早的元素太旧时,您只需要反序列化和重新序列化整个列表。