我正在尝试通过 Airflow 将数据从 Postgres 中的 50 个表移动到 BigQuery。每个表都遵循相同的 4 个操作,只是数据不同:
get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data
我考虑过的一些事情:
table 1
在 table 2
之前处理。我知道我可以使用跨 DAG 依赖项来实现类似的效果,但我希望有一个“主 DAG”来管理这些关系。get_latest_timestamp_table1 >> copy_data_to_bigquery_table1 >> verify_bigquery_data_table1 >> delete_postgres_data_table1
get_latest_timestamp_table2 >> copy_data_to_bigquery_table2 >> verify_bigquery_data_table2 >> delete_postgres_data_table2
...
for table in table_names:
get_latest_timestamp = {PythonOperator with tablename as an input}
...
get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data
还有其他想法吗?我对 Airflow 还很陌生,所以不确定重复类似操作的最佳实践是什么。
我尝试将每个任务(50*4=200 个任务)复制/粘贴到单个 DAG 中。它有效,但很丑。
为了避免代码复制,您可以使用任务组。这描述得很好here
for table in table_names:
with TaskGroup(group_id='process_tables') as process_tables:
get_latest_timestamp = EmptyOperator(task_id=f'{table}_timestamp')
copy_data_to_bigquery = EmptyOperator(task_id=f'{table}_to_bq')
.....
get_latest_timestamp >> copy_data_to_bigquery
您可以通过提供任务组来获取 xcom,如下所示:'''
process_tables.copy_data_to_bigquery
将任务组与其他任务组合起来就像这样
开始 >> process_tables >> 结束
我使用了相同的 TaskGroups 方法并对其进行了一些增强,因此您将拥有:
所以它看起来像这样:
代码应该如下所示:
with DAG(dag_id="daily-sync-pipeline-dag",
start_date=datetime(2025,1,1),
schedule_interval="@hourly",
catchup=False) as dag:
start = BashOperator(
task_id="start",
...
)
end = BashOperator(
task_id="end",
...
)
processAllTables = TaskGroup(group_id='processAllTables')
tables = ['TABLE_1', 'TABLE_2', 'TABLE_3']
for table in tables:
with TaskGroup(group_id=f'{table}_sync', parent_group=processAllTables):
exportToS3 = DummyOperator(task_id=f'{table}_exportToS3')
importToSnowflake = DummyOperator(task_id=f'{table}_importToSnowflake')
finalTouch = DummyOperator(task_id=f'{table}_finalTouch')
exportToS3 >> importToSnowflake >> finalTouch
# Adding another conditional step:
if table == "MY_SPECIAL_TABLE":
extraStep = DummyOperator(task_id=f'{table}_extraStep')
finalTouch >> extraStep
start >> processAllTables >> end