使用Dataflow对象的GroupByKey?

问题描述 投票:0回答:1

我正在通过apache-beam编写一个简单的python管道来聚合用户投票。

在输入中,我有逗号分隔的行,如下所示:

pollA,answerB
pollA,answerC
pollB,answerA
pollB,answerB
pollC,answerE
pollA,answerB

接下来,我使用ParDo函数将每一行转换为如下对象:

输出:

{
  pollId: pollA,
  answerId: answerB,
  votes: 1
}

功能:

class Split(beam.DoFn):

    def process(self, element):
        pollId, answerId = element.split(",")

        return [{
            'pollId': pollId,
            'answerId': answerId,
            'votes': 1
        }]

现在,假设我有3个回答B,我想通过answerId将它们分组并计算它们输出类似的东西:

{
  pollId: pollA,
  answerId: answerB,
  votes: 3
}

我是python和apache-beam的新手,所以我很感激帮助:)

python google-cloud-dataflow apache-beam
1个回答
1
投票

一个答案是要意识到您的每条记录都可以描述为:

  • 关键:pollId + answerId
  • 价值:1 // The vote

如果你有一个PCollection是这个表单的键/值对,那么你可以对该集合执行一个CombinePerKey(sum),它会聚合所有具有相同键的项目,并将它们的值相加,为你提供一个由新Key / Value组成的新PCollection对,其值是所有记录的总和,具有相同的pollIdanswerId

有关此函数的用法,请参阅CombinePerKey Python文档。

© www.soinside.com 2019 - 2024. All rights reserved.