Skip to content

Commit ae14f36

Browse files
tmagrino authored and zsxwing committed
[SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID
## What changes were proposed in this pull request?

Previously, the TaskLocation implementation would not allow for executor ids which include underscores. This tweaks the string split used to get the hostname and executor id, allowing for underscores in the executor id.

This addresses the JIRA found here: https://issues.apache.org/jira/browse/SPARK-16148

This is moved over from a previous PR against branch-1.6: apache#13857

## How was this patch tested?

Ran existing unit tests for core and streaming. Manually ran a simple streaming job with an executor whose id contained underscores and confirmed that the job ran successfully.

This is my original work and I license the work to the project under the project's open source license.

Author: Tom Magrino <tmagrino@fb.com>

Closes apache#13858 from tmagrino/fixtasklocation.
1 parent d59ba8e commit ae14f36

2 files changed

Lines changed: 9 additions & 7 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -64,18 +64,18 @@ private[spark] object TaskLocation {
6464

6565
/**
6666
* Create a TaskLocation from a string returned by getPreferredLocations.
67-
* These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the
68-
* location is cached.
67+
* These strings have the form executor_[hostname]_[executorid], [hostname], or
68+
* hdfs_cache_[hostname], depending on whether the location is cached.
6969
*/
7070
def apply(str: String): TaskLocation = {
7171
val hstr = str.stripPrefix(inMemoryLocationTag)
7272
if (hstr.equals(str)) {
7373
if (str.startsWith(executorLocationTag)) {
74-
val splits = str.split("_")
75-
if (splits.length != 3) {
76-
throw new IllegalArgumentException("Illegal executor location format: " + str)
77-
}
78-
new ExecutorCacheTaskLocation(splits(1), splits(2))
74+
val hostAndExecutorId = str.stripPrefix(executorLocationTag)
75+
val splits = hostAndExecutorId.split("_", 2)
76+
require(splits.length == 2, "Illegal executor location format: " + str)
77+
val Array(host, executorId) = splits
78+
new ExecutorCacheTaskLocation(host, executorId)
7979
} else {
8080
new HostTaskLocation(str)
8181
}

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -802,6 +802,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
802802
assert(TaskLocation("host1") === HostTaskLocation("host1"))
803803
assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
804804
assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
805+
assert(TaskLocation("executor_some.host1_executor_task_3") ===
806+
ExecutorCacheTaskLocation("some.host1", "executor_task_3"))
805807
}
806808

807809
test("Kill other task attempts when one attempt belonging to the same task succeeds") {

0 commit comments

Comments
 (0)