我是 Airflow 新手,为了测试我的知识,我尝试编写一个简单的文件并测试它在 Airflow 中的存在。这是代码片段...
def _downloading_data(**kwargs):
f = open('my_file.txt', 'w')
f.write('my_data')
f.close()
with DAG(dag_id='simple_dag', default_args=default_args, start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False) as dag:
downloading_data = PythonOperator(
task_id='downloading_data',
python_callable=_downloading_data,
)
waiting_for_data = FileSensor(
task_id='waiting_for_data',
filepath = 'my_file.txt',
fs_conn_id='fs_default',
)
在这种情况下,传感器永远不会触发。我只能假设这是因为 Docker 正在将文件写入到我不知道的地方。我已经使用管理控制台设置了连接,所以我认为至少没问题。
我尝试将 Python 函数中的文件路径参数和文件名更改为绝对路径(例如:c: est\my_file.txt),但这导致错误“没有这样的文件”。