我在 airflow 工作,它有 30 多个任务集成到工作中。为了优化云成本,我们实施了以下方法。
创建了一个 json,其中包含每个 taskname,min_workers for that task, max_workers for that task, spark_version for that task, driver_node_type for that task, worker_node_type for that task.
在代码中,我们将任务名称传递给集群创建方法,在其中我们从 admin-> 变量中读取 json,并为该特定任务获取该 json 中的参数。
下面是单节点的json。为此,我们只为该任务传递spark_version,为该任务传递driver_node_type,为该任务传递worker_node_type。
{
"cluster_name": "my-cluster",
"spark_version": spark_version for that task,
"node_type_id": worker_node_type for that task,
"driver_node_type_id": driver_node_type for that task,
"enable_local_disk_encryption": true,
"spark_conf": {
"spark.speculation": true
},
"num_workers": 0,
"custom_tags":{
"Environment":"PROD"
}
}
下面是多节点的 json,我们为该任务传递min_workers,为该任务传递max_workers,为该任务传递spark_version,为该任务传递driver_node_type,为该任务传递worker_node_type。
{
"cluster_name": "my-cluster",
"spark_version": spark_version for that task,
"node_type_id": worker_node_type for that task,
"driver_node_type_id": driver_node_type for that task,
"enable_local_disk_encryption": true,
"autoscale":{
"min_workers":min_workers,
"max_workers":max_workers
},
"azure_attributes":{
"first_on_demand":1
},
"spark_conf": {
"spark.speculation": true
},
"custom_tags":{
"Environment":"PROD"
}
}
当我在一个更大的研究中为一个任务设置单节点时,问题就来了。 我在 log4j 输出和无限期运行的任务中遇到以下问题。当我为该任务从单节点更改为多节点时,它会在 10 分钟内完成。
警告消息是“初始作业尚未接受任何资源;检查您的集群 UI 以确保工作人员已注册并拥有足够的资源”。
提议的行为是,使用 pandas 代码的任务应该在单节点集群上运行,而使用 spark 代码的任务应该在多节点集群上运行。
集群之间的切换是我们试图根据条件在这里实现的。