Spark Job Execution

划分 Stage

当某个 Action 操作触发计算,向 DAGScheduler 提交作业时,DAGScheduler 需要从 RDD 依赖链的末端出发,遍历整个 RDD 依赖链,划分 Stage,并且决定 Stage 之间的依赖关系。

Stage 划分是以 Shuffle 依赖为依据,当某个 RDD 运算需要将数据进行 Shuffle 时,这个包含了 Shuffle 依赖关系的 RDD 将被用来作为输入信息,构建一个新的 Stage,由此为依据划分 Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。

有帮助的截图

以GroupBy为例,该操作的结果实际上是一个ShuffleRDD(上图中的B),当DAGScheduler遍历到这个ShuffleRDD时,因为其依赖是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及ShuffleDependency等对象构建一个新的Stage,此Stage的输出结果的分区方式,是由ShuffleDependency中的Partitioner对象决定的。

其中ShuffleRDD本身的运算操作(其实就是一个获取Shuffle结果的过程),是在下一个Stage里进行的。 

生成Job,提交Stage

划分Stage步骤得到一个或多个有依赖关系的Stage,其中,直接触发Job的RDD所关联的Stage作为finalStage生成一个Job实例,这两者的关系进一步存储在resultStageToJob映射表中,用在该Stage全部完成时做一些后续处理,如报告状态、清理Job相关数据等。

具体提交一个Stage时,首先判读该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个Stage不可用,则迭代尝试提交父Stage。所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交。

当一个属于中间过程Stage的任务(这种类型的任务所对应的类为ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,若完成,DAGScheduler会扫描waitingStages队列,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以提交该Stage。此外,每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫面并提交就绪Stage的调用过程。

任务集的提交

每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet,这个TaskSet最终会触发TaskScheduler构建一个TaskManager的实例来管理这个TaskSet的生命周期,而对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源时,进一步通过TaskSetManager调度具体的Task到对应的Executor节点上进行运算。

任务作业完成状态的监控

要保证相互依赖的Job/Stage能够顺利的调度执行,DAGScheduler就必然需要监控当前Job/Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数主要包括任务集的开始、结束、失败,任务集的失败,DAGScheduler根据这些Task的生命周期信息进一步维护Job和Stage的状态信息。

此外,TaskScheduler还可以通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃,或者由于任何原因与Driver失去联系,则对应Stage的ShuffleMapTask的输出结果也被标志位不可用,这将导致对应Stage状态的变更,进而影响相关Job的状态,再进一步可能触发对应Stage的重新提交来重新计算获取相关数据。

任务结果的获取

一个具体任务在Executor中执行完毕以后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务结果的返回方式也不同。FinalStage所对应的任务(对应的类为ResultTask)返回给DAGScheduler的是运算结果本身,而ShuffleMapTask返回给DAGScheduler的是一个MapStatus对象,这个对象管理了ShuffleMapTask的运算输出结果在BlockManager里的相关存储信息,而非结果本身。这些存储位置信息将作为下一个Stage任务获取输入数据的依据。而根据任务结果大小的不同,ResultTask返回的结果又分为两类,如果结果足够小,则直接放在DirectTaskResult对象内,如果超过特定尺寸(默认约为10M)则在Executor端会将DirectTaskResult先序列化,再把序列化的结果作为一个Block存放在BlockManager内,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。当然从DAGScheduler的角度来说,这些过程对它来说是透明的,它所获得的都是任务的实际结果。

qin

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏

打开支付宝扫一扫,即可进行扫码打赏哦