欢迎访问 生活随笔!

凯发ag旗舰厅登录网址下载

当前位置: 凯发ag旗舰厅登录网址下载 > 人工智能 > chatgpt >内容正文

chatgpt

深入理解spark两种调度模式:fifo,fair模式 -凯发ag旗舰厅登录网址下载

发布时间:2025/1/21 chatgpt 35 豆豆
凯发ag旗舰厅登录网址下载 收集整理的这篇文章主要介绍了 深入理解spark两种调度模式:fifo,fair模式 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

前面我们应知道了一个任务提交会由dag拆分为job,stage,task,最后提交给taskscheduler,在提交taskscheduler中会根据master初始化taskscheduler和schedulerbackend两个类,并且初始化一个调度池;

1.调度池比较

根据mode初始化调度池pool

def initialize(backend: schedulerbackend) {this.backend = backend// temporarily set rootpool name to empty 这里可以看到调度池初始化最小设置为0rootpool = new pool("", schedulingmode, 0, 0)schedulablebuilder = {schedulingmode match {case schedulingmode.fifo =>new fifoschedulablebuilder(rootpool)case schedulingmode.fair =>new fairschedulablebuilder(rootpool, conf)}}schedulablebuilder.buildpools()}

fifo模式

这个会根据spark.scheduler.mode 来设置fifo or fair,默认的是fifo模式;

fifo模式什么都不做,实现默认的schedulerablebuilder方法,建立的调度池也为空,addtasksetmaneger也是调用默认的;

可以简单的理解为,默认模式fifo什么也不做。。

fair模式

fair模式则重写了buildpools的方法,读取默认路径 $spark_home/conf/fairscheduler.xml文件,也可以通过参数spark.scheduler.allocation.file设置用户自定义配置文件。

文件中配置的是

poolname 线程池名

schedulermode 调度模式(fifo,fair仅有两种)

minshare 初始大小的线程核数

wight 调度池的权重

override def buildpools() {var is: option[inputstream] = nonetry {is = option {schedulerallocfile.map { f =>new fileinputstream(f)}.getorelse {utils.getsparkclassloader.getresourceasstream(default_scheduler_file)}}is.foreach { i => buildfairschedulerpool(i) }} finally {is.foreach(_.close())}// finally create "default" poolbuilddefaultpool()}

同时也重写了addtaskmanager方法

override def addtasksetmanager(manager: schedulable, properties: properties) {var poolname = default_pool_namevar parentpool = rootpool.getschedulablebyname(poolname)if (properties != null) {poolname = properties.getproperty(fair_scheduler_properties, default_pool_name)parentpool = rootpool.getschedulablebyname(poolname)if (parentpool == null) {// we will create a new pool that user has configured in app// instead of being defined in xml fileparentpool = new pool(poolname, default_scheduling_mode,default_minimum_share, default_weight)rootpool.addschedulable(parentpool)loginfo("created pool %s, schedulingmode: %s, minshare: %d, weight: %d".format(poolname, default_scheduling_mode, default_minimum_share, default_weight))}}parentpool.addschedulable(manager)loginfo("added task set " manager.name " tasks to pool " poolname)}

这一段逻辑中是把配置文件中的pool,或者default pool放入rootpool中,然后把tasksetmanager存入rootpool对应的子pool;

2.调度算法比较

除了初始化的调度池不一致外,其实现的调度算法也不一致

实现的调度池pool,在内部实现方法中也会根据mode不一致来实现调度的不同

var tasksetschedulingalgorithm: schedulingalgorithm = {schedulingmode match {case schedulingmode.fair =>new fairschedulingalgorithm()case schedulingmode.fifo =>new fifoschedulingalgorithm()}}

fifo模式

fifo模式的调度方式很容易理解,比较stageid,谁小谁先执行;

这也很好理解,stageid小的任务一般来说是递归的最底层,是最先提交给调度池的;

private[spark] class fifoschedulingalgorithm extends schedulingalgorithm {override def comparator(s1: schedulable, s2: schedulable): boolean = {val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)if (res == 0) {val stageid1 = s1.stageidval stageid2 = s2.stageidres = math.signum(stageid1 - stageid2)}if (res < 0) {true} else {false}} }

fair模式

fair模式来说的话,稍微复杂一点;

但是还是比较容易看懂,

1.先比较两个stage的 runningtask使用的核数,其实也可以理解为task的数量,谁小谁的优先级高;

2.比较两个stage的 runningtask 权重,谁的权重大谁先执行;

3.如果前面都一直,则比较名字了(字符串比较),谁大谁先执行;

private[spark] class fairschedulingalgorithm extends schedulingalgorithm {override def comparator(s1: schedulable, s2: schedulable): boolean = {val minshare1 = s1.minshareval minshare2 = s2.minshareval runningtasks1 = s1.runningtasksval runningtasks2 = s2.runningtasksval s1needy = runningtasks1 < minshare1val s2needy = runningtasks2 < minshare2val minshareratio1 = runningtasks1.todouble / math.max(minshare1, 1.0).todoubleval minshareratio2 = runningtasks2.todouble / math.max(minshare2, 1.0).todoubleval tasktoweightratio1 = runningtasks1.todouble / s1.weight.todoubleval tasktoweightratio2 = runningtasks2.todouble / s2.weight.todoublevar compare: int = 0if (s1needy && !s2needy) {return true} else if (!s1needy && s2needy) {return false} else if (s1needy && s2needy) {compare = minshareratio1.compareto(minshareratio2)} else {compare = tasktoweightratio1.compareto(tasktoweightratio2)}if (compare < 0) {true} else if (compare > 0) {false} else {s1.name < s2.name}}

总结:虽然了解一下spark的调度模式,以前在执行中基本都没啥用到,没想到spark还有这样的隐藏功能。

与50位技术专家面对面20年技术见证,附赠技术全景图

总结

以上是凯发ag旗舰厅登录网址下载为你收集整理的深入理解spark两种调度模式:fifo,fair模式的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得凯发ag旗舰厅登录网址下载网站内容还不错,欢迎将凯发ag旗舰厅登录网址下载推荐给好友。

网站地图