I'm trying to run a pipeline with apache_beam (it will eventually run on Dataflow).
The pipeline should look like this:
I format the data coming from PubSub, write the raw results to Firestore, run an ML model, and once I have the ML model's results I want to update Firestore using the ID obtained from the first write to FS.
The pipeline code looks roughly like this:
with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
    )

    # write the raw results to firestore
    write_results = (
        formated_msgs
        | "Write to FS" >> beam.ParDo(WriteToFS())
        | "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
    )

    # Run the ML model
    ml_results = (
        formated_msgs
        | "ML" >> ML()
        | "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
    )

    # Merge by key and update - HERE IS THE PROBLEM
    (
        (write_results, ml_results)  # I want the data from both merged by the key at this point
        | "group" >> beam.CoGroupByKey()
        | "log" >> beam.ParDo(LogFn())
    )
I've tried many approaches, but I can't seem to find the right one. Any ideas?
--- Update 1 ---
The problem is that nothing ever reaches the log step. Sometimes the operation even times out. It may be worth noting that I'm streaming the data from PubSub to begin with.
OK, so I finally figured it out. I think the only thing missing was windowing, since I'm streaming the data.
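For anyone hitting the same issue: under Beam's default global windowing, a CoGroupByKey over an unbounded PubSub source never emits results, because the global window never closes. Below is a minimal sketch of what the fix can look like. The fixed 60-second window size is an assumption (pick whatever fits your latency needs), and LoadPubSubData, WriteToFS, ML, and LogFn are the custom transforms from the question:

import apache_beam as beam
from apache_beam import window

with beam.Pipeline(options=options) as p:
    # Read and format, then window the unbounded stream so that
    # grouping operations have finite windows they can actually emit.
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
        | "Window" >> beam.WindowInto(window.FixedWindows(60))  # assumed size
    )

    # Write the raw results to Firestore, keyed by record_uuid.
    write_results = (
        formated_msgs
        | "Write to FS" >> beam.ParDo(WriteToFS())
        | "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
    )

    # Run the ML model, keyed by the same record_uuid.
    ml_results = (
        formated_msgs
        | "ML" >> ML()
        | "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
    )

    # Both branches inherit the same windowing from formated_msgs,
    # so the join fires once per window per key.
    (
        (write_results, ml_results)
        | "group" >> beam.CoGroupByKey()
        | "log" >> beam.ParDo(LogFn())
    )

Note that both inputs to CoGroupByKey must share compatible windowing; applying WindowInto once on formated_msgs before the branches split guarantees that here.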