Skip to content

Commit 1cb42e6

Browse files
committed
Properly handle job failure when the job gets killed.
1 parent cbc48be commit 1cb42e6

1 file changed

Lines changed: 18 additions & 10 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -335,33 +335,41 @@ class DAGScheduler(
335335
listener.awaitResult() // Will throw an exception if the job fails
336336
}
337337

338-
def killJob(jobId: Int) {
338+
def killJob(jobId: Int): Unit = this.synchronized {
339339
activeJobs.find(job => job.jobId == jobId).foreach(job => killJob(job))
340340
}
341341

342-
private def killJob(job: ActiveJob) {
342+
private def killJob(job: ActiveJob): Unit = this.synchronized {
343343
logInfo("Killing Job and cleaning up stages %d".format(job.jobId))
344344
activeJobs.remove(job)
345345
idToActiveJob.remove(job.jobId)
346346
val stage = job.finalStage
347347
resultStageToJob.remove(stage)
348-
killStage(stage)
349-
// recursively remove all parent stages
350-
stage.parents.foreach(p => killStage(p))
351-
job.listener.jobFailed(new SparkException("Job killed"))
348+
killStage(job, stage)
349+
val e = new SparkException("Job killed")
350+
job.listener.jobFailed(e)
351+
listenerBus.post(SparkListenerJobEnd(job, JobFailed(e, None)))
352352
}
353353

354-
private def killStage(stage: Stage) {
354+
private def killStage(job: ActiveJob, stage: Stage): Unit = this.synchronized {
355+
// TODO: Can we reuse taskSetFailed?
355356
logInfo("Killing Stage %s".format(stage.id))
356357
stageIdToStage.remove(stage.id)
357358
if (stage.isShuffleMap) {
358359
shuffleToMapStage.remove(stage.id)
359360
}
360361
waiting.remove(stage)
361362
pendingTasks.remove(stage)
362-
running.remove(stage)
363363
taskSched.killTasks(stage.id)
364-
stage.parents.foreach(p => killStage(p))
364+
365+
if (running.contains(stage)) {
366+
running.remove(stage)
367+
val e = new SparkException("Job killed")
368+
listenerBus.post(SparkListenerJobEnd(job, JobFailed(e, Some(stage))))
369+
}
370+
371+
stage.parents.foreach(parentStage => killStage(job, parentStage))
372+
//stageToInfos -= stage
365373
}
366374

367375
/**
@@ -785,7 +793,7 @@ class DAGScheduler(
785793

786794
/**
787795
* Aborts all jobs depending on a particular Stage. This is called in response to a task set
788-
* being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
796+
* being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
789797
*/
790798
private def abortStage(failedStage: Stage, reason: String) {
791799
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq

0 commit comments

Comments
 (0)