我有实时数据被发布并以同步方式读入数据流管道。
我收集数据,对其进行窗口化(固定为 1 秒)并进行累积,然后将其写入 Firestore DB。 该数据库正在由前端监视,当新数据到达时,前端会自动提取快照。
我看到的行为是前端数据到达不同步,数据到达之间的延迟不一致。
我一直在研究有状态和及时的处理,我认为它可以解决我的问题,但我不知道如何实现它,因为我没有批量处理任何数据,据我所知,这意味着我应该使用及时的处理
DoFn
。
class WriteToFirestore(beam.DoFn):
EXPIRY_TIMER = userstate.TimerSpec('expiry', userstate.TimeDomain.WATERMARK)
def setup(self):
from google.cloud import firestore
self.firestore_client = firestore.Client(project='<project id>')
def process(self, element,
w=beam.DoFn.WindowParam,
expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
expiry_timer.set(w.end + 0.5) # wait 0.5 seconds before executing
@userstate.on_timer(EXPIRY_TIMER)
def expiry(self, element):
collection_ref = self.firestore_client.collection('<collection id>')
collection_ref.add(element)
这是当前代码,它会抛出编码器数量不匹配的问题。
我做错了什么?
对数据进行窗口化并将其分组后,您应该使用窗口时间作为“时间戳”。将
beam.DoFn.TimerParam
作为参数传递也是个好主意。构造函数采用计时器名称标签和设置的持续时间(您可以使用 datetime.timedelta
来设置)。此代码未经测试,但应该可以工作:
class WriteToFirebase(beam.DoFn):
def setup(self):
from google.cloud import firestore
self.firestore_client = firestore.Client(project='<project id>')
def process(self, element, timer=beam.DoFn.TimerParam, window=beam.DoFn.WindowParam):
# Get the end timestamp of the current window.
window_end = window.end
# Calculate the time 1 second after the window end.
event_time = window_end + timedelta(seconds=1)
# Set a timer to trigger at the calculated event time.
timer.set("my_timer", event_time)
def on_timer(self, element, timer=beam.DoFn.TimerParam):
collection_ref = self.firestore_client.collection('fused_detections')
collection_ref.add(element)```