我有一个工作的Dataflow管道,第一次运行setup.py
来安装一些本地帮助器模块。我现在想使用Cloud Composer / Apache Airflow来安排管道。我创建了我的DAG文件,并将其与我的管道项目一起放在指定的Google Storage DAG文件夹中。文件夹结构如下所示:
{Composer-Bucket}/
dags/
--DAG.py
Pipeline-Project/
--Pipeline.py
--setup.py
Module1/
--__init__.py
Module2/
--__init__.py
Module3/
--__init__.py
我的DAG中指定setup.py文件的部分如下所示:
resumeparserop = dataflow_operator.DataFlowPythonOperator(
task_id="resumeparsertask",
py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
dataflow_default_options={
"project": {PROJECT-NAME},
"setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})
但是,当我查看Airflow Web UI中的日志时,我收到错误:
RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.
我不知道为什么它找不到安装文件。如何使用安装文件/模块运行Dataflow管道?
如果查看DataflowPythonOperator的代码,看起来主py_file可以是GCS存储桶内的文件,并在执行管道之前由运营商进行本地化。但是,我没有看到dataflow_default_options的类似内容。似乎只是复制和格式化选项。
由于GCS dag文件夹使用Cloud Storage Fuse安装在Airflow实例上,因此您应该能够使用“dags_folder”env var在本地访问该文件。即你可以做这样的事情:
from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')
然后,您可以将LOCAL_SETUP_FILE变量用于dataflow_default_options中的setup_file属性。
您是使用相同的服务帐户运行Composer和Dataflow,还是它们是分开的?在后一种情况下,您是否检查过Dataflow的服务帐户是否具有对存储桶和对象的读访问权限?