我正在Google Cloud Composer上使用airflow(版本:composer-1.10.2-airflow-1.10.6)。
我意识到,当有大量任务需要处理时,调度器不会调度任务(见下面的甘特图)。
(不要注意颜色,红色的任务是 "createTable Operators",如果表已经存在就会失败,所以在DAG的下一部分(重要的部分)运行之前,它们必须失败5次)
任务之间有几个小时的差距! (例如从上午10点到下午15点之间有5个小时的时间,但什么也没发生)
通常情况下,如果有大约40个DAG,每个DAG有大约100-200个任务(有时更多一些),它就能正常工作。但是最近我添加了2个任务很多的DAG(每个任务约5000个),调度器非常慢或者不调度任务。在截图上,我在下午15点暂停了2个有大量任务的DAG,调度器又恢复了,工作很好。
关于这个问题,你有什么解决办法吗?
Airflow本来就是一个处理 "无限 "任务量的工具。
下面是我的环境的一些信息。
下面是一些关于风量配置的信息。
╔════════════════════════════════╦═══════╗
║ Airflow parameter ║ value ║
╠════════════════════════════════╬═══════╣
║ -(celery)- ║ ║
║ worker_concurrency ║ 32 ║
║ -(webserver)- ║ ║
║ default_dag_run_display_number ║ 2 ║
║ workers ║ 2 ║
║ worker_refresh_interval ║ 60 ║
║ -(core)- ║ ║
║ max_active_runs_per_dag ║ 1 ║
║ dagbag_import_timeout ║ 600 ║
║ parallelism ║ 200 ║
║ min_file_process_interval ║ 60 ║
║ -(scheduler)- ║ ║
║ processor_poll_interval ║ 5 ║
║ max_threads ║ 2 ║
╚════════════════════════════════╩═══════╝
谢谢你的帮助
EDIT.我的DAG中,有26个是由一个.py文件创建的,通过解析一个巨大的JSON变量来创建所有的DAG和任务。
我的26个DAG是由一个单一的.py文件通过解析一个巨大的JSON变量来创建所有的DAG和任务。
也许问题来自于此,因为今天Airflow正在调度其他DAG的任务,而不是我描述的26个DAG(特别是2个大DAG).更准确地说,Airflow有时会调度我的26个DAG的任务,但它更容易和更经常地调度其他DAG的任务。
我觉得你应该 升级 到Composer 1.10.4版本,有最新的补丁总是有帮助的。
你使用的是什么数据库?拥有所有这些失败的任务是非常不可取的。你可以使用 CREATE TABLE IF NOT EXISTS ...
?
任务间的高延迟通常是一个指标,表明存在与调度器相关的瓶颈(而不是与工作者相关的东西)。即使反复运行相同的DAG,Composer环境仍然有可能出现这样的性能瓶颈,因为每次的工作分配都不一样,或者后台可能有不同的进程在运行。
首先,我建议增加调度器的可用线程数(scheduler.max_threads
),然后确保你的调度器没有消耗它所在节点的所有CPU。您可以通过识别调度器所在的节点,然后在Cloud Console中检查它的CPU指标。要找到节点名称。
# Obtain the Composer namespace name
kubectl get namespaces | grep composer
# Check for the scheduler
kubectl get pods -n $NAMESPACE -o wide | grep scheduler
如果上面的方法没有帮助,那么也有可能是调度器故意阻塞了某个条件。要检查调度器在检查任务运行时评估的所有条件,可以在云控制台中设置 core.logging_level=DEBUG
. 在调度器日志中(您可以在云日志中进行过滤),您可以检查所有通过或失败的条件,以便任务运行或保持排队。