I am trying to run three different classes concurrently using ThreadPoolExecutor, but the Dataflow job does not run them concurrently; it runs each class one after another. Also, even if I set max_workers on the ThreadPoolExecutor, the job still scales down to a single worker once Dataflow starts running. I cannot figure out how to modify the code so that it runs all the classes at the same time. I also tried passing autoscaling_algorithm=NONE when launching the job from the console, but it is reported as an unrecognized argument.
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import concurrent.futures


class StudentId(beam.DoFn):
    def process(self, element):
        # code block for generating unique ids
        pass


class ReadExcel(beam.DoFn):
    def process(self, element, file_path):
        # code block for reading Excel file
        pass


class DqcP(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_p function
        pass


class DqcR(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_r function
        pass


def run_pipeline(pipeline):
    return pipeline.run()


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--landing', required=True, type=str)
    parser.add_argument('--BQ_project', required=True, type=str)
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Generate unique ids
        sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

        # Read reference Excel file
        ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                     | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

        # Run DQC_P and DQC_R concurrently
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit DQC_P pipeline
            dqc_p_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
            dqc_p_pipeline = dqc_p_future.result()
            dqc_p_result = (ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

            # Submit DQC_R pipeline
            dqc_r_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
            dqc_r_pipeline = dqc_r_future.result()
            dqc_r_result = (ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

        # Wait for DQC_P and DQC_R pipelines to finish
        dqc_p_result.pipeline.run()
        dqc_r_result.pipeline.run()


if __name__ == '__main__':
    run()
Your code will not work as written. I do not see why you need concurrent.futures.ThreadPoolExecutor() here at all. Your code should define the whole pipeline and then submit it to Dataflow, which will execute it across many workers.

Your code should be changed to something like this:
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Generate unique ids
    sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

    # Read reference Excel file
    ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                 | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

    # Branch the same PCollection into both DQC transforms
    ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
    ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
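With this version the two DQC branches consume the same ref_excel PCollection inside a single pipeline, so Dataflow is free to schedule them in parallel across its workers; nothing needs to run concurrently on the client side.

As for the worker count and the "unrecognized argument" error: autoscaling and worker settings are standard Beam worker options, so they need to reach PipelineOptions (in your code, through pipeline_args from parse_known_args). A minimal sketch, assuming the Dataflow runner; the script name, project and bucket below are placeholders:

# On the command line:
#   python my_pipeline.py \
#       --landing=gs://my-bucket/ref.xlsx \
#       --BQ_project=my-project \
#       --runner=DataflowRunner \
#       --project=my-project \
#       --region=us-central1 \
#       --temp_location=gs://my-bucket/tmp \
#       --autoscaling_algorithm=NONE \
#       --num_workers=3
#
# Or set the same worker options programmatically:
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(
    pipeline_args,
    autoscaling_algorithm='NONE',  # disable Dataflow autoscaling
    num_workers=3,                 # keep a fixed number of workers
)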