我们正在尝试将每日 (CSV) 提取文件合并到我们的数据仓库中。
在我们的用例中,所有 DAG(约 2000 个)的 DAG 的 Python 代码都是相同的,因此我们通过来自单个 Python 文件的 DAG 生成器逻辑生成它们。 在我们的 DAG 中,我们只有 15 个任务(5 个虚拟任务、2 个 CloudDataFusionStartPipelineOperator 任务、8 个 python 任务)。
在 DAG 生成过程中,我们读取气流变量 (~30-50) 来确定要生成哪些 DAG(这也确定了 DAG 的 ID 以及它们应该处理的模式/表名称)。我们称这些为生成器变量。在 DAG 生成过程中,DAG 还会通过 ID 读取其配置(每个生成的 DAG 还有 2-3 个气流变量)。我们将这些称为配置器变量。
不幸的是,在我们的 DAG 中,我们必须处理一些
传递的参数(通过 REST API)以及大量任务之间动态计算的信息,因此我们依赖 Airflow 的 XCOM 功能。这意味着 Airflow 数据库中的读取量巨大。
在可能的情况下,我们使用用户定义的宏来配置任务以延迟数据库读取的执行(XCOM拉取的执行)直到任务执行为止,但它仍然给Airflow(Google Cloud Composer)带来沉重的负载。大约 50 个来自 XCOM 的拉取。
问题:
Metadata = PythonOperator(
task_id = TASK_NAME_PREFIX__METADATA + str(dag_id),
python_callable = metadataManagment,
op_kwargs = {
'dag_id' : dag_id,
'execution_case' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}',
'date' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}',
'enc_path' : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
'dec_path' : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
},
provide_context = True,
trigger_rule = TriggerRule.ALL_DONE
)
发电机气流变量示例:
key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]
key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]
key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]
key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]
key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]
DAG 生成器示例:
# DAG_TYPE = STD
env_vars = Variable.get('environment_variables')
airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']
list_of_schemas_with_group = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]
for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
for table_name in table_names_with_schema_prefix:
dag_id = str(table_name)
globals()[dag_id] = create_dag( dag_id,
schedule,
default_dag_args,
schema_name,
table_type,
env_vars,
tags )
Airflow 的数据库是为如此高的读取次数(Airflow 变量和主要来自 XCOM 的值)而设计的吗?是的,但您共享的代码是滥用的。您在顶级代码中使用
Variable.get()
。这意味着每次解析
.py
文件时,Airflow 都会执行
Variable.get()
,打开与数据库的会话。假设您没有更改默认值 (min_file_process_interval),这意味着每 30 秒您对每个 DAG 执行一次
Variable.get()
。将其转化为您提到的数字,您有 2000 个 DAG,每个 DAG 进行约 30-50 个
Variable.get()
调用,这意味着您每 30 秒对数据库进行 6000-10000 次调用。这太欺负人了。如果您希望在顶级代码中使用变量,您应该使用环境变量而不是 Airflow 变量。这在
具有环境变量的动态 DAG 文档中进行了解释。
注意 Airflow 提供了定义自定义秘密后端的选项。
如果我们必须在任务之间传递大量动态计算的字段和元数据,我们应该如何重新设计代码?气流可以处理大容量。问题更多在于您如何编写 DAG。如果对 Xcom 表有疑问,或者您是否希望将其存储在其他地方 Airflow 支持
自定义 Xcom 后端。
我们是否应该简单地接受这样的事实:在这种类型的用例中,数据库负载很重,并且简单地垂直扩展数据库?根据您的描述,您可以采取一些措施来改善这种情况。气流针对大量任务和任务(垂直规模和水平规模)进行了测试。如果您发现性能问题的证据,您可以通过向项目打开
Github Issue 来报告。我