读取 XCOM 和 Airflow 变量可能会减慢 Airflow(在 Google Cloud Composer 中)

问题描述 投票:0回答:1

我们正在尝试将每日 (CSV) 提取文件合并到我们的数据仓库中。

在我们的用例中,所有 DAG(约 2000 个)的 DAG 的 Python 代码都是相同的,因此我们通过来自单个 Python 文件的 DAG 生成器逻辑生成它们。 在我们的 DAG 中,我们只有 15 个任务(5 个虚拟任务、2 个 CloudDataFusionStartPipelineOperator 任务、8 个 python 任务)。

在 DAG 生成过程中,我们读取气流变量 (~30-50) 来确定要生成哪些 DAG(这也确定了 DAG 的 ID 以及它们应该处理的模式/表名称)。我们称这些为生成器变量。

在 DAG 生成过程中,DAG 还会通过 ID 读取其配置(每个生成的 DAG 还有 2-3 个气流变量)。我们将这些称为配置器变量。

不幸的是,在我们的 DAG 中,我们必须处理一些

传递的参数(通过 REST API)以及大量任务之间动态计算的信息,因此我们依赖 Airflow 的 XCOM 功能。这意味着 Airflow 数据库中的读取量巨大。

在可能的情况下,我们使用

用户定义的宏来配置任务以延迟数据库读取的执行(XCOM拉取的执行)直到任务执行为止,但它仍然给Airflow(Google Cloud Composer)带来沉重的负载。大约 50 个来自 XCOM 的拉取。

问题

    Airflow 的数据库是为如此高的读取次数(Airflow 变量和主要来自 XCOM 的值)而设计的吗?
  • 如果我们必须在任务之间传递大量动态计算的字段和元数据,我们应该如何重新设计代码?
  • 我们是否应该简单地接受这样的事实:在这种类型的用例中,数据库负载很重,并且简单地垂直扩展数据库?

XCOM 拉取示例:

Metadata = PythonOperator( task_id = TASK_NAME_PREFIX__METADATA + str(dag_id), python_callable = metadataManagment, op_kwargs = { 'dag_id' : dag_id, 'execution_case' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}', 'date' : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}', 'enc_path' : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}', 'dec_path' : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}', 'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'], }, provide_context = True, trigger_rule = TriggerRule.ALL_DONE )
发电机气流变量示例:

key: STD_SCHEMA_NAMES val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')] key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ] key: STD_MAIN_SCHEMA1_SCD2_TABLES val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ] key: STD_MAIN_SCHEMA2_SCD2_TABLES val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ] key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]
DAG 生成器示例:

# DAG_TYPE = STD env_vars = Variable.get('environment_variables') airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x]) table_types = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2'] list_of_schemas_with_group = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]')) tuples_of_var_names = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types] list_of_tables = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names] list_of_tables = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0] for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables: for table_name in table_names_with_schema_prefix: dag_id = str(table_name) globals()[dag_id] = create_dag( dag_id, schedule, default_dag_args, schema_name, table_type, env_vars, tags )
    
airflow airflow-scheduler google-cloud-composer airflow-2.x
1个回答
4
投票
Airflow 的数据库是为如此高的读取次数(Airflow 变量和主要来自 XCOM 的值)而设计的吗?

是的,但您共享的代码是滥用的。您在顶级代码中使用

Variable.get()

。这意味着每次解析 
.py
 文件时,Airflow 都会执行 
Variable.get()
,打开与数据库的会话。假设您没有更改默认值 (
min_file_process_interval),这意味着每 30 秒您对每个 DAG 执行一次 Variable.get()

将其转化为您提到的数字,您有 2000 个 DAG,每个 DAG 进行约 30-50 个

Variable.get()

 调用,这意味着您每 30 秒对数据库进行 6000-10000 次调用。这太欺负人了。

如果您希望在顶级代码中使用变量,您应该使用环境变量而不是 Airflow 变量。这在

具有环境变量的动态 DAG 文档中进行了解释。

注意 Airflow 提供了定义自定义

秘密后端的选项。

如果我们必须在任务之间传递大量动态计算的字段和元数据,我们应该如何重新设计代码?

气流可以处理大容量。问题更多在于您如何编写 DAG。如果对 Xcom 表有疑问,或者您是否希望将其存储在其他地方 Airflow 支持

自定义 Xcom 后端

我们是否应该简单地接受这样的事实:在这种类型的用例中,数据库负载很重,并且简单地垂直扩展数据库?

根据您的描述,您可以采取一些措施来改善这种情况。气流针对大量任务和任务(垂直规模和水平规模)进行了测试。如果您发现性能问题的证据,您可以通过向项目打开

Github Issue 来报告。我

© www.soinside.com 2019 - 2024. All rights reserved.