如何编写具有多个类似任务的 DAG

问题描述 投票:0回答:2

我正在尝试通过 Airflow 将数据从 Postgres 中的 50 个表移动到 BigQuery。每个表都遵循相同的 4 个操作,只是数据不同:

get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data

对 50 个表重复这些操作的最简洁方法是什么?

我考虑过的一些事情:

  • 为每个表制作一个 DAG
    • 有没有办法制作“DAG of DAG”?例如,我可能希望
      table 1
      table 2
      之前处理。我知道我可以使用跨 DAG 依赖项来实现类似的效果,但我希望有一个“主 DAG”来管理这些关系。
  • 在单个 DAG 中写出 200 个任务(我知道这很丑),然后执行类似的操作
get_latest_timestamp_table1 >> copy_data_to_bigquery_table1 >> verify_bigquery_data_table1 >> delete_postgres_data_table1
get_latest_timestamp_table2 >> copy_data_to_bigquery_table2 >> verify_bigquery_data_table2 >> delete_postgres_data_table2
...
  • 在主 DAG 内部循环(不确定这是否可能),类似
for table in table_names:
    get_latest_timestamp = {PythonOperator with tablename as an input}
    ...
    get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data

还有其他想法吗?我对 Airflow 还很陌生,所以不确定重复类似操作的最佳实践是什么。

我尝试将每个任务(50*4=200 个任务)复制/粘贴到单个 DAG 中。它有效,但很丑。

airflow airflow-2.x
2个回答
2
投票

为了避免代码复制,您可以使用任务组。这描述得很好here

for table in table_names:
with TaskGroup(group_id='process_tables') as process_tables:
    get_latest_timestamp = EmptyOperator(task_id=f'{table}_timestamp')
    copy_data_to_bigquery = EmptyOperator(task_id=f'{table}_to_bq')
    .....
    get_latest_timestamp >> copy_data_to_bigquery

您可以通过提供任务组来获取 xcom,如下所示:'''

process_tables.copy_data_to_bigquery

将任务组与其他任务组合起来就像这样

开始 >> process_tables >> 结束


0
投票

我使用了相同的 TaskGroups 方法并对其进行了一些增强,因此您将拥有:

  • 每桌一组
  • 所有这些组都嵌套在一个 parentGroup

所以它看起来像这样:

enter image description here

代码应该如下所示:

with DAG(dag_id="daily-sync-pipeline-dag",
        start_date=datetime(2025,1,1),
        schedule_interval="@hourly",
        catchup=False) as dag:
        
        start = BashOperator(
                task_id="start",
                ...
            )

        end = BashOperator(
                task_id="end",
                ...
            )


        processAllTables = TaskGroup(group_id='processAllTables')

        tables = ['TABLE_1', 'TABLE_2', 'TABLE_3']
        for table in tables:
            with TaskGroup(group_id=f'{table}_sync', parent_group=processAllTables):
                exportToS3 = DummyOperator(task_id=f'{table}_exportToS3')
                importToSnowflake = DummyOperator(task_id=f'{table}_importToSnowflake')
                finalTouch = DummyOperator(task_id=f'{table}_finalTouch')
                        
                exportToS3 >> importToSnowflake >> finalTouch

                # Adding another conditional step:
                if table == "MY_SPECIAL_TABLE":
                    extraStep = DummyOperator(task_id=f'{table}_extraStep')
                    finalTouch >> extraStep
            
start >> processAllTables >> end
© www.soinside.com 2019 - 2024. All rights reserved.