我正在使用 Google Cloud Dataflow 运行器测试一个非常简单的 Apache Beam 管道,该运行器从 Pub/Sub 读取音频元素,通过 Tensorflow 模型运行元素,并将结果写入 Pub/Sub。在我的测试中,我禁用了自动缩放并将工作人员数量设置为 1。我还没有弄清楚如何让 Cloud Profiler 工作,所以我在几个地方添加了日志记录以生成 Cloud Profiler 各个部分的“配置文件”我的管道,包括主要的
DoFn
处理音频,像这样:
class ProcessAudio(beam.DoFn):
def setup(self):
... # do some setup stuff
def process(self, element):
chunk_num = element.attrs['chunk_num']
logging.info(f'chunk_{chunk_num} processing')
prediction = ... # do processing
logging.info(f'chunk_{chunk_num} processing_completed')
return [prediction]
# here's the actual pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
pipeline = (
pipeline
| 'ReadChunks' >>
beam.io.ReadFromPubSub(
subscription=read_subscription_name,
with_attributes=True,
)
| 'DecodeChunks' >> beam.Map(decode_chunk)
| 'RunModel' >> beam.ParDo(ProcessAudio())
| 'EncodeChunks' >> beam.Map(encode_chunk)
| beam.io.WriteToPubSub(write_topic_name, with_attributes=True)
)
使用这些日志,我创建了我的配置文件,这是我拥有此管道过程 30 个元素时得到的结果:
红色部分代表每个块(元素)在主
DoFn
中处理的时间。除非我做了一些棘手的事情,否则这会让所有元素看起来都由我提供的单个工作人员同时处理,这应该只是一个只有两个线程的 n1-standard-2 机器。为什么 Beam 会以这种方式仅在两个线程上并行运行所有这些元素?我原以为它最多同时处理 2 个元素,我觉得它处理事物的方式会导致大量上下文切换,并且可能是使用它拥有的 2 个 vCPU 的低效方式。有什么办法可以鼓励它不要这样做吗?或者出于某种原因,这实际上是有效的/可以让它这样做吗?我只是想了解如何使它尽可能高效/快速。
好吧,我想通了。
根据数据流的--number_of_worker_harness_threads
选项的
docs,默认情况下“......数据流服务确定每个工作线程的适当数量”。
所以我做的一件事就是设置这个选项:
--number_of_worker_harness_threads=2
。我不是 100% 确定工人 harness 是什么意思,如果一个线束等于一个工人,但是将其设置为 2 会显着降低每个工人的并行度,实际上会略微提高我的管道的整体速度,这就是我希望会发生。
当我重新处理我的配置文件时,我仍然注意到 Dataflow 在一个 worker 上并行处理 4 个元素。拼图的最后一块是
--experiments=no_use_multiple_sdk_containers
选项。似乎 Dataflow 还将在一台工作计算机上运行 sdk 的多个实例,除非您告诉它不要这样做。当我设置这个选项时,我得到了以下配置文件:
这更符合我的预期。然而,正如文档警告的那样,我 am 看到我可能退缩得太远了,现在管道只使用了大约 70% 的工作 CPU。可能现在被 python 的 GIL 瓶颈了,但至少我知道该用什么旋钮来修复它。