气流并行度

问题描述 投票:0回答:4

本地执行器在调度任务时生成新进程。它创建的进程数量是否有限制。我需要改变它。我需要知道调度程序的“max_threads”和 airflow.cfg 中的“并行性”?

airflow
4个回答
108
投票

并行:不是一个非常具有描述性的名称。描述说它设置了气流安装的最大任务实例,这有点含糊 - 如果我有两台运行气流工作人员的主机,我会在两台主机上安装气流,所以这应该是两个安装,但基于上下文这里的“每次安装”是指“每个 Airflow 状态数据库”。我将其命名为 max_active_tasks。

dag_concurrency: 尽管名称基于注释,但这实际上是任务并发性,并且是每个工作人员的。我将其命名为 max_active_tasks_for_worker (per_worker 建议这是工作人员的全局设置,但我认为您可以为此设置不同值的工作人员)。

max_active_runs_per_dag:这个还不错,但由于它似乎只是匹配 DAG kwarg 的默认值,因此最好在名称中反映这一点,例如 default_max_active_runs_for_dags 那么让我们继续讨论 DAG kwargs:

并发:同样,有一个像这样的通用名称,再加上并发在其他地方用于不同的东西这一事实使得这非常令人困惑。我称之为 max_active_tasks。

max_active_runs:这个对我来说听起来不错。

来源:https://issues.apache.org/jira/browse/AIRFLOW-57


max_threads 使用户可以控制 CPU 使用情况。它指定调度程序并行性。


25
投票

已经是 2019 年了,更多更新的文档已经发布。简而言之:

AIRFLOW__CORE__PARALLELISM
是可以在所有 Airflow 上同时运行的任务实例的最大数量(所有 dags 上的所有任务)

AIRFLOW__CORE__DAG_CONCURRENCY
是单个特定 DAG 允许同时运行的最大任务实例数

这些文档更详细地描述了它:

根据https://www.astronomer.io/guides/airflow-scaling-workers/

并行度是可以运行的最大任务实例数 同时在气流上。这意味着在所有正在运行的 DAG 中,没有 一次将运行超过 32 个任务。

还有

dag_concurrency 是允许运行的任务实例数 在特定 dag 内同时进行。换句话说,你可以有 2 每个 DAG 并行运行 16 个任务,但单个 DAG 运行 50 个任务 也只会运行 16 个任务 - 而不是 32 个

并且,根据 https://airflow.apache.org/faq.html#how-to-reduce-airflow-dag-scheduling-latency-in-product

max_threads:调度程序将并行生成多个线程 日程安排。这是由 max_threads 控制的,默认值为 2. 用户应在生产中将此值增加到更大的值(例如调度程序运行的 cpu 数量 - 1)。

但这最后一块似乎不应该占用太多时间,因为它只是“调度”部分。不是实际运行的部分。因此,我们认为不需要对

max_threads
进行太多调整,但
AIRFLOW__CORE__PARALLELISM
AIRFLOW__CORE__DAG_CONCURRENCY
确实影响了我们。


15
投票

调度程序的

max_threads
是调度程序并行化的进程数。
max_threads
不能超过 cpu 计数。 LocalExecutor 的
parallelism
是 LocalExecutor 应运行的并发任务数。调度程序和 LocalExecutor 都使用 python 的多处理库来实现并行性。


0
投票

我面临同样类型的问题无法读取服务日志:无效的 URL 'http://:8793/log/dag_id=

问题是我的 dag 没有安装到 celery 工人上 另外环境变量 AIRFLOW__CORE__EXECUTOR 需要设置 CeleryExecutor

celery-worker-1:
    build: ./airflow/
    depends_on:
      - airflow-cont
      - redis
    command: ''
    entrypoint: celery --app airflow.executors.celery_executor worker --loglevel=INFO
    volumes:
      - /home/.aws:/home/.aws:ro
      - ./mnt/airflow.cfg:/opt/airflow/airflow.cfg
      - ./mnt/webserver_config.py:/opt/airflow/webserver_config.py
      - ./mnt/dags:/opt/airflow/dags
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres:5432/airflow
    networks:
      - stand_alone_airflow
© www.soinside.com 2019 - 2024. All rights reserved.