获取FileSensor定位的文件路径

问题描述 投票:0回答:1

我正在构建一个等待文件系统更改的 DAG,然后对新出现或修改的文件运行一些分析,对此我使用

FileSensor
。我正在监视的路径定义包含 jinja 模板和通配符或 glob。当找到文件时,我想向后续回调和任务提供其绝对路径。然后,文件的元数据将与某些数据结构进行比较,以确定是否需要处理。

问题:如何从传感器“渗透”找到的文件的路径?我查看了FileSensor

源代码,但它只记录找到的文件,而不将路径存储在任何地方。有没有办法使用路径模板和/或上下文来重建路径而不进行额外的文件系统查询?

我想到了几种解决方法,但在我走任何一条路之前,我想确保有一个充分的理由。我的想法是:

  1. 将数据路径模板按原样传递给后续任务,并希望它在
    PythonOperator
    中使用时能够自动工作。
  2. 通过其 context/environment 或使用
    BashOperator
    + echo
    强制渲染 jinja 模板,然后重新查询文件系统。

这是我的配置的简化概要:

# <DAG initialization code>
    ...
    path_template: str = os.path.join(
        "/basepath/",
        "{{ data_interval_start.format('YYYYMMDD') }}",
        f"{source}.{{{{ data_interval_start.format('YYYYMMDD') }}}}*.csv.gz"
    )
    fs: FileSensor = FileSensor(
        task_id=f"{source}_data_sensor",
        filepath=path_template,
        poke_interval=int(duration(minutes=5).total_seconds()),
        timeout=int(duration(hours=1, minutes=30).total_seconds()),
        mode="reschedule",
        pre_execute=log_execution,
        on_success_callback=partial(log_found_file, path_template),
    )
    fs >> convert(source) >> analyze_data(source)

其中

log_found_file
如下所示:

def log_found_file(data_path_template: str, ctx: Context) -> None:
    """Logs the discovery of a data file."""
    data_path = f(data_path_template, ctx)  # <<<<<<<<<<<<<<< Need help with this
    stats: os.stat_result = os.stat(data_path)
    logger.success(
        f"Detected data file {data_path} "
        f"of size {stats.st_size}; "
        f"created on {stats.st_ctime}; "
        f"and last modified on {stats.st_mtime}."
    )

我正在使用 Airflow 2.8.0,以防万一。

python airflow parameter-passing glob file-watcher
1个回答
0
投票

下面是问题中提到的解决方法之一的实现:

def log_found_file(data_path_template: str, ctx: Context) -> None:
    """Logs the discovery of a data file."""
    glob_str: str = ctx.get("task").render_template(data_path_template, ctx)
    data_path: str = glob(glob_str)[0]
    ...
© www.soinside.com 2019 - 2024. All rights reserved.