创建的临时文件是否存在于气流 DAG 中的跨任务中

问题描述 投票:0回答:1

airflow DAG 是否保留跨任务创建的临时文件。

我的代码是

//This task calls an external api and downloads data into download.csv, this temp file download.csv is created on the fly by task using open function in python
t1 = BashOperator(
        task_id=f"api_download",
        bash_command=f'api_download_lineitemstatus.py',
    )
//This task suppose to read download.csv created in the first task and insert into DB
    t2 = BashOperator(
        task_id=f"insert_into_DB",
        bash_command=f'python lineinsert.py',
    )
 t1 >> t2

我在任务 t2 中收到此错误 FileNotFoundError:[Errno 2]没有这样的文件或目录:'download.csv'

有什么建议吗?

python python-3.x airflow airflow-2.x
1个回答
3
投票

如果两个任务在同一环境中运行,它们应该存在。如果您在本地执行器上运行,您可以验证文件是否已下载到正确的路径,我建议在使用本地执行器检查文件是否存在之前测试您的代码。

如果在 kubernetes 执行器中运行

在 Kubernetes Executor 中,每个任务实例都在 Kubernetes 集群上自己的 pod 中运行。然后,worker pod 运行任务、报告结果并终止。因此,在工作 Pod 关闭后,Pod 内的所有内容都会丢失,除非您已对其进行配置。因此,所有下载的文件都会丢失。

另一种方法是创建一个任务,将您在任务 1 和任务 2 上执行的操作分组。(您将获得创建和删除 pod 的时间)

另一种选择是使用S3/GCS/等来存储文件并在后续任务中恢复它。

最终更换为 cellery 执行器或 CeleryKubernetes 也可能是一个解决方案。

https://airflow.apache.org/docs/apache-airflow/stable/executor/celery_kubernetes.html

© www.soinside.com 2019 - 2024. All rights reserved.