I know that in Spark we can set different pools to FAIR or FIFO and get different behavior. However, in fairscheduler.xml we can also set an individual pool's schedulingMode to Fair or FIFO, and when I tested this a few times the two settings seemed to behave identically. So I looked at the Spark source code; the scheduling algorithms are:
/**
 * An interface for sort algorithm
 * FIFO: FIFO algorithm between TaskSetManagers
 * FS: FS algorithm between Pools, and FIFO or FS within Pools
 */
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
In FairSchedulingAlgorithm, if s1 and s2 come from the same pool, their minShare, runningTasks, and weight should be the same, so we always end up with a return value of false. So they are not scheduled fairly but FIFO. My fairscheduler.xml looks like this:
<?xml version="1.0"?>
<allocations>
  <pool name="default">
    <schedulingMode>FAIR</schedulingMode>
    <weight>3</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="cubepublishing">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
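To sanity-check the same-pool case, the FAIR comparator above can be exercised standalone. The sketch below uses a hypothetical MockSchedulable (my own stand-in for Spark's Schedulable, carrying only the fields the comparator reads) and re-implements the comparator body verbatim:

```scala
// MockSchedulable is a hypothetical stand-in for Spark's Schedulable trait.
case class MockSchedulable(name: String, minShare: Int, weight: Int, runningTasks: Int)

// Same logic as FairSchedulingAlgorithm.comparator above.
def fairComparator(s1: MockSchedulable, s2: MockSchedulable): Boolean = {
  val s1Needy = s1.runningTasks < s1.minShare
  val s2Needy = s2.runningTasks < s2.minShare
  val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
  val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
  val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
  val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble
  if (s1Needy && !s2Needy) true
  else if (!s1Needy && s2Needy) false
  else {
    val compare =
      if (s1Needy && s2Needy) minShareRatio1.compareTo(minShareRatio2)
      else taskToWeightRatio1.compareTo(taskToWeightRatio2)
    if (compare < 0) true
    else if (compare > 0) false
    else s1.name < s2.name // tie-break: lexicographic name order
  }
}

// Two TaskSetManagers in the same pool: identical minShare, weight, runningTasks.
val a = MockSchedulable("TaskSet_1.0", minShare = 0, weight = 1, runningTasks = 3)
val b = MockSchedulable("TaskSet_2.0", minShare = 0, weight = 1, runningTasks = 3)
println(fairComparator(a, b)) // true: decided purely by the name tie-break

// With different running-task counts, the less-loaded one wins under FAIR:
val c = MockSchedulable("TaskSet_3.0", minShare = 0, weight = 1, runningTasks = 1)
println(fairComparator(c, a)) // true: c has the lower tasks-to-weight ratio
```

Note that when all the compared stats are equal, the comparator does not literally return false; it falls through to the name comparison, which for TaskSetManagers effectively orders by stage id.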
And spark.scheduler.mode is set to:
# job scheduler
spark.scheduler.mode FAIR
spark.scheduler.allocation.file conf/fairscheduler.xml
Thanks for your help!
When you submit a job to a cluster with spark-submit or by any other means, it is handed to the Spark scheduler, which is responsible for executing the job's logical plan. In Spark, there are two scheduling modes.
1. FIFO. By default, Spark's scheduler runs jobs in FIFO order. Each job is divided into stages (e.g. map and reduce phases); the first job gets priority on all available resources while its stages still have tasks to launch, then the second job gets priority, and so on. If the job at the head of the queue doesn't need the whole cluster, later jobs can start running right away, but if the job at the head of the queue is large, later jobs may be delayed significantly.
2. FAIR. The fair scheduler also supports grouping jobs into pools and setting different scheduling options (e.g. weight) for each pool. This is useful, for example, for creating a high-priority pool for more important jobs, or for grouping each user's jobs together and giving users equal shares regardless of how many concurrent jobs they have, instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler.
Without any intervention, newly submitted jobs go into the default pool, but a job's pool can be set by adding the spark.scheduler.pool "local property" to the SparkContext in the thread that submits the job.
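A minimal sketch of that local-property approach, assuming a running cluster and pool names matching your fairscheduler.xml (the app name here is made up):

```scala
import org.apache.spark.sql.SparkSession

// Build a session with FAIR scheduling and the pool definitions file.
val spark = SparkSession.builder()
  .appName("fair-pool-demo") // hypothetical app name
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", "conf/fairscheduler.xml")
  .getOrCreate()

// Jobs submitted from this thread now go into the "cubepublishing" pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "cubepublishing")
spark.range(1000000L).count()

// Clearing the property sends subsequent jobs from this thread back to the default pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
```

Since the property is per-thread, submitting jobs from several threads with different pool values is how concurrent jobs end up in different pools.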