DAGScheduler.scala
@@ -49,6 +49,10 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
  * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
+ * Here's a checklist to use when making or reviewing changes to this class:
+ *
+ * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
+ *   include the new structure. This will help to catch memory leaks.
  */
 private[spark]
 class DAGScheduler(
@@ -110,6 +114,8 @@ class DAGScheduler(
   // stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
+  private[scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
+
   // A closure serializer that we reuse.
   // This is only safe because DAGScheduler runs in a single thread.
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
@@ -127,8 +133,6 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
-  private val outputCommitCoordinator = env.outputCommitCoordinator
-
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
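
Note for reviewers: the definition is not merely moved earlier in the class; its visibility widens from `private` to `private[scheduler]`, which is what lets DAGSchedulerSuite, in the same package, assert on the coordinator's state below. A minimal sketch of how Scala's package-qualified modifier behaves, using hypothetical names:

    package org.apache.spark.scheduler

    // Hypothetical names for illustration: a `private[scheduler]` member is
    // visible to any class compiled into the org.apache.spark.scheduler
    // package, including test suites such as DAGSchedulerSuite.
    class Owner {
      private[scheduler] val internalState: Int = 0
    }

    class OwnerSuite {
      // Compiles because OwnerSuite lives in the same package as Owner.
      def stateIsClean(owner: Owner): Boolean = owner.internalState == 0
    }
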
@@ -711,6 +715,7 @@ class DAGScheduler(
     val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
     runningStages.foreach { stage =>
       stage.latestInfo.stageFailed(stageFailedMessage)
+      outputCommitCoordinator.stageEnd(stage.id)
       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     }
     listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
@@ -980,6 +985,7 @@ class DAGScheduler(
       stage.latestInfo.stageFailed(errorMessage.get)
       logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
     }
+    outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage
   }
@@ -1264,6 +1270,7 @@ class DAGScheduler(
     try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
       taskScheduler.cancelTasks(stageId, shouldInterruptThread)
       stage.latestInfo.stageFailed(failureReason)
+      outputCommitCoordinator.stageEnd(stage.id)

Contributor:

Wouldn't it be better to put these three lines (at least) into a separate method?

    private def endStage(stage: Stage, failureReason: String): Unit = {
      stage.latestInfo.stageFailed(failureReason)
      outputCommitCoordinator.stageEnd(stage.id)
      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
    }

That would make it harder to miss things like this.
Contributor Author:

It actually looks like handleTaskCompletion has a nested markStageAsFinished method that looks like it should do this. That method also removes the stage from runningStages, which doesn't appear to happen on every path where we post SparkListenerStageCompleted. Let me take a look and see whether there's a safe way to refactor things to use that method.
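
A sketch of where that refactor might land, assuming markStageAsFinished is hoisted to a DAGScheduler member taking an optional failure message (hypothetical signature and body; the existing nested method may differ):

    // Hypothetical consolidation of the stage-completion steps discussed above.
    // Funneling every completion path through one method makes it hard to
    // forget a cleanup step such as outputCommitCoordinator.stageEnd.
    private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
      errorMessage.foreach(reason => stage.latestInfo.stageFailed(reason))
      outputCommitCoordinator.stageEnd(stage.id)
      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
      runningStages -= stage
    }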


       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     } catch {
       case e: UnsupportedOperationException =>

OutputCommitCoordinator.scala
@@ -59,6 +59,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
   private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
 
+  /**
+   * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
+   */
+  def isEmpty: Boolean = {
+    authorizedCommittersByStage.isEmpty
+  }
+
   /**
    * Called by tasks to ask whether they can commit their output to HDFS.
    *
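
isEmpty only holds after a job if every stage that registered state also triggered cleanup; that cleanup is the stageEnd call threaded through the DAGScheduler paths above. A plausible sketch of stageEnd, assuming authorizedCommittersByStage is the coordinator's only per-stage state (the method itself is outside this diff):

    // Hypothetical sketch: dropping the per-stage committer map when a stage
    // ends is what keeps authorizedCommittersByStage from accumulating entries
    // for finished stages, and hence what lets isEmpty return true.
    private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
      authorizedCommittersByStage.remove(stage)
    }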

DAGSchedulerSuite.scala
@@ -783,6 +783,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSparkContext
     assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
     assert(scheduler.waitingStages.isEmpty)
+    assert(scheduler.outputCommitCoordinator.isEmpty)
   }
 
   // Nothing in this test should break if the task info's fields are null, but
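
Tying this back to the checklist added to DAGScheduler's scaladoc: the enclosing helper presumably reads roughly as below. Only the four assertions visible in this diff are confirmed; the signature is inferred from the name the scaladoc cites:

    private def assertDataStructuresEmpty(): Unit = {
      assert(scheduler.runningStages.isEmpty)
      assert(scheduler.shuffleToMapStage.isEmpty)
      assert(scheduler.waitingStages.isEmpty)
      // New in this PR: fails if any completed stage skipped
      // outputCommitCoordinator.stageEnd and leaked committer state.
      assert(scheduler.outputCommitCoordinator.isEmpty)
    }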