core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (74 changes: 37 additions & 37 deletions)
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.collection.Map
-import scala.collection.mutable.{HashMap, HashSet, Stack}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
@@ -403,32 +403,47 @@ class DAGScheduler(
    parents.toList
  }

-  /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
-  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
-    val parents = new Stack[ShuffleDependency[_, _, _]]
+  /**
+   * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet.
+   * This is done in topological order to create ancestor stages first to ensure that the result
+   * stage graph is correctly built.
+   */
+  private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = {
+    val parents = new ArrayBuffer[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
-        visited += r
-        for (dep <- r.dependencies) {
-          dep match {
-            case shufDep: ShuffleDependency[_, _, _] =>
-              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
-                parents.push(shufDep)
-              }
-            case _ =>
+        val deps = r.dependencies.filter {
+          case shufDep: ShuffleDependency[_, _, _] =>
+            !shuffleToMapStage.contains(shufDep.shuffleId)
+          case _ => true
+        }
+        if (deps.forall(dep => visited(dep.rdd))) {
+          waitingForVisit.pop()
+          visited += r
+          for (dep <- deps) {
+            dep match {
+              case shufDep: ShuffleDependency[_, _, _] =>
+                parents += shufDep
+              case _ =>
+            }
+          }
+        } else {
+          for (dep <- deps if !visited(dep.rdd)) {
+            waitingForVisit.push(dep.rdd)
+          }
-          waitingForVisit.push(dep.rdd)
        }
+      } else {
+        waitingForVisit.pop()
      }
    }

Contributor:

This cuts down on the repeated scanning of data structures and should increase performance a little more (only lightly tested):

    def visit(r: RDD[_]) {
      if (visited(r)) {
        waitingForVisit.pop()
      } else {
        val visitedShuffleDeps = new ArrayBuffer[ShuffleDependency[_, _, _]]
        val unvisitedDeps = new ArrayBuffer[Dependency[_]]

        r.dependencies.foreach {
          case dep: ShuffleDependency[_, _, _] if !shuffleToMapStage.contains(dep.shuffleId) =>
            if (visited(dep.rdd)) visitedShuffleDeps += dep
            else unvisitedDeps += dep
          case dep if !visited(dep.rdd) => unvisitedDeps += dep
          case _ =>
        }

        if (unvisitedDeps.isEmpty) {
          waitingForVisit.pop()
          visited += r
          for (shufDep <- visitedShuffleDeps) { parents += shufDep }
        } else {
          for (dep <- unvisitedDeps) {
            waitingForVisit.push(dep.rdd)
          }
        }
      }
    }

Member Author:

@markhamstra Thank you, I'll pick it.

    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.top)
    }
    parents
  }
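
To make the traversal concrete: a dependency is appended to parents only after everything it transitively relies on has already been appended, which is what yields the topological (ancestor-first) order. Below is a minimal, self-contained sketch of the same manual-stack pattern; Node and topologicalAncestors are hypothetical stand-ins for RDD and the method above, not Spark code.

    import scala.collection.mutable.{ArrayBuffer, HashSet, Stack}

    final case class Node(id: String, deps: List[Node])

    def topologicalAncestors(root: Node): Seq[Node] = {
      val ordered = new ArrayBuffer[Node]
      val visited = new HashSet[Node]
      // Manual stack instead of recursion, mirroring the StackOverflowError guard above.
      val waitingForVisit = new Stack[Node]

      waitingForVisit.push(root)
      while (waitingForVisit.nonEmpty) {
        val n = waitingForVisit.top // peek only; a node may need two passes
        if (visited(n)) {
          waitingForVisit.pop() // already emitted on an earlier pass
        } else if (n.deps.forall(visited)) {
          waitingForVisit.pop() // all ancestors emitted, so emit n now
          visited += n
          ordered += n
        } else {
          // First pass: schedule the unvisited ancestors; n stays on the stack.
          n.deps.filterNot(visited).foreach(waitingForVisit.push)
        }
      }
      ordered
    }

    // Example: d depends on b and c, which both depend on a.
    val a = Node("a", Nil)
    val b = Node("b", List(a))
    val c = Node("c", List(a))
    val d = Node("d", List(b, c))
    println(topologicalAncestors(d).map(_.id)) // a before b and c, and d last
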
@@ -724,7 +739,6 @@ class DAGScheduler(
reason = "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
submitWaitingStages()
}

  /**
@@ -750,23 +764,20 @@
        submitStage(stage)
      }
    }
-    submitWaitingStages()
  }

  /**
   * Check for waiting stages which are now eligible for resubmission.
-   * Ordinarily run on every iteration of the event loop.
+   * Ordinarily run after the parent stage completed successfully.
Contributor:

Can you update this to say something like:

"Submits stages that depend on the given parent stage. Called when the parent stage completes successfully."

   */
-  private def submitWaitingStages() {
-    // TODO: We might want to run this less often, when we are sure that something has become
-    // runnable that wasn't before.
+  private def submitWaitingChildStages(parent: Stage) {
    logTrace("Checking for newly runnable parent stages")
Contributor:

Can you update this to say s"Checking if any dependencies of $parent are now runnable"

logTrace("running: " + runningStages)
logTrace("waiting: " + waitingStages)
logTrace("failed: " + failedStages)
val waitingStagesCopy = waitingStages.toArray
waitingStages.clear()
for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
waitingStages --= childStages
for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
Member:

It seems submitWaitingChildStages is called to submit child stages once the given parent stage is available. Given that, do we still have to re-check for missing parents inside submitStage?

Member Author:

Yes, and the re-check is done in submitStage().
If some parent stages are still missing, the child will go back to waitingStages.

Member:

Ah, I see.

    }
  }
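
The key change above is the selection predicate: instead of draining and resubmitting every waiting stage on each event, only the stages whose parents include the just-completed stage are pulled out of the waiting set. A rough, self-contained sketch of that selection, assuming a simplified Stage case class and println stubs in place of DAGScheduler's real types:

    import scala.collection.mutable.HashSet

    final case class Stage(id: Int, firstJobId: Int, parents: List[Stage])

    val waitingStages = new HashSet[Stage]

    def submitStage(stage: Stage): Unit = println(s"submitting stage ${stage.id}")

    def submitWaitingChildStages(parent: Stage): Unit = {
      // Only children of the just-completed parent can have become runnable,
      // so there is no need to rescan the whole waiting set on every event.
      val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
      waitingStages --= childStages
      childStages.sortBy(_.firstJobId).foreach(submitStage)
    }

    val parent = Stage(1, firstJobId = 0, parents = Nil)
    waitingStages += Stage(2, firstJobId = 0, parents = List(parent))
    submitWaitingChildStages(parent) // submits stage 2 and removes it from waitingStages
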
@@ -791,23 +802,20 @@
}
    val jobIds = activeInGroup.map(_.jobId)
    jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
-    submitWaitingStages()
  }

  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
    // Note that there is a chance that this task is launched after the stage is cancelled.
    // In that case, we wouldn't have the stage anymore in stageIdToStage.
    val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
-    submitWaitingStages()
  }

  private[scheduler] def handleTaskSetFailed(
      taskSet: TaskSet,
      reason: String,
      exception: Option[Throwable]): Unit = {
    stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
-    submitWaitingStages()
  }

  private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -830,7 +838,6 @@

  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-    submitWaitingStages()
  }

  private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -869,8 +876,6 @@
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
-
-    submitWaitingStages()
  }

  private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -914,8 +919,6 @@
    if (finalStage.isAvailable) {
      markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }
-
-    submitWaitingStages()
  }

  /** Submits stage, but first recursively submits any missing parents. */
@@ -1078,6 +1081,8 @@
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)

submitWaitingChildStages(stage)
}
}

@@ -1247,7 +1252,7 @@
            }
          }

-          // Note: newly runnable stages will be submitted below when we submit waiting stages
+          submitWaitingChildStages(shuffleStage)
Contributor:

Should this be done when !shuffleStage.isAvailable and we have resubmitted the shuffleStage, or only within the else branch?

Member Author:

@markhamstra Thank you for your review.
We can definitely move this into the else branch.
I'll modify it.

        }
      }
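
Per the thread above, the eventual shape is: when a shuffle map stage finishes, resubmit it if some of its map outputs are still missing, and only otherwise hand its waiting children to the scheduler. A self-contained sketch of that control flow, using hypothetical stand-ins (MapStage, handleShuffleStageCompletion) for DAGScheduler's real types and handlers:

    // Toy stage: available once every partition has registered a map output.
    final case class MapStage(id: Int, availableOutputs: Int, numPartitions: Int) {
      def isAvailable: Boolean = availableOutputs == numPartitions
    }

    def submitStage(stage: MapStage): Unit =
      println(s"resubmitting stage ${stage.id}: some map outputs are missing")

    def submitWaitingChildStages(stage: MapStage): Unit =
      println(s"stage ${stage.id} is complete: submitting its waiting children")

    def handleShuffleStageCompletion(shuffleStage: MapStage): Unit = {
      if (!shuffleStage.isAvailable) {
        // e.g. an executor died and took finished map outputs with it
        submitStage(shuffleStage)
      } else {
        submitWaitingChildStages(shuffleStage)
      }
    }

    handleShuffleStageCompletion(MapStage(1, availableOutputs = 1, numPartitions = 2))
    handleShuffleStageCompletion(MapStage(2, availableOutputs = 2, numPartitions = 2))
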

@@ -1322,7 +1327,6 @@
        // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
        // will abort the job.
    }
-    submitWaitingStages()
  }

  /**
@@ -1364,7 +1368,6 @@
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
}
Contributor:

Is it necessary to submit some newly-waiting stages here (e.g., if shuffle output was lost for a map stage, so now that map stage needs to be re-run)?

Contributor:

This appears to be a non-issue, because we handle lost shuffle output separately, when we get a FetchFailure from a task that tries to fetch the output.

-    submitWaitingStages()
  }

  private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1373,7 +1376,6 @@
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
submitWaitingStages()
}

  private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1386,7 +1388,6 @@
      case None =>
        logInfo("No active jobs to kill for Stage " + stageId)
    }
-    submitWaitingStages()
  }

  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
@@ -1396,7 +1397,6 @@
      failJobAndIndependentStages(
        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
    }
-    submitWaitingStages()
  }

  /**
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -322,6 +322,68 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
  }

+  /**
+   * This test ensures that DAGScheduler builds the stage graph correctly.
+   * Here, we submit the RDD [F], which has the following lineage of RDDs:
+   *
+   *                  <---------------------
+   *                 /                       \
+   * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
+   *                              \                     /
+   *                               <-------------------
+   *
+   * and then check that all stages have the correct parent stages.
+   * Note: [] means an RDD, () means a shuffle dependency.
+   * Stages also inherit parents through one-to-one dependencies: e.g. the map stage
+   * for (4) contains both [D] and [B], so its parents are the map stages for (1) and (3).
+   */
+  test("parent stages") {
+    val rddA = new MyRDD(sc, 1, Nil)
+
+    val shuffleDef1 = new ShuffleDependency(rddA, new HashPartitioner(1))
+    val rddB = new MyRDD(sc, 1, List(shuffleDef1), tracker = mapOutputTracker)
+
+    val shuffleDef2 = new ShuffleDependency(rddB, new HashPartitioner(1))
+    val rddC = new MyRDD(sc, 1, List(shuffleDef2), tracker = mapOutputTracker)
+
+    val shuffleDef3 = new ShuffleDependency(rddC, new HashPartitioner(1))
+    val rddD = new MyRDD(sc, 1, List(shuffleDef3, new OneToOneDependency(rddB)),
+      tracker = mapOutputTracker)
+
+    val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1))
+    val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddC), shuffleDef4),
+      tracker = mapOutputTracker)
+
+    val shuffleDef5 = new ShuffleDependency(rddE, new HashPartitioner(1))
+    val rddF = new MyRDD(sc, 1, List(shuffleDef5),
+      tracker = mapOutputTracker)
+    submit(rddF, Array(0))
+
+    assert(scheduler.shuffleToMapStage.size === 5)
+    assert(scheduler.activeJobs.size === 1)
+
+    val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId)
+    val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId)
+    val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId)
+    val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId)
+    val mapStage5 = scheduler.shuffleToMapStage(shuffleDef5.shuffleId)
+    val finalStage = scheduler.activeJobs.head.finalStage
+
+    assert(mapStage1.parents.isEmpty)
+    assert(mapStage2.parents === List(mapStage1))
+    assert(mapStage3.parents === List(mapStage2))
+    assert(mapStage4.parents === List(mapStage1, mapStage3))
+    assert(mapStage5.parents === List(mapStage2, mapStage4))
+    assert(finalStage.parents === List(mapStage5))
+
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(5), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
+  }

test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None