如何利用airflow进行实时数据处理

问题描述 投票:0回答:2

我有一个场景,我想处理 csv 文件并加载到其他数据库:

案例

  1. pic csv 文件并加载到与 csv 同名的 mysql
  2. 然后使用 python 任务文件对加载的行进行一些修改
  3. 之后从mysql中提取数据并加载到其他数据库

CSV 文件从远程服务器发送到文件夹中的一台 Airflow 服务器。

我们必须选择这些 csv 文件并通过 python 脚本进行处理。

假设我选择一个 csv 文件,那么我需要以类似的依赖方式将此 csv 文件传递给操作员的其余部分

filename : abc.csv

task1 >> task2 >> task3 >>task4

因此 abc.csv 应该可用于所有任务。

请告诉如何继续。

etl airflow
2个回答
1
投票

您的场景与实时无关。这是按计划/间隔摄取。或者也许您可以使用 SensorTask Operator 来检测数据可用性。

将您的每个需求实现为函数并从运算符实例中调用它们。 将操作员添加到 DAG,并制定适合您传入 Feed 的计划。

如何传递和访问参数是 -kwargs python_callable 初始化操作符时 扩展运算符时执行方法中的 -context['param_key'] -jinja 模板

相关... airflow 从 cli 传递参数 气流中的execution_date:需要作为变量访问


0
投票

Airflow 中任务通信的方式是使用 XCOM,但它适用于小值,而不是文件内容。

如果您希望任务使用相同的 csv 文件,您应该将其保存在某个位置,然后在 XCOM 中传递该位置的路径。

我们正在使用 LocalExecutor,因此本地文件系统对我们来说很好。

我们决定为每个 dag 创建一个文件夹,并使用 dag 的名称。在该文件夹内,我们为每个执行日期生成一个文件夹(我们在第一个任务中执行此操作,我们总是称之为

start_task
)。然后我们通过Xcom将该文件夹的路径传递给后续任务。

start_task 的示例代码:

def start(share_path, **context):
    execution_date_as_string = context['execution_date'].strftime(DATE_FORMAT)    
    execution_folder_path = os.path.join(share_path, 'my_dag_name', execution_date_as_string)
    _create_folder_delete_if_exists(execution_folder_path)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="execution_folder_path", value=execution_folder_path)

start_task = PythonOperator(
    task_id='start_task',
    provide_context=True,
    python_callable=start,
    op_args=[share_path],
    dag=dag
)

share_path
是所有dags的基目录,我们将其保存在Airflow变量中。

后续任务可以通过以下方式获取执行文件夹:

execution_folder_path = task_instance.xcom_pull(task_ids='start_task', key='execution_folder_path')
© www.soinside.com 2019 - 2024. All rights reserved.