为什么 Apache Beam 似乎在单个 worker 上并行化元素?

问题描述 投票:0回答:1

我正在使用 Google Cloud Dataflow 运行器测试一个非常简单的 Apache Beam 管道,该运行器从 Pub/Sub 读取音频元素,通过 Tensorflow 模型运行元素,并将结果写入 Pub/Sub。在我的测试中,我禁用了自动缩放并将工作人员数量设置为 1。我还没有弄清楚如何让 Cloud Profiler 工作,所以我在几个地方添加了日志记录以生成 Cloud Profiler 各个部分的“配置文件”我的管道,包括主要的

DoFn
处理音频,像这样:

class ProcessAudio(beam.DoFn):
    def setup(self):
        ... # do some setup stuff

    def process(self, element):
        chunk_num = element.attrs['chunk_num']
        logging.info(f'chunk_{chunk_num} processing')
        
        prediction = ... # do processing

        logging.info(f'chunk_{chunk_num} processing_completed')
        return [prediction]


# here's the actual pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
        pipeline = (
            pipeline
            | 'ReadChunks' >>
            beam.io.ReadFromPubSub(
                subscription=read_subscription_name,
                with_attributes=True,
            )
            | 'DecodeChunks' >> beam.Map(decode_chunk)
            | 'RunModel' >> beam.ParDo(ProcessAudio())
            | 'EncodeChunks' >> beam.Map(encode_chunk)
            | beam.io.WriteToPubSub(write_topic_name, with_attributes=True)
        )

使用这些日志,我创建了我的配置文件,这是我拥有此管道过程 30 个元素时得到的结果:

红色部分代表每个块(元素)在主

DoFn
中处理的时间。除非我做了一些棘手的事情,否则这会让所有元素看起来都由我提供的单个工作人员同时处理,这应该只是一个只有两个线程的 n1-standard-2 机器。为什么 Beam 会以这种方式仅在两个线程上并行运行所有这些元素?我原以为它最多同时处理 2 个元素,我觉得它处理事物的方式会导致大量上下文切换,并且可能是使用它拥有的 2 个 vCPU 的低效方式。有什么办法可以鼓励它不要这样做吗?或者出于某种原因,这实际上是有效的/可以让它这样做吗?我只是想了解如何使它尽可能高效/快速。

google-cloud-dataflow apache-beam
1个回答
0
投票

好吧,我想通了。

根据数据流的--number_of_worker_harness_threads选项的

docs
,默认情况下“......数据流服务确定每个工作线程的适当数量”。

所以我做的一件事就是设置这个选项:

--number_of_worker_harness_threads=2
。我不是 100% 确定工人 harness 是什么意思,如果一个线束等于一个工人,但是将其设置为 2 会显着降低每个工人的并行度,实际上会略微提高我的管道的整体速度,这就是我希望会发生。

当我重新处理我的配置文件时,我仍然注意到 Dataflow 在一个 worker 上并行处理 4 个元素。拼图的最后一块是

--experiments=no_use_multiple_sdk_containers
选项。似乎 Dataflow 还将在一台工作计算机上运行 sdk 的多个实例,除非您告诉它不要这样做。当我设置这个选项时,我得到了以下配置文件:

这更符合我的预期。然而,正如文档警告的那样,我 am 看到我可能退缩得太远了,现在管道只使用了大约 70% 的工作 CPU。可能现在被 python 的 GIL 瓶颈了,但至少我知道该用什么旋钮来修复它。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.