我有一个DataFlow管道尝试构建索引(键值对)并计算一些指标(如每个键的多个值)。输入数据总共约为60 GB,存储在GCS上,管道中分配了大约126个工作人员。 Per Stackdriver所有工作者的CPU利用率约为6%。
尽管有126名工人,但管道似乎没有任何进展,并且根据墙上时间,瓶颈似乎是一个简单的计数步骤,跟随一个小组。虽然所有其他步骤平均花费不到1小时,但计数步骤已经花了50天的时间。日志中似乎没有任何有用的信息警告。
计数步骤是在WordCount示例中的相应步骤之后实现的:
def count_keywords_per_product(self, key_and_group):
key, group = key_and_group
count = 0
for e in group:
count += 1
self.stats.product_counter.inc()
self.stats.keywords_per_product_dist.update(count)
return (key, count)
上一步“组关键字”是一个简单的beam.GroupByKey()转换。
请告知可能的原因以及如何优化。
Current resource metrics:
Current vCPUs 126
Total vCPU time 1,753.649 vCPU hr
Current memory 472.5 GB
Total memory time 6,576.186 GB hr
Current PD 3.08 TB
Total PD time 43,841.241 GB hr
Current SSD PD 0 B
Total SSD PD time 0 GB hr
Total Shuffle data processed 1.03 TB
Billable Shuffle data processed 529.1 GB
在这里使用每个键的总和的最佳方法是使用组合操作。原因是它可以缓解热键的问题。
尝试用GroupByKey + ParDo
替换你的beam.combiners.Count.PerKey
,或类似你的用例组合变换。