按键分组后的简单计数步骤在DataFlow管道中非常慢

问题描述 投票:1回答:1

我有一个DataFlow管道尝试构建索引(键值对)并计算一些指标(如每个键的多个值)。输入数据总共约为60 GB,存储在GCS上,管道中分配了大约126个工作人员。 Per Stackdriver所有工作者的CPU利用率约为6%。

尽管有126名工人,但管道似乎没有任何进展,并且根据墙上时间,瓶颈似乎是一个简单的计数步骤,跟随一个小组。虽然所有其他步骤平均花费不到1小时,但计数步骤已经花了50天的时间。日志中似乎没有任何有用的信息警告。

计数步骤是在WordCount示例中的相应步骤之后实现的:

def count_keywords_per_product(self, key_and_group):
    key, group = key_and_group
    count = 0
    for e in group:
        count += 1

    self.stats.product_counter.inc()
    self.stats.keywords_per_product_dist.update(count)

    return (key, count)

上一步“组关键字”是一个简单的beam.GroupByKey()转换。

请告知可能的原因以及如何优化。

Current resource metrics:
Current vCPUs    126
Total vCPU time      1,753.649 vCPU hr
Current memory   472.5 GB
Total memory time    6,576.186 GB hr
Current PD   3.08 TB
Total PD time    43,841.241 GB hr
Current SSD PD   0 B
Total SSD PD time    0 GB hr
Total Shuffle data processed     1.03 TB
Billable Shuffle data processed      529.1 GB

包括计数在内的管道步骤如下所示:enter image description here

google-cloud-dataflow apache-beam
1个回答
1
投票

在这里使用每个键的总和的最佳方法是使用组合操作。原因是它可以缓解热键的问题。

尝试用GroupByKey + ParDo替换你的beam.combiners.Count.PerKey,或类似你的用例组合变换。

© www.soinside.com 2019 - 2024. All rights reserved.