我对 Airflow 还很陌生。我正在尝试设置 SFTPSensor 以查看 SFTP 服务器上的文件夹中是否出现任何文件。对我来说,这听起来像是 file_pattern 属性中的正则表达式“*”:
import airflow
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.sensors.python import PythonSensor
from datetime import datetime, timedelta
args = {
"owner": "My_company",
"start_date": datetime(2022,10,17)}
def get_list_of_files():
ftp_hook = SFTPHook(ftp_conn_id="My_company")
files_list = ftp_hook.list_directory("/in/")
logging.info("The list of files is the following:")
logging.info(files_list)
return files_list
dag = DAG(
dag_id = "Checking_SFTP_Server_with_sensor",
default_args=args,
schedule_interval="0 8 * * *",
dagrun_timeout=timedelta(minutes=1),
tags=['My_company'])
check_SFTP = SFTPSensor(task_id="check_SFTP",
sftp_conn_id="My_company",
path="/in/",
file_pattern="*",
poke_interval=15,
timeout=60*5,
dag=dag
)
start = DummyOperator(task_id='start', dag = dag)
def createOrderProcessingTask(file):
return TriggerDagRunOperator(
task_id = f'process_order_{file}',
trigger_dag_id = "Processing_the_order",
conf = {"file_name": file},
dag = dag
)
end = DummyOperator(task_id='end', dag = dag)
files = get_list_of_files()
check_SFTP >> start
for file in files:
task = createOrderProcessingTask(file)
start >> task >> end
但我无法处理该属性“file_pattern”。上面的 DAG 因错误而中断:
Broken DAG: [/opt/airflow/dags/repo/dags/check_sftp_server_with_sensor.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 744, in __init__
f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
airflow.exceptions.AirflowException: Invalid arguments were passed to SFTPSensor (task_id: check_SFTP). Invalid arguments were:
**kwargs: {'file_pattern': '*'}
我错过了什么?我应该使用不同的方法来解决这个问题吗?
您可能混淆了关键字参数的顺序
看看签名:
SFTPSensor(*, path, file_pattern='', newer_than=None, sftp_conn_id='sftp_default', **kwargs)
您会看到某些参数(
path
、file_pattern
、newer_than
和 sftp_conn_id
)有自己的、明确的参数。如果您传递任何其他关键字参数,它们将被打包到包罗万象的 **kwargs 字典中。
就您而言,您将传递
task_id
作为您的第一个参数。由于它不是一个显式参数,Python 假设 task_id
和所有后续参数应该打包到 kwargs
中。
SFTPSensor
不希望 kwargs
包含显式参数 file_pattern
,因此它会抛出错误:
Invalid arguments were: **kwargs: {'file_pattern': '*'}
希望这有帮助
总结: 调用函数时,kwargs 出现在显式参数之后。
您可能使用旧版本的 Airflow 文件模式,该模式是在 Airflow 2.5 中引入的,特别是在 apache-airflow-providers-sftp 提供程序更新 4.1.0 中。此添加允许传感器使用通配符模式过滤文件,