我正在考虑一种方法,当用户将任何文件上传到 DAG 目录时触发 Airflow 任务,在这种情况下哪个 Airflow 操作员可以提供帮助?
我了解 FileSensor 运算符,但据我了解,它期望将特定文件上传到目录。
如果 Airflow 没有这样的运算符,在这种情况下我应该尝试什么?
您可以为上传的文件创建一个文件夹(例如“user_upload/”)。之后通过os.path.getmtime或os.path.getctime获取该文件夹下的所有文件以及文件的修改或创建时间。之后,您可以将这些时间与模板变量(如data_interval_start或ts)进行比较并获取新文件。