Skip to content

Commit 52725af

Browse files
committed
Remove assert in SparkContext.killExecutors
1 parent 5bedcb8 commit 52725af

2 files changed

Lines changed: 19 additions & 15 deletions

File tree

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
8383
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
8484
s"${now - lastSeenMs} ms exceeds timeout $executorTimeout ms")
8585
scheduler.executorLost(executorId, SlaveLost())
86-
sc.killExecutor(executorId)
86+
if(sc.supportKillExecutor()) {
87+
sc.killExecutor(executorId)
88+
}
8789
executorLastSeen.remove(executorId)
8890
}
8991
}

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
379379
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
380380
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
381381
if (dynamicAllocationEnabled) {
382-
assert(master.contains("yarn") || dynamicAllocationTesting,
382+
assert(supportKillExecutor(),
383383
"Dynamic allocation of executors is currently only supported in YARN mode")
384384
Some(new ExecutorAllocationManager(this, listenerBus, conf))
385385
} else {
@@ -1034,6 +1034,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
10341034
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
10351035
postEnvironmentUpdate()
10361036
}
1037+
1038+
def supportKillExecutor(): Boolean = {
1039+
if(master.contains("yarn") || dynamicAllocationTesting) {
1040+
true
1041+
}
1042+
false
1043+
}
10371044

10381045
/**
10391046
* :: DeveloperApi ::
@@ -1051,8 +1058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
10511058
*/
10521059
@DeveloperApi
10531060
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
1054-
assert(master.contains("yarn") || dynamicAllocationTesting,
1055-
"Requesting executors is currently only supported in YARN mode")
1061+
assert(supportKillExecutor(), "Requesting executors is currently only supported in YARN mode")
10561062
schedulerBackend match {
10571063
case b: CoarseGrainedSchedulerBackend =>
10581064
b.requestExecutors(numAdditionalExecutors)
@@ -1069,17 +1075,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
10691075
*/
10701076
@DeveloperApi
10711077
override def killExecutors(executorIds: Seq[String]): Boolean = {
1072-
if (master.contains("yarn") || dynamicAllocationTesting) {
1073-
schedulerBackend match {
1074-
case b: CoarseGrainedSchedulerBackend =>
1075-
b.killExecutors(executorIds)
1076-
case _ =>
1077-
logWarning("Killing executors is only supported in coarse-grained mode")
1078-
false
1079-
}
1080-
} else {
1081-
logWarning("Killing executors is currently only supported in YARN mode")
1082-
false
1078+
assert(supportKillExecutor(), "Killing executors is currently only supported in YARN mode")
1079+
schedulerBackend match {
1080+
case b: CoarseGrainedSchedulerBackend =>
1081+
b.killExecutors(executorIds)
1082+
case _ =>
1083+
logWarning("Killing executors is only supported in coarse-grained mode")
1084+
false
10831085
}
10841086
}
10851087

0 commit comments

Comments (0)