TaskSchedulerImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
import scala.util.Random

import org.apache.spark._
@@ -96,6 +96,9 @@ private[spark] class TaskSchedulerImpl(
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]

// Protected by `this`
private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

@volatile private var hasReceivedTask = false
@volatile private var hasLaunchedTask = false
private val starvationTimer = new Timer(true)
@@ -297,6 +300,7 @@ private[spark] class TaskSchedulerImpl(
taskSetsForStage -= manager.taskSet.stageAttemptId
if (taskSetsForStage.isEmpty) {
taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
stageIdToFinishedPartitions -= manager.taskSet.stageId

Reviewer: Should we also add this code when calling killTasks?

Member Author: I didn't find a killTasks method in TaskSchedulerImpl, and for similar functions, e.g. cancelTasks, I think it's unnecessary.

}
}
manager.parent.removeSchedulable(manager)
@@ -837,19 +841,29 @@ private[spark] class TaskSchedulerImpl(
}

/**
* Marks the task has completed in all TaskSetManagers for the given stage.
* Marks the task as completed in all TaskSetManagers (active / zombie) for the given stage.
*
* After stage failure and retry, there may be multiple TaskSetManagers for the stage.
* If an earlier attempt of a stage completes a task, we should ensure that the later attempts
* do not also submit those same tasks. That also means that a task completion from an earlier
* attempt can lead to the entire stage getting marked as successful.
* And there is also the possibility that the DAGScheduler submits another taskset at the same
* time as we're marking a task completed here -- that taskset would have a task for a partition
* that was already completed. We maintain the set of finished partitions in
* stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
* is submitted. See SPARK-25250 for more details.
*
* note: this method must be called with a lock on this.
*/
private[scheduler] def markPartitionCompletedInAllTaskSets(
stageId: Int,
partitionId: Int,
taskInfo: TaskInfo) = {
val finishedPartitions =
stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet)
finishedPartitions += partitionId
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
tsm.markPartitionCompleted(partitionId, taskInfo)
tsm.markPartitionCompleted(partitionId, Some(taskInfo))
}
}
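
For illustration, here is a minimal, self-contained sketch of the bookkeeping pattern this method relies on (a hypothetical ToyScheduler, not Spark's API): finished partitions are recorded per stage under the scheduler's lock, a later attempt can ask which partitions it may skip, and the entry is dropped once the stage has no task sets left so the map does not leak.

import scala.collection.mutable.{BitSet, HashMap}

class ToyScheduler {
  // Maps stageId to the partitions already finished by any attempt of that stage.
  private val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

  // Record a completion; analogous to the first thing the method above does.
  def markPartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet) += partitionId
  }

  // Called while submitting a new attempt: the partitions it does not need to run.
  def finishedPartitionsFor(stageId: Int): BitSet = synchronized {
    stageIdToFinishedPartitions.getOrElse(stageId, new BitSet)
  }

  // Called when the last task set for the stage finishes, to avoid leaking state.
  def stageFinished(stageId: Int): Unit = synchronized {
    stageIdToFinishedPartitions -= stageId
  }
}

A BitSet keeps this cheap even for stages with many partitions, since partition ids are small, dense integers.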

TaskSetManager.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
import scala.math.max
import scala.util.control.NonFatal

@@ -189,6 +189,18 @@ private[spark] class TaskSetManager(
addPendingTask(i)
}

{
// A TaskSet submitted by the DAGScheduler may have some already completed
// tasks since the DAGScheduler does not always know all the tasks that have
// been completed by other tasksets when completing a stage, so we mark
// those tasks as finished here to avoid launching duplicate tasks, while
// holding the TaskSchedulerImpl lock.
// See SPARK-25250 and markPartitionCompletedInAllTaskSets().
sched.stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
finishedPartitions => finishedPartitions.foreach(markPartitionCompleted(_, None))
}
}

Contributor: Shall we do it in TaskSchedulerImpl.createTaskSetManager? In general I'd avoid adding too much code to the constructor.

Member Author: Actually, I have the same thought. Let me try it.

Contributor: How do we guarantee that there is no race condition between the creation of the TaskSetManager and the updating of TaskSchedulerImpl.stageIdToFinishedPartitions?

Member Author: We always hold the lock on TaskSchedulerImpl while accessing stageIdToFinishedPartitions:

  • TaskSchedulerImpl.submitTasks
  • TaskSchedulerImpl.handleSuccessfulTask
  • TaskSchedulerImpl.taskSetFinished

Contributor: Ah, I see: TaskSetManager is always created at TaskSchedulerImpl.submitTasks.
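
As a rough illustration of the guarantee discussed in this thread, here is a minimal sketch (hypothetical ToyTaskSetManager, not the real class) of the manager-side half: the constructor seeds its bookkeeping from the already-finished partitions, and because the scheduler only constructs managers while holding its own lock, no completion can be recorded between the lookup and the seeding.

import scala.collection.BitSet

// Hypothetical, simplified task set manager mirroring the constructor block above.
class ToyTaskSetManager(numPartitions: Int, alreadyFinished: BitSet) {
  val successful = new Array[Boolean](numPartitions)
  var tasksSuccessful = 0

  // Seed the flags for partitions some earlier attempt already finished, so this
  // attempt never launches tasks for them. Assumed to run under the scheduler lock.
  alreadyFinished.foreach { p =>
    if (p < numPartitions && !successful(p)) {
      successful(p) = true
      tasksSuccessful += 1
    }
  }
}

A caller would construct it inside the scheduler's synchronized block, e.g. new ToyTaskSetManager(10, BitSet(1, 2)).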

/**
* Track the set of locality levels which are valid given the tasks locality preferences and
* the set of currently available executors. This is updated as executors are added and removed.
@@ -797,11 +809,12 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}

private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: Option[TaskInfo])
: Unit = {
Contributor: Super nit: I'm pretty sure that even if just the return type goes over the line, our style is to switch to a multi-line definition.
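
For reference, a small sketch of the multi-line definition style the comment presumably refers to (stub TaskInfo, not the real method): each parameter on its own line with four-space indentation, and the return type kept on the line that closes the parameter list rather than wrapped onto a line of its own.

object StyleSketch {
  // Stub standing in for org.apache.spark.scheduler.TaskInfo.
  case class TaskInfo(duration: Long)

  private def markPartitionCompleted(
      partitionId: Int,
      taskInfo: Option[TaskInfo]): Unit = {
    taskInfo.foreach(info => println(s"partition $partitionId finished in ${info.duration} ms"))
  }

  def main(args: Array[String]): Unit = {
    markPartitionCompleted(3, Some(TaskInfo(42L)))
    markPartitionCompleted(4, None)
  }
}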

partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
if (speculationEnabled && !isZombie) {
successfulTaskDurations.insert(taskInfo.duration)
taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
Contributor: This is existing logic, but I have a question here: do we really need to do it? The task is finished by another TSM; it seems unreasonable to update the statistics for launching speculative tasks in this TSM.

Member Author: I think you're right. What's your opinion? @squito

Contributor: Yeah, there was discussion about this in the past; there are arguments for doing it multiple ways. This was kind of a compromise with something that avoided a bug and was a reasonable change to put in. See more here: #21656

Member Author: I see.

}
tasksSuccessful += 1
successful(index) = true

TaskSchedulerImplSuite.scala
@@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
}

test("Completions in zombie tasksets update status of non-zombie taskset") {
test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val valueSer = SparkEnv.get.serializer.newInstance()

@@ -1114,9 +1114,9 @@
}

// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
// two times, so we have three active task sets for one stage. (For this to really happen,
// you'd need the previous stage to also get restarted, and then succeed, in between each
// attempt, but that happens outside what we're mocking here.)
// two times, so we have three TaskSetManagers (2 zombie, 1 active) for one stage. (For this
// to really happen, you'd need the previous stage to also get restarted, and then succeed,
// in between each attempt, but that happens outside what we're mocking here.)
val zombieAttempts = (0 until 2).map { stageAttempt =>
val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
taskScheduler.submitTasks(attempt)
@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.runningTasks === 9)
tsm
}
// we've now got 2 zombie attempts, each with 9 tasks still active but zero active attempt
// in taskScheduler.
Contributor: I don't understand "zero active attempt in taskScheduler"; can you explain what you mean?

Member Author: Hmm, I mean "there's no active attempt in taskScheduler". Updated.


// finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
// And it's possible since the behaviour of submitting new attempt and handling successful task
// is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
// separately.

Contributor: "This is possible since ..."
(0 until 2).foreach { i =>
completeTaskSuccessfully(zombieAttempts(i), i + 1)
assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
}

// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
// the stage, but this time with insufficient resources so not all tasks are active.

// Submit the 3rd attempt still with 10 tasks; this happens due to the race between the threads
// "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
// already completed tasks. And this time with insufficient resources so not all tasks are
// active.
val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
// Though, finalTsm gets submitted after some tasks succeeds, but it could also know about the
// finished partition by looking into `stageIdToFinishedPartitions` when it is being created,
// so that it won't launch any duplicate tasks later.
Contributor: I'd reword this to:

"Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should realize that 2 tasks have already completed, and mark them appropriately, so it won't launch any duplicate tasks later (SPARK-25250)."

(0 until 2).map(_ + 1).foreach { partitionId =>
val index = finalTsm.partitionToIndex(partitionId)
assert(finalTsm.successful(index))
}

val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
finalAttempt.tasks(task.index).partitionId
}.toSet
assert(finalTsm.runningTasks === 5)
assert(!finalTsm.isZombie)

// We simulate late completions from our zombie tasksets, corresponding to all the pending
// partitions in our final attempt. This means we're only waiting on the tasks we've already
// launched.
// We continually simulate late completions from our zombie tasksets (but this time, there is
// one active attempt in taskScheduler), corresponding to all the pending partitions in our
// final attempt. This means we're only waiting on the tasks we've already launched.
val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
finalAttemptPendingPartitions.foreach { partition =>
completeTaskSuccessfully(zombieAttempts(0), partition)
assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
}

// If there is another resource offer, we shouldn't run anything. Though our final attempt
// used to have pending tasks, now those tasks have been completed by zombie attempts. The
// remaining tasks to compute are already active in the non-zombie attempt.
assert(
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1179,6 +1200,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
zombieAttempts(partition % 2)
}
completeTaskSuccessfully(tsm, partition)
assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
}

assert(finalTsm.isZombie)
@@ -1204,6 +1226,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// perspective, as the failures weren't from a problem w/ the tasks themselves.
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
}
assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
}

test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {