Skip to content

Commit e22caed

Browse files
committed
change locality wait map from resetOnPreviousOffer to noRejectsSinceLastReset
1 parent f0e2fc3 commit e22caed

File tree

2 files changed

+75
-10
lines changed

2 files changed

+75
-10
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ private[spark] class TaskSchedulerImpl(
111111
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
112112

113113
// keyed by taskset
114-
// value is true if the task set's locality wait timer was reset on the last resource offer
115-
private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
114+
// value is true if the task set has not rejected any resources due to locality
115+
// since the timer was last reset
116+
private val noRejectsSinceLastReset = new mutable.HashMap[TaskSet, Boolean]()
116117
private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
117118

118119
// Protected by `this`
@@ -336,7 +337,7 @@ private[spark] class TaskSchedulerImpl(
336337
taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
337338
}
338339
}
339-
resetOnPreviousOffer -= manager.taskSet
340+
noRejectsSinceLastReset -= manager.taskSet
340341
manager.parent.removeSchedulable(manager)
341342
logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
342343
s" ${manager.parent.name}")
@@ -615,13 +616,14 @@ private[spark] class TaskSchedulerImpl(
615616
}
616617

617618
if (!legacyLocalityWaitReset) {
618-
if (noDelaySchedulingRejects && launchedAnyTask) {
619-
if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
619+
if (noDelaySchedulingRejects) {
620+
if (launchedAnyTask &&
621+
(isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
620622
taskSet.resetDelayScheduleTimer(globalMinLocality)
621-
resetOnPreviousOffer.update(taskSet.taskSet, true)
623+
noRejectsSinceLastReset.update(taskSet.taskSet, true)
622624
}
623625
} else {
624-
resetOnPreviousOffer.update(taskSet.taskSet, false)
626+
noRejectsSinceLastReset.update(taskSet.taskSet, false)
625627
}
626628
}
627629

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
296296
.flatten.isEmpty)
297297
}
298298

299+
test("SPARK-18886 - task set with no locality requirements should not starve one with them") {
300+
val clock = new ManualClock()
301+
// All tasks created here are local to exec1, host1.
302+
// Locality level starts at PROCESS_LOCAL.
303+
val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
304+
// Locality levels increase at 3000 ms.
305+
val advanceAmount = 2000
306+
307+
val taskSet2 = FakeTask.createTaskSet(8, 2, 0)
308+
taskScheduler.submitTasks(taskSet2)
309+
310+
// Stage 2 takes resource since it has no locality requirements
311+
assert(taskScheduler
312+
.resourceOffers(
313+
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
314+
isAllFreeResources = false)
315+
.flatten
316+
.headOption
317+
.map(_.name)
318+
.getOrElse("")
319+
.contains("stage 2.0"))
320+
321+
// Clock advances to 2s. No locality changes yet.
322+
clock.advance(advanceAmount)
323+
324+
// Stage 2 takes resource since it has no locality requirements
325+
assert(taskScheduler
326+
.resourceOffers(
327+
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
328+
isAllFreeResources = false)
329+
.flatten
330+
.headOption
331+
.map(_.name)
332+
.getOrElse("")
333+
.contains("stage 2.0"))
334+
335+
// Simulates:
336+
// 1. stage 2 has taken all resource offers through single resource offers
337+
// 2. stage 1 is offered 0 cpus on allResourceOffer.
338+
// This should not reset timer.
339+
assert(taskScheduler
340+
.resourceOffers(
341+
IndexedSeq(WorkerOffer("exec2", "host1", 0)),
342+
isAllFreeResources = true)
343+
.flatten.length === 0)
344+
345+
// This should move stage 1 to NODE_LOCAL.
346+
clock.advance(advanceAmount)
347+
348+
// Stage 1 should now accept NODE_LOCAL resource.
349+
assert(taskScheduler
350+
.resourceOffers(
351+
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
352+
isAllFreeResources = false)
353+
.flatten
354+
.headOption
355+
.map(_.name)
356+
.getOrElse("")
357+
.contains("stage 1.1"))
358+
}
359+
299360
test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
300361
"time if last full resource offer (isAllResources = true) was accepted as well as any " +
301362
"following partial resource offers") {
@@ -306,12 +367,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
306367
// Locality levels increase at 3000 ms.
307368
val advanceAmount = 3000
308369

309-
// PROCESS_LOCAL full resource offer is accepted.
370+
// PROCESS_LOCAL full resource offer is not rejected due to locality.
371+
// It has 0 available cores, so no task is launched.
372+
// Timer is reset and locality level remains at PROCESS_LOCAL.
310373
assert(taskScheduler
311374
.resourceOffers(
312-
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
375+
IndexedSeq(WorkerOffer("exec1", "host1", 0)),
313376
isAllFreeResources = true)
314-
.flatten.length === 1)
377+
.flatten.length === 0)
315378

316379
// Advancing clock increases locality level to NODE_LOCAL.
317380
clock.advance(advanceAmount)

0 commit comments

Comments
 (0)