我正在构建一个等待文件系统更改的 DAG,然后对新出现或修改的文件运行一些分析,对此我使用
FileSensor
。我正在监视的路径定义包含 jinja 模板和通配符或 glob。当找到文件时,我想向后续回调和任务提供其绝对路径。然后,文件的元数据将与某些数据结构进行比较,以确定是否需要处理。
问题:如何从传感器“渗透”找到的文件的路径?我查看了FileSensor
我想到了几种解决方法,但在我走任何一条路之前,我想确保有一个充分的理由。我的想法是:
PythonOperator
中使用时能够自动工作。BashOperator
+ echo 强制渲染 jinja 模板,然后重新查询文件系统。这是我的配置的简化概要:
# <DAG initialization code>
...
path_template: str = os.path.join(
"/basepath/",
"{{ data_interval_start.format('YYYYMMDD') }}",
f"{source}.{{{{ data_interval_start.format('YYYYMMDD') }}}}*.csv.gz"
)
fs: FileSensor = FileSensor(
task_id=f"{source}_data_sensor",
filepath=path_template,
poke_interval=int(duration(minutes=5).total_seconds()),
timeout=int(duration(hours=1, minutes=30).total_seconds()),
mode="reschedule",
pre_execute=log_execution,
on_success_callback=partial(log_found_file, path_template),
)
fs >> convert(source) >> analyze_data(source)
其中
log_found_file
如下所示:
def log_found_file(data_path_template: str, ctx: Context) -> None:
"""Logs the discovery of a data file."""
data_path = f(data_path_template, ctx) # <<<<<<<<<<<<<<< Need help with this
stats: os.stat_result = os.stat(data_path)
logger.success(
f"Detected data file {data_path} "
f"of size {stats.st_size}; "
f"created on {stats.st_ctime}; "
f"and last modified on {stats.st_mtime}."
)
我正在使用 Airflow 2.8.0,以防万一。
下面是问题中提到的解决方法之一的实现:
def log_found_file(data_path_template: str, ctx: Context) -> None:
"""Logs the discovery of a data file."""
glob_str: str = ctx.get("task").render_template(data_path_template, ctx)
data_path: str = glob(glob_str)[0]
...