我正在尝试使用Apache Beam编写一个简单的管道。假设我正在接受看起来像这样的用户请求:
(country, user_id, score, timestamp)
我只想汇总每个国家/地区所有用户的总得分,每分钟和10分钟的累计得分。但是,我有一个警告,我想对每个用户,每个存储区获取最新分数。意思是,如果我有两条记录:
('USA', 1, 10, 62)
('USA', 1, 4, 64)
并且假设它们映射到同一分钟存储桶,我想保留第二个记录(后一个记录的分数为4)。
我如何有效地做到这一点?现在,我将用户事件流传送到两个单独的分支中,一个分支每分钟计算一次此聚合,另一个分支每10分钟计算一次。显然,许多计算在此加倍。理想情况下,我们将能够重新使用每1分钟窗口的计算结果,以总计10分钟窗口的时间,但是我不太清楚该怎么做。
谢谢!
将元素通过管道传递到两个不同的分支可能没什么大不了的,但是是的,您可以通过避免重复聚合的方式来做到这一点。
假设您的10分钟和1分钟窗口可以彼此平均转换(固定时间的Windows应该可以正常工作,您可以执行以下操作:
Assign 1 min. windows -> Aggregate -> Assign 10 min. windows -> Aggregate
第一次聚合后(可能是某种类型的合并),结果元素应该具有来自合并元素的最新时间戳(可以通过更改TimestampCombiner进行修改)。这意味着,当您从一个窗口转换到另一个窗口时,只要窗口均匀排列,第二个聚合应该聚合所有与原始方法相同的数据。
对于您问题的第二部分,要保留窗口中带有最新时间戳的元素并放下其他元素,您将需要实现一个自定义CombineFn,该元素保留最新的元素。现在,为了实际从CombineFn中读取元素的时间戳,您首先需要使用Reify.timestamps将时间戳附加到元素。而且,您可能希望CombineFn输出没有时间戳的原始元素类型。因此总的来说,它看起来像这样(方括号内的PCollections,以便您可以看到类型):
[ElementT] -> Reify.timestamps -> [TimestampedValue<ElementT>] -> Combine -> [ElementT]