Skip to content

2.3 任务划分和调度。

在Spark中,一个作业(Job)的执行是由一系列的阶段(Stage)和任务(Task)组成的。Spark根据DAG的依赖关系,将其划分为不同的阶段,每个阶段包含一组可以并行执行的任务。

这个过程主要涉及两个关键组件:DAGScheduler和TaskScheduler。

DAGScheduler

DAGScheduler负责将DAG划分为不同的阶段(Stage)。具体来说:

  1. DAGScheduler从最终的RDD出发,反向遍历DAG。
  2. 对于每个RDD,DAGScheduler检查其依赖关系:
    • 如果是窄依赖(Narrow Dependency),表示父RDD的一个分区只被子RDD的一个分区使用,数据可以在同一个阶段(Stage)内流动。
    • 如果是宽依赖(Wide Dependency),表示父RDD的一个分区被子RDD的多个分区使用,需要进行Shuffle操作,数据跨越不同的阶段(Stage)流动。
  3. DAGScheduler以宽依赖为边界,将DAG划分为多个阶段(Stage)。每个阶段包含尽可能多的窄依赖操作,而阶段之间以宽依赖(Shuffle)相连。

通过这种划分策略,Spark可以最大限度地在每个阶段内实现数据的流水线计算,减少不必要的中间结果存储和Shuffle操作。

TaskScheduler

一旦DAGScheduler完成了阶段的划分,TaskScheduler就开始介入,负责将每个阶段划分为具体的任务(Task),并调度它们在集群的执行器(Executor)上运行。

  1. 对于每个阶段,TaskScheduler根据该阶段的最后一个RDD的分区数,确定需要生成的任务(Task)数量。
  2. TaskScheduler为每个任务(Task)确定其运行所需的输入数据分区。
  3. TaskScheduler考虑数据的本地性(Data Locality),尽可能将任务(Task)调度到存储了其所需输入数据的节点上,减少数据的网络传输。
  4. TaskScheduler将生成的任务(Task)以TaskSet的形式提交给集群管理器(Cluster Manager),请求执行资源。
  5. 集群管理器根据集群的资源状况,将任务(Task)分发给可用的执行器(Executor)运行。

通过这种任务划分和调度策略,Spark可以实现任务(Task)的细粒度并行,充分利用集群的计算资源,提高作业的执行效率。

同时,TaskScheduler还具有一些高级的调度功能,如任务推测执行(Speculative Execution)、任务重试(Task Retry)等,以应对执行器(Executor)的异常和性能不均衡问题。

理解Spark的任务划分和调度机制,对于优化Spark作业的并行度和资源利用率至关重要。通过合理配置并行度,调整任务的粒度,以及控制Shuffle操作,我们可以显著提升Spark作业的性能。

在下一节中,我们将进一步探讨任务(Task)在执行器(Executor)上的实际执行和计算过程。如果你对任务划分和调度有任何疑问或想法,欢迎随时提出!

DAGScheduler划分Stage的例子

好的,我来给你举一个简单但能说明DAGScheduler划分Stage的例子。

假设我们有以下的Spark代码:

scala
val data = sc.textFile("input.txt")
val words = data.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 > 10)
filtered.saveAsTextFile("output")

这段代码的作用是:

  1. 从文件"input.txt"中读取文本数据。
  2. 将文本数据按空格拆分为单词。
  3. 对每个单词计数,并统计每个单词的出现次数。
  4. 过滤出出现次数大于10的单词。
  5. 将结果保存到文件"output"中。

对于这个例子,DAGScheduler会将DAG划分为以下几个Stage:

  • Stage 1: 读取文件并将文本数据拆分为单词。

    • 对应的操作是textFile和flatMap。
    • 这个Stage的输出是一个包含所有单词的RDD。
  • Stage 2: 对单词进行计数并统计每个单词的出现次数。

    • 对应的操作是map和reduceByKey。
    • 这个Stage的输入是Stage 1的输出,即包含所有单词的RDD。
    • 这个Stage的输出是一个包含单词及其出现次数的RDD。
  • Stage 3: 过滤出出现次数大于10的单词并保存结果。

    • 对应的操作是filter和saveAsTextFile。
    • 这个Stage的输入是Stage 2的输出,即包含单词及其出现次数的RDD。

DAGScheduler之所以这样划分Stage,是基于以下原则:

  1. 每个Stage的边界都是一个shuffle操作(如reduceByKey)或者是一个action操作(如saveAsTextFile)。
  2. 前一个Stage的输出是后一个Stage的输入。
  3. 每个Stage内部的操作可以串行执行,而Stage之间的操作需要shuffle数据,因此必须切断。

通过这种划分,Spark可以最大化每个Stage内部的数据本地性和并行度,同时最小化Stage之间的数据传输和依赖关系,从而优化整个作业的执行效率。

这个例子虽然简单,但很好地说明了DAGScheduler划分Stage的基本原则和优化目标。在实际的Spark作业中,DAG可能会更加复杂,包含更多的操作和依赖关系,但划分Stage的基本思路是一致的。

TaskScheduler任务划分例子

好的,我来给你举一个具体的例子来说明TaskScheduler是如何进行任务划分的。

假设我们有以下的Spark代码:

scala
val data = sc.parallelize(1 to 100, 4)
val result = data.map(x => x * 2).filter(x => x > 50).reduce((x, y) => x + y)

这段代码的作用是:

  1. 创建一个包含1到100的RDD,并将其分为4个分区。
  2. 对每个元素进行乘以2的操作。
  3. 过滤出大于50的元素。
  4. 对过滤后的元素进行求和。

现在,让我们看看TaskScheduler是如何划分任务的:

  1. DAGScheduler将作业划分为两个Stage:

    • Stage 1: 包括map和filter操作。
    • Stage 2: 包括reduce操作。
  2. TaskScheduler根据Stage信息和分区数创建任务:

    • Stage 1有4个分区,因此会创建4个任务(Task):

      • Task 1: 处理分区0的数据,对1到25进行乘以2和过滤操作。
      • Task 2: 处理分区1的数据,对26到50进行乘以2和过滤操作。
      • Task 3: 处理分区2的数据,对51到75进行乘以2和过滤操作。
      • Task 4: 处理分区3的数据,对76到100进行乘以2和过滤操作。
    • Stage 2有1个任务,因为reduce操作需要聚合所有分区的数据:

      • Task 5: 对Stage 1的输出数据进行求和操作。
  3. TaskScheduler将任务分发到集群的Executor上执行:

    • Task 1到Task 4可以并行执行,因为它们处理的是不同分区的数据。
    • Task 5依赖于Task 1到Task 4的输出结果,因此需要等待它们全部完成后才能开始执行。
  4. Executor执行任务并将结果返回给Driver:

    • Task 1到Task 4的执行结果是过滤后的数据,它们将结果发送回Driver。
    • Task 5对接收到的数据进行求和,并将最终结果返回给Driver。

通过这个例子,我们可以看到TaskScheduler是如何将DAGScheduler划分的Stage转化为具体的任务,并将任务分发到集群的Executor上执行的。TaskScheduler负责任务的创建、调度和执行,确保每个任务都能获得所需的资源并高效地完成计算。

在实际的Spark作业中,RDD的分区数、数据分布和操作复杂度都会影响TaskScheduler的任务划分和调度策略。TaskScheduler会根据集群的资源情况、数据本地性原则和负载均衡等因素来优化任务的分配和执行,以达到最佳的性能表现。

宽依赖和窄依赖

好的,我来用具体的例子解释宽依赖和窄依赖。

假设我们有一个数据集,其中包含了学生的姓名和成绩信息,格式如下:

("Alice", 85), ("Bob", 92), ("Charlie", 78), ("David", 88), ("Eve", 95)

现在我们要对这个数据集进行一系列的转换和操作。

  1. 窄依赖示例:
scala
val students = sc.parallelize(Array(
  ("Alice", 85), ("Bob", 92), ("Charlie", 78), ("David", 88), ("Eve", 95)
))

val passedStudents = students.filter(_._2 >= 60)

在这个例子中,我们使用filter操作筛选出成绩大于等于60的学生。filter操作是一个窄依赖,因为每个输入分区的数据只会影响到对应的输出分区,不会依赖其他分区的数据。这意味着,passedStudents RDD的每个分区只依赖于students RDD相应的分区,不存在跨分区的数据依赖。

  1. 宽依赖示例:
scala
val students = sc.parallelize(Array(
  ("Alice", 85), ("Bob", 92), ("Charlie", 78), ("David", 88), ("Eve", 95)
))

val totalScore = students.map(_._2).reduce(_ + _)

在这个例子中,我们使用map操作提取出所有学生的成绩,然后使用reduce操作计算成绩的总和。reduce操作是一个宽依赖,因为它需要跨分区收集数据并进行聚合计算。这意味着,totalScore的计算依赖于students RDD的所有分区的数据。

宽依赖会导致shuffle操作的发生。在这个例子中,为了计算总成绩,Spark需要将所有分区的成绩数据进行重新分区,并将它们发送到同一个分区中进行聚合计算。这个过程涉及数据的跨节点传输和网络I/O,因此比窄依赖的计算开销要大。

总结:

  • 窄依赖: 每个输出分区只依赖于对应的输入分区,不存在跨分区的数据依赖。典型的窄依赖操作包括mapfilterunion等。
  • 宽依赖: 输出分区依赖于多个输入分区,存在跨分区的数据依赖。典型的宽依赖操作包括groupByKeyreduceByKeyjoin等,这些操作都会触发shuffle。

了解宽依赖和窄依赖的区别对于优化Spark作业非常重要。我们应该尽量避免不必要的宽依赖操作,因为它们会导致shuffle,增加数据传输和磁盘I/O的开销。相反,应该优先使用窄依赖操作,以便最大限度地利用数据本地性和并行性,提高作业的执行效率。