airflow DAG 是否保留跨任务创建的临时文件。
我的代码是
//This task calls an external api and downloads data into download.csv, this temp file download.csv is created on the fly by task using open function in python
t1 = BashOperator(
task_id=f"api_download",
bash_command=f'api_download_lineitemstatus.py',
)
//This task suppose to read download.csv created in the first task and insert into DB
t2 = BashOperator(
task_id=f"insert_into_DB",
bash_command=f'python lineinsert.py',
)
t1 >> t2
我在任务 t2 中收到此错误 FileNotFoundError:[Errno 2]没有这样的文件或目录:'download.csv'
有什么建议吗?
如果两个任务在同一环境中运行,它们应该存在。如果您在本地执行器上运行,您可以验证文件是否已下载到正确的路径,我建议在使用本地执行器检查文件是否存在之前测试您的代码。
如果在 kubernetes 执行器中运行:
在 Kubernetes Executor 中,每个任务实例都在 Kubernetes 集群上自己的 pod 中运行。然后,worker pod 运行任务、报告结果并终止。因此,在工作 Pod 关闭后,Pod 内的所有内容都会丢失,除非您已对其进行配置。因此,所有下载的文件都会丢失。
另一种方法是创建一个任务,将您在任务 1 和任务 2 上执行的操作分组。(您将获得创建和删除 pod 的时间)
另一种选择是使用S3/GCS/等来存储文件并在后续任务中恢复它。
最终更换为 cellery 执行器或 CeleryKubernetes 也可能是一个解决方案。
https://airflow.apache.org/docs/apache-airflow/stable/executor/celery_kubernetes.html