气流传感器检查 GCS 对象是否已更新

问题描述 投票:0回答:1

我想要一个 DAG 来检查 GCS 上是否有 avro 文件,如果是则转到另一个任务。这个 avro 文件以不同的名称进入 gcs,所以我想使用前缀来获取文件夹中的任何 avro 文件。我使用这样的代码:

    gcs_sensor = GCSObjectsWithPrefixExistenceSensor(  
        task_id='gcs_avro_file_sensor',  
        bucket='my_bucket',  
        prefix='mysql/abc/',   
        google_cloud_conn_id='google_cloud_default',  
        timeout=600,  
        poke_interval=60,  
        dag=dag  
    )  

但它总是成功,因为它是 ExistenceSensor。我如何修改它以检查新文件是否已更新(旧文件始终被删除,因此此文件夹中始终只有 1 个 avro 文件)。

python google-cloud-platform google-cloud-storage airflow
1个回答
0
投票

来自气流文档:

当找到与给定前缀匹配的文件时,poke 方法的 将会满足标准

这意味着任何带有此前缀的文件。

从你的问题来看:

(旧文件始终会被删除,因此此文件夹中始终只有 1 个 avro 文件)。

这意味着 ExistenceSensor 应始终返回 true。我建议您从 PrefixExistenceSensor 获取文件名后使用 GCSObjectUpdateSensor。

© www.soinside.com 2019 - 2024. All rights reserved.