Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, HashSet}
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -1564,6 +1564,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

def killTasks(tasks: HashSet[Long], taskInfo: HashMap[Long, TaskInfo]): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not suitable to add a public method here in SparkContext, SparkContext is a public entry point, any method adds to here should be considered carefully. In your case looks like only Spark internally will use this method, why not directly change the TaskSetManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jerryshao, Thanks for your prompt. I will move the method to TaskSetManager.

tasks.foreach { task =>
val executorId = taskInfo(task).executorId
schedulerBackend.killTask(task, executorId, true)
}
true
}

/** The version of Spark on which this application is running. */
def version: String = SPARK_VERSION

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,11 @@ private[spark] class TaskSetManager(
}
}
maybeFinishTaskSet()

// kill running task if stage failed
if(reason.isInstanceOf[FetchFailed]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A space between if and (.

sched.sc.killTasks(runningTasksSet, taskInfos)
}
}

def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {
Expand Down