如何在 Airflow 中设置 SFTPSensor 以对服务器上出现的任何文件做出反应?

问题描述 投票:0回答:2

我对 Airflow 还很陌生。我正在尝试设置 SFTPSensor 以查看 SFTP 服务器上的文件夹中是否出现任何文件。对我来说,这听起来像是 file_pattern 属性中的正则表达式“*”:

import airflow
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.sensors.python import PythonSensor
from datetime import datetime, timedelta

args = {
    "owner": "My_company",
    "start_date": datetime(2022,10,17)}

def get_list_of_files():
    ftp_hook = SFTPHook(ftp_conn_id="My_company")
    files_list = ftp_hook.list_directory("/in/")
    logging.info("The list of files is the following:")
    logging.info(files_list)
    return files_list

dag = DAG(
    dag_id = "Checking_SFTP_Server_with_sensor",
    default_args=args,
    schedule_interval="0 8 * * *",
    dagrun_timeout=timedelta(minutes=1),
    tags=['My_company'])    

check_SFTP = SFTPSensor(task_id="check_SFTP", 
                        sftp_conn_id="My_company",
                        path="/in/",
                        file_pattern="*",
                        poke_interval=15, 
                        timeout=60*5,
                        dag=dag 
                        )

start =  DummyOperator(task_id='start', dag = dag)

def createOrderProcessingTask(file):
    return TriggerDagRunOperator(
                                task_id = f'process_order_{file}', 
                                trigger_dag_id = "Processing_the_order",
                                conf = {"file_name": file},
                                dag = dag
                                )

end = DummyOperator(task_id='end', dag = dag)

files = get_list_of_files()

check_SFTP >> start

for file in files:
    task = createOrderProcessingTask(file)
    start >> task >> end

但我无法处理该属性“file_pattern”。上面的 DAG 因错误而中断:

Broken DAG: [/opt/airflow/dags/repo/dags/check_sftp_server_with_sensor.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 744, in __init__
    f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
airflow.exceptions.AirflowException: Invalid arguments were passed to SFTPSensor (task_id: check_SFTP). Invalid arguments were:
**kwargs: {'file_pattern': '*'}

我错过了什么?我应该使用不同的方法来解决这个问题吗?

python airflow filepattern
2个回答
0
投票

您可能混淆了关键字参数的顺序

看看签名

SFTPSensor(*, path, file_pattern='', newer_than=None, sftp_conn_id='sftp_default', **kwargs)

您会看到某些参数(

path
file_pattern
newer_than
sftp_conn_id
)有自己的、明确的参数。如果您传递任何其他关键字参数,它们将被打包到包罗万象的 **kwargs 字典中。

就您而言,您将传递

task_id
作为您的第一个参数。由于它不是一个显式参数,Python 假设
task_id
和所有后续参数应该打包到
kwargs
中。

SFTPSensor
不希望
kwargs
包含显式参数
file_pattern
,因此它会抛出错误:

Invalid arguments were: **kwargs: {'file_pattern': '*'}

希望这有帮助

总结: 调用函数时,kwargs 出现在显式参数之后。


0
投票

您可能使用旧版本的 Airflow 文件模式,该模式是在 Airflow 2.5 中引入的,特别是在 apache-airflow-providers-sftp 提供程序更新 4.1.0 中。此添加允许传感器使用通配符模式过滤文件,

© www.soinside.com 2019 - 2024. All rights reserved.