这是一个组成的玩具示例,试图获得有关我的问题的更难部分的帮助。假设我有来自Kafka流的销售数据:
...
Period: 5, SalesPersonId: 78, Sale: TRUE, Timestamp: ...,
Period: 5, SalesPersonId: 43, Sale: FALSE, Timestamp: ...,
Period: 5, SalesPersonId: 33, Sale: TRUE, Timestamp: ...,
...
每行代表特定销售人员的销售机会(在特定时期内)。
以下是期间的工作方式:期间大约持续2-3周。但是,这些时期不在我的控制之下;它们在到达流时已被分配。在期间之间的过渡期间,我可能仍然会在最后一段时间内收到一两天的数据(例如,日本的销售地点可能仍然处于旧时期)。梁聊天的人建议我可以在这种情况下使用会话窗口,如果我只是在我的密钥中包含句点并且大约延长2天的间隔时间。似乎这样可行。
我很清楚如何做这样的事情:每个时期的销售机会总数,每个销售人员每个时期的平均销售率等。例如,调用以下查询产生的PCollection:
SELECT
period,
salesPersonId,
COUNT(*) as totalSalesOpportunities,
COUNT(*) FILTER(WHERE sale) as totalSales,
ROUND(COUNT(*) FILTER (WHERE SALE)/COUNT(*),2) as salesRate
FROM stream
GROUP BY period, salesPersonId
我的要求比这更复杂。假设我们公司有一个假设,即在一段时间内拥有更多销售机会的销售人员将获得更好的销售率。也许总的销售机会是动机的一种表现,或者额外的机会让更多的实践尝试出售那个时期出售的任何产品。所以,该公司想要这个统计数据:
在此期间(目前为止)销售机会的销售额为90%或更高的销售人员的总销售额是多少? 10%或更低的百分位数?即,
(TOTAL SALES MADE BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)/(TOTAL SALES OPPORTUNITIES BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)
当然,在一个时期的早期,第90百分位可能只有3个机会。但是,随着时间的推移,分布将分散,可能有40个机会。那么,如果这个统计数据更新,比如每小时,那也没关系。
据我所知,我需要做以下事情,称之为B:
Rekey A, apply ApproximateQuantiles, feed it back to filter A, reaggregate A.
但是,我不认为这可以逐步完成。那么我如何表达“逐步进行A,但每小时做一次B批量操作”?
或者,有没有更好的方法来处理梁的这种情况?
如果我正确理解您的问题,您需要对相同数据进行2种类型的聚合。
这里需要注意的一点是,你不能让A依赖于B广告B依赖于A,因为这会在你的管道图中创建一个循环。
您可以从包含原始输入流的PC1开始。
PC2:PC1 - >做A'(与A相同) - >做B
PC3:PC1 - >执行A,PC2作为侧输入。
您可以阅读更多关于侧输入here的信息