大量任务时,气流调度器不调度(或缓慢调度)。

问题描述 投票:0回答:1

我正在Google Cloud Composer上使用airflow(版本:composer-1.10.2-airflow-1.10.6)。

我意识到,当有大量任务需要处理时,调度器不会调度任务(见下面的甘特图)。

Gantt View(不要注意颜色,红色的任务是 "createTable Operators",如果表已经存在就会失败,所以在DAG的下一部分(重要的部分)运行之前,它们必须失败5次)

任务之间有几个小时的差距! (例如从上午10点到下午15点之间有5个小时的时间,但什么也没发生)

通常情况下,如果有大约40个DAG,每个DAG有大约100-200个任务(有时更多一些),它就能正常工作。但是最近我添加了2个任务很多的DAG(每个任务约5000个),调度器非常慢或者不调度任务。在截图上,我在下午15点暂停了2个有大量任务的DAG,调度器又恢复了,工作很好。

关于这个问题,你有什么解决办法吗?

Airflow本来就是一个处理 "无限 "任务量的工具。

下面是我的环境的一些信息。

  • 版本: composer-1.10.2airflow-1.10.6
  • 集群大小:6 (12vCPU, 96GB内存)

下面是一些关于风量配置的信息。

╔════════════════════════════════╦═══════╗
║ Airflow parameter              ║ value ║
╠════════════════════════════════╬═══════╣
║ -(celery)-                     ║       ║
║ worker_concurrency             ║ 32    ║
║ -(webserver)-                  ║       ║
║ default_dag_run_display_number ║ 2     ║
║ workers                        ║ 2     ║
║ worker_refresh_interval        ║ 60    ║
║ -(core)-                       ║       ║
║ max_active_runs_per_dag        ║ 1     ║
║ dagbag_import_timeout          ║ 600   ║
║ parallelism                    ║ 200   ║
║ min_file_process_interval      ║ 60    ║
║ -(scheduler)-                  ║       ║
║ processor_poll_interval        ║ 5     ║
║ max_threads                    ║ 2     ║
╚════════════════════════════════╩═══════╝

谢谢你的帮助

EDIT.我的DAG中,有26个是由一个.py文件创建的,通过解析一个巨大的JSON变量来创建所有的DAG和任务。

我的26个DAG是由一个单一的.py文件通过解析一个巨大的JSON变量来创建所有的DAG和任务。

也许问题来自于此,因为今天Airflow正在调度其他DAG的任务,而不是我描述的26个DAG(特别是2个大DAG).更准确地说,Airflow有时会调度我的26个DAG的任务,但它更容易和更经常地调度其他DAG的任务。

airflow airflow-scheduler google-cloud-composer airflow-worker
1个回答
2
投票

我觉得你应该 升级 到Composer 1.10.4版本,有最新的补丁总是有帮助的。

你使用的是什么数据库?拥有所有这些失败的任务是非常不可取的。你可以使用 CREATE TABLE IF NOT EXISTS ...?


2
投票

任务间的高延迟通常是一个指标,表明存在与调度器相关的瓶颈(而不是与工作者相关的东西)。即使反复运行相同的DAG,Composer环境仍然有可能出现这样的性能瓶颈,因为每次的工作分配都不一样,或者后台可能有不同的进程在运行。

首先,我建议增加调度器的可用线程数(scheduler.max_threads),然后确保你的调度器没有消耗它所在节点的所有CPU。您可以通过识别调度器所在的节点,然后在Cloud Console中检查它的CPU指标。要找到节点名称。

# Obtain the Composer namespace name
kubectl get namespaces | grep composer

# Check for the scheduler
kubectl get pods -n $NAMESPACE -o wide | grep scheduler

如果上面的方法没有帮助,那么也有可能是调度器故意阻塞了某个条件。要检查调度器在检查任务运行时评估的所有条件,可以在云控制台中设置 core.logging_level=DEBUG. 在调度器日志中(您可以在云日志中进行过滤),您可以检查所有通过或失败的条件,以便任务运行或保持排队。

© www.soinside.com 2019 - 2024. All rights reserved.