我正在通过apache-beam编写一个简单的python管道来聚合用户投票。
在输入中,我有逗号分隔的行,如下所示:
pollA,answerB
pollA,answerC
pollB,answerA
pollB,answerB
pollC,answerE
pollA,answerB
接下来,我使用ParDo函数将每一行转换为如下对象:
输出:
{
pollId: pollA,
answerId: answerB,
votes: 1
}
功能:
class Split(beam.DoFn):
def process(self, element):
pollId, answerId = element.split(",")
return [{
'pollId': pollId,
'answerId': answerId,
'votes': 1
}]
现在,假设我有3个回答B,我想通过answerId将它们分组并计算它们输出类似的东西:
{
pollId: pollA,
answerId: answerB,
votes: 3
}
我是python和apache-beam的新手,所以我很感激帮助:)
一个答案是要意识到您的每条记录都可以描述为:
pollId + answerId
1 // The vote
如果你有一个PCollection是这个表单的键/值对,那么你可以对该集合执行一个CombinePerKey(sum)
,它会聚合所有具有相同键的项目,并将它们的值相加,为你提供一个由新Key / Value组成的新PCollection对,其值是所有记录的总和,具有相同的pollId
和answerId
。
有关此函数的用法,请参阅CombinePerKey Python文档。