如何使用Cloud Composer / Apache Airflow运行带有安装文件的Dataflow管道?

问题描述 投票:0回答:2

我有一个工作的Dataflow管道,第一次运行setup.py来安装一些本地帮助器模块。我现在想使用Cloud Composer / Apache Airflow来安排管道。我创建了我的DAG文件,并将其与我的管道项目一起放在指定的Google Storage DAG文件夹中。文件夹结构如下所示:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py

我的DAG中指定setup.py文件的部分如下所示:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": {PROJECT-NAME},    
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

但是,当我查看Airflow Web UI中的日志时,我收到错误:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

我不知道为什么它找不到安装文件。如何使用安装文件/模块运行Dataflow管道?

python-2.7 google-cloud-dataflow airflow google-cloud-composer
2个回答
2
投票

如果查看DataflowPythonOperator的代码,看起来主py_file可以是GCS存储桶内的文件,并在执行管道之前由运营商进行本地化。但是,我没有看到dataflow_default_options的类似内容。似乎只是复制和格式化选项。

由于GCS dag文件夹使用Cloud Storage Fuse安装在Airflow实例上,因此您应该能够使用“dags_folder”env var在本地访问该文件。即你可以做这样的事情:

from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

然后,您可以将LOCAL_SETUP_FILE变量用于dataflow_default_options中的setup_file属性。


0
投票

您是使用相同的服务帐户运行Composer和Dataflow,还是它们是分开的?在后一种情况下,您是否检查过Dataflow的服务帐户是否具有对存储桶和对象的读访问权限?

© www.soinside.com 2019 - 2024. All rights reserved.