Spark Scheduler

类似MapReduce的任务调度,Spark也有自身的资源调度方式,包含应用程序间应用程序中两个层面的策略调度。

每个Spark程序(SparkContext实例)运行一个独立的执行进程,集群管理器提供了应用程序间的调度处理措施。其次,在每个Spark应用程序中,不同线程提交的多个Job作业(Spark Actions)可以同时运行。

作业调度

常见作业调度的基本概念:

  • Job : 作业,一次Action生成的一个或多个Stage组成的一次计算作业。
  • Stage : 调度阶段,不存在shuffle操作的调度过程,一个Stage中都是窄依赖,一个调度阶段对应一个任务集。
  • TaskSet : 任务集,一组互相关联的,且没有shuffle依赖关系的任务组成的集合。TaskSet的大小和Partition大小有关。
  • Task : 任务,单个分区数据集上的最小处理单元。根据Stage有多少个分区决定将Stage划分成多少个Task,然后生成TaskSet放到TaskScheduler。如果 Spark 为任务指定了20个cores,Spark 就可以并发处理 20 个task。

  一个Job就对应一个DAG图,划分DAG图时,首先将整个Job划分成一个FinalStage,从后往前回溯,每遇到shuffle操作就划分一个新的Stage出来。

应用程序间的调度

每个运行在集群上的Spark应用程序都能得到一个独立的,仅用于运行任务和存储数据的JVM。 如果用户需要共享集群资源,可以通过集群管理器配置不同的选项来分配集群资源。

  1. 静态资源分配

在集群中最简单最有效的方式就是静态资源分配,每个应用程序在整个生命周期中都可以得到一个最大数量的资源。此方式被用于Spark的Standalone和YARN模式中。

  • Standalone模式资源分配

Spark应用程序按照FIFO顺序执行,每个应用程序都会尝试使用使用所有可用的节点。

通过设置内核数目可以限制Spark应用程序资源的使用情况。spark.deploy.defalutCores配置通用资源使用。单个应用程序特殊资源使用可以通过spark.cores.max修改配置

除了控制cpu cores之外,每个应用程序的spark.executor.memory配置可以控制每个Executor内存的使用。

  • YARN模式的资源分配

在Spark YARN提交的客户端(spark-submit)配置参数。--num-executors控制在集群上分配的Executors数量。--executor-memory控制Executor的内存分配。--executor-cores控制Executor的cpu分配。

  1. 动态资源分配

根据工作负载动态分配集群资源。在应用程序资源空闲时及时释放回集群,应用程序资源紧张时及时向集群申请空闲资源。这在集群中存在多个Spark应用程序同时运行时尤为重要。

使用动态资源分配时,必须要保存Executors在执行过程中写得shuffle文件,使Executors能够顺利释放,应用程序必须使用外部的shuffle服务。

配置外部shuffle服务的方式:

在Spark YARN集群模式下,配置spark.dynamicAllocation.enabled为true启用动态资源分配,并通过*.minExecutors*.maxExecutors提供Executors的上下限。

通过spark.shuffle.service.enabled为true启用外部shuffle服务,并在集群所有nodemanager上配置shuffle服务。 资源分配策略

Spark在资源空闲时释放Executors,在需要时重新获取Executors。由于没有一个明确的方法去预测一个将要释放资源的Executor是否将要运行一个新的任务,也没有办法预测当前的任务是否需要申请新增Executor,所以需要一组策略来决定什么时候释放或者申请Executors。

  • **请求策略 **

当一个能动态分配资源的Spark应用程序有任务等待分配时,能够请求额外的Executors,这种情况意味着现有Executors不足以完成所有已经提交但是尚未完成的Task。Spark以轮询的方式申请Executors资源。当任务等待的时间超过*.schedulerBacklogTimeout设定的时间,并且再次触发*.sustainedSchedulerBacklogTimeout延时之后,如果队列中处于等待的任务依然存在,申请额外Executors资源的请求就会被触发。而且每次轮询请求的Executors数目成倍增加。例如一个应用程序在第一次请求的时候会得到一个Executor,之后每轮会申请2,4,8,16……个Executors。

这样做的动机有两个。一是一个应用程序在开始的时候应该谨慎的申请资源,避免一次申请过多造成资源浪费;二是应用程序能够及时的提供其资源使用情况,避免资源浪费。

  • **删除策略 **

当Executor的空闲时间超过*.executorIdleTimeout设定的值时,一个Spark应用程序将释放此Executor。删除策略和请求策略是互斥的,因为如果有等待的Executor资源的任务存在,就不会存在空闲的Executor。

  • **Executors优雅的退出机制 **

资源动态分配之前,当Executor执行失败,或Executor相关联应用程序退出时,一个Spark的Executor会退出,在这两种场景中,与Executor相关联的所有状态都不需要,可以安全的丢弃。

但是,当一个Executor被显示删除时,应用程序仍然在运行。此时,应用程序试图访问Executor存储或写的状态,只能重新计算。为避免重新计算,需要在删除一个Executor之前保留状态。

例如在shuffle期间,Spark的Executor会将自己的Map本地输出到磁盘。对于一些比较慢的事件,任务运行的时间比其他的时间更久,动态分配可能会在shuffle完成之前将空间的Executor删除掉,因为已经将状态保存在磁盘中,就不需要再重新计算了。

保留shuffle文件需要使用外部Shuffle服务,这些服务是指长时间独立运行在集群每个节点的Spark应用程序以及Executors进程。启用该服务时,Spark的Executors将会从服务中获取shuffle文件以替代从备份中获取。这样任何一个Executor写得shuffle状态都可以超出Executor的生命周期继续服务。

除了保留shuffle文件,Executor还可以将数据缓存在内存或者硬盘中,但是当Executor被删除后,这些缓存的数据就不能被访问了。

应用程序中的调度

在一个Spark应用程序中,如果在不同的线程中,多个并行的Job可以同时运行。因为Spark的任务调度是线程安全的,同时支持应用程序使用Case服务多个请求(多用户查询)。

Spark默认按照FIFO的方式运行Jobs,每个Job会被划分成多个Stages,分配可用资源的时候第一个Job的优先级最高,如果运行中的Job不需要使用所有的集群资源,后面的Job就不需要等待资源,可以立即执行。

Spark调度还支持以轮询的方式让不同的Job共享集群资源,此种方式称为公平调度。这种方式下,所有Job获得一个大约相当的集群资源。

启用公平调度器的方式:

val conf = new SparkConf()
conf.set("spark.scheduler.mode","FAIR")

公平调度池

Spark的公平调度器参考Hadoop的公平调度器,支持Job分组进入调度池,每个调度池可以设置不同的调度参数,同一分组内部按照用户均等分配资源,而不是按照Job平均共享资源。

新提交的任务通过下面的方式配置调度池

val sc = new SparkContext()
sc.setLocalProperty("spark.scheduler.pool","poolname")

如果不设置,任务会进入一个默认池。

设置调度池的属性之后,该线程中,被提交的全部Jobs(RDDs的save、collect、count等)都使用该Pool的名称。这样使得一个线程运行同一个用户的多个Jobs变得容易。

清除一个线程关联的池,调用

sc.setLocalProperty("spark.scheduler.pool",null)

配置公平调度池属性

默认情况下,每个池获得相同份额的集群资源,但是每个池中,Jobs以FIFO的方式运行。

池属性可以通过配置文件进行修改。

  • schedulingMode,调度模式,FIFO或者FAIR
  • weight,权重,相对与其他池的资源获取情况,默认所有池的权重都为1。如果一个池的权重设置为2,将获得权重为1的池的2倍资源。设置高权重的池中的Job总是会优先执行。
  • minShare,最小共享。在按整体权重分配额外资源之前,公平调度器会为每个池分配的最小的共享资源(CPU核数)。默认每个池的minShare为0。

这些属性通过xml文件配置,默认文件类似于conf/fairscheduler.xml.template。要修改默认文件使用conf.set("spark.scheduler.allocation.file","path/fileName")

XML文件格式如下:

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

调度器

DAGScheduler负责构建具有依赖关系的任务集TaskSet,TaskScheduler负责将TaskSet资源提供给TaskSetManager供其作为调度任务的依据,TaskSetManager负责在具体任务集的内部调度任务。

每一个SparkContext可能同时存在多个可运行的任务集(没有依赖关系),这些任务集之间的调度是由调度池Pool对象决定的,调度池Pool管理的对象是下一级的Pool或者TaskSetManager对象。

调度池

TaskSchedulerImpl在初始化时会根据用户设定的SchedulingMode(默认为FIFO)创建一个rootPool根调度池,之后根据具体的调度模式再进一步创建SchedulableBuilder对象,具体的SchedulableBuilder对象中的BuildPool方法将在rootPool的基础上完成整个Pool的构建工作。

调度池是在SparkContext内部的调度,多个Spark应用程序之间在调度池层面是没有调度优先级关系的。

两种调度模式,对应了两种类型的Pool

  • FIFO:FIFO Pool直接管理TaskSetManager,每个TaskSetManager创建时都存储了其对应的StageID,FIFO Pool根据StageID的顺序调度TaskSetManager
  • FAIR:FAIR Pool直接管理的对象是下一级的Pool,或者TaskSetManager,公平调度的基本原则是根据所管理的Pool/TaskSetManager中正在运行的任务的数量判断优先级,用户可以设置minShare最小任务数和weight任务权重调整对应Pool中任务集的优先程度。公平调度模式下构建的调度池是两级结构,根调度池管理一组子调度池,子调度池进一步管理属于该调度池的TaskSetManager。

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,一毛也是爱

打开支付宝扫一扫,即可进行扫码打赏哦