我有一个场景,我想处理 csv 文件并加载到其他数据库:
案例
CSV 文件从远程服务器发送到文件夹中的一台 Airflow 服务器。
我们必须选择这些 csv 文件并通过 python 脚本进行处理。
假设我选择一个 csv 文件,那么我需要以类似的依赖方式将此 csv 文件传递给操作员的其余部分
filename : abc.csv
task1 >> task2 >> task3 >>task4
因此 abc.csv 应该可用于所有任务。
请告诉如何继续。
您的场景与实时无关。这是按计划/间隔摄取。或者也许您可以使用 SensorTask Operator 来检测数据可用性。
将您的每个需求实现为函数并从运算符实例中调用它们。 将操作员添加到 DAG,并制定适合您传入 Feed 的计划。
如何传递和访问参数是 -kwargs python_callable 初始化操作符时 扩展运算符时执行方法中的 -context['param_key'] -jinja 模板
Airflow 中任务通信的方式是使用 XCOM,但它适用于小值,而不是文件内容。
如果您希望任务使用相同的 csv 文件,您应该将其保存在某个位置,然后在 XCOM 中传递该位置的路径。
我们正在使用 LocalExecutor,因此本地文件系统对我们来说很好。
我们决定为每个 dag 创建一个文件夹,并使用 dag 的名称。在该文件夹内,我们为每个执行日期生成一个文件夹(我们在第一个任务中执行此操作,我们总是称之为
start_task
)。然后我们通过Xcom将该文件夹的路径传递给后续任务。
start_task 的示例代码:
def start(share_path, **context):
execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)
execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
_create_folder_delete_if_exists(execution_folder_path)
task_instance = context['task_instance']
task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)
start_task = PythonOperator(
task_id='start_task',
provide_context=True,
python_callable=start,
op_args=[share_path],
dag=dag
)
share_path
是所有dags的基目录,我们将其保存在Airflow变量中。
后续任务可以通过以下方式获取执行文件夹:
execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')