我想要一个 DAG 来检查 GCS 上是否有 avro 文件,如果是则转到另一个任务。这个 avro 文件以不同的名称进入 gcs,所以我想使用前缀来获取文件夹中的任何 avro 文件。我使用这样的代码:
gcs_sensor = GCSObjectsWithPrefixExistenceSensor(
task_id='gcs_avro_file_sensor',
bucket='my_bucket',
prefix='mysql/abc/',
google_cloud_conn_id='google_cloud_default',
timeout=600,
poke_interval=60,
dag=dag
)
但它总是成功,因为它是 ExistenceSensor。我如何修改它以检查新文件是否已更新(旧文件始终被删除,因此此文件夹中始终只有 1 个 avro 文件)。
来自气流文档:
当找到与给定前缀匹配的文件时,poke 方法的 将会满足标准
这意味着任何带有此前缀的文件。
从你的问题来看:
(旧文件始终会被删除,因此此文件夹中始终只有 1 个 avro 文件)。
这意味着 ExistenceSensor 应始终返回 true。我建议您从 PrefixExistenceSensor 获取文件名后使用 GCSObjectUpdateSensor。