@@ -401,9 +401,7 @@ private[spark] class TaskSchedulerImpl(
// addresses are the same as that we allocated in taskResourceAssignments since it's
// synchronized. We don't remove the exact addresses allocated because the current
// approach produces the identical result with less time complexity.
- availableResources(i).getOrElse(rName,
-   throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
Member Author:
This should never happen since taskResAssignmentsOpt is non-empty.

Contributor:
taskResAssignmentsOpt isn't directly related to availableResources; can you explain more to help others review?

Member Author:
task.resources (L399) comes from taskResAssignmentsOpt, and taskResAssignmentsOpt comes from availableResources(i) (see resourcesMeetTaskRequirements).

Contributor:
Yeah, this shouldn't happen. I think it was there just in case a bug or a later change would get caught here. I'm OK with this.

Contributor:
Agreed, this can be removed and simplified as below.

-   .remove(0, rInfo.addresses.size)
+ availableResources(i)(rName).remove(0, rInfo.addresses.size)
}
// Only update hosts for a barrier task.
if (taskSet.isBarrier) {
@@ -468,8 +466,8 @@ private[spark] class TaskSchedulerImpl(
resourceProfileIds: Array[Int],
availableCpus: Array[Int],
availableResources: Array[Map[String, Buffer[String]]],
- rpId: Int): Int = {
- val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+ taskSet: TaskSet): Int = {
+ val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(taskSet.resourceProfileId)
val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
Contributor:
Any particular reason to change this?

Contributor:
We don't need to pass the rpId in anymore since it's in the task set, which we need now.

Contributor:
True, but I was trying to make sense of whether it was relevant to the fix or not.
Looks like an unrelated cleanup.

Member Author:
We need taskSet: TaskSetManager now because we'll use it to abort the task set below.

Contributor:
Ah! Yes, I knew I missed something :-)
Thanks.

(id == resourceProfile.id)
}
@@ -484,9 +482,11 @@ private[spark] class TaskSchedulerImpl(
numTasksPerExecCores
} else {
val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-   .getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" +
-     s" $resourceProfile doesn't actually contain that task resource!")
-   )
Member Author:
This has the same problem. cc @tgravescs

+   .getOrElse {
+     dagScheduler.taskSetFailed(taskSet, "limitingResource returns from ResourceProfile " +
+       s"$resourceProfile doesn't actually contain that task resource!", None)
+     return -1
Contributor:
We still need to throw the exception after the abort; otherwise the subsequent code will continue.
This is a repeated pattern in this PR: when replacing an exception/require/etc. (which would have thrown earlier) with a task set abort, please ensure that the exception continues to be thrown.

+   }
// available addresses already takes into account if there are fractional
// task resource requests
val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
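To illustrate the reviewer's point: if the failure is only reported via dagScheduler.taskSetFailed and the block then returns -1, the caller keeps running with a sentinel value. Below is a minimal sketch of the abort-then-throw variant being requested, reusing the names from the surrounding diff (dagScheduler, taskSet, resourceProfile, limitingResource); it is a fragment of the enclosing method, illustrative only, and not necessarily the final merged code.

```scala
val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
  .getOrElse {
    val errorMsg = s"limitingResource returned from ResourceProfile $resourceProfile " +
      "doesn't actually contain that task resource!"
    // Report the failure so the task set is aborted with a clear message ...
    dagScheduler.taskSetFailed(taskSet, errorMsg, None)
    // ... and still throw, as the old code did, so execution stops here instead of
    // continuing with a bogus limit.
    throw new SparkException(errorMsg)
  }
```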
@@ -582,7 +582,7 @@ private[spark] class TaskSchedulerImpl(
// value is -1
val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources,
-   taskSet.taskSet.resourceProfileId)
+   taskSet.taskSet)
slots
} else {
-1
@@ -675,11 +675,13 @@ private[spark] class TaskSchedulerImpl(
// Check whether the barrier tasks are partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
// requirements are not fulfilled, and we should revert the launched tasks).
- require(addressesWithDescs.size == taskSet.numTasks,
-   s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
Contributor:
This is the primary issue causing the bug, and it is overly strict.
Unfortunately, the way barrier scheduling has been written, the only way to get it working currently is to disable delay scheduling entirely.
We will need to revisit how to get barrier scheduling working more gracefully with task locality and executor failures (and, in the future, dynamic allocation).
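For readers hitting this in practice: "disable delay scheduling entirely" maps to the existing spark.locality.wait setting. A minimal illustrative sketch follows; the object and app names are made up, and this only shows the configuration, not a full job.

```scala
import org.apache.spark.SparkConf

object BarrierJobConf {
  // With the locality wait set to 0, the scheduler does not hold tasks back waiting for
  // their preferred locations, so a barrier stage with enough total slots can launch all
  // of its tasks in a single round of resource offers.
  val conf: SparkConf = new SparkConf()
    .setAppName("barrier-app")        // hypothetical app name
    .set("spark.locality.wait", "0s") // disable delay scheduling entirely
}
```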

Contributor:
Could we instead have a counter inside the taskSet or some other mechanism to allow for X retries? It seems like turning it off is a bit of a behaviour change from the point of view of considering backporting.

Contributor:
The proposal was to do that in a separate JIRA: https://issues.apache.org/jira/browse/SPARK-31509

Contributor:
I agree with @holdenk.
require would have ended up throwing an exception in this case; we should do the same after taskSet.abort to prevent a change in behavior, particularly for backports.

Member Author:
> Could we instead have a counter inside the taskSet or some other mechanism to allow for X retries?

I believe barrier retry is the next step we plan to do in a future release, but not 2.4.

> It seems like turning it off is a bit of a behaviour change from the point of view of considering backporting.

What's the behavior change? Previously, the application would hang, and now it fails as we expected in the first place.

> require would have ended up throwing an exception in this case; we should do the same after taskSet.abort to prevent a change in behavior, particularly for backports

To be honest, I'm fine with keeping the throw there, but I disagree that throwing an exception is an expected behavior we cannot change. Actually, no one would handle the exception thrown here, and I believe the expected behavior is to fail the application with a clear error message.

Contributor:
@Ngone51 It is not about whether any user code handles the exception or not; throwing the exception stops the flow of code here. If the exception is not thrown, the rest of the code will execute: barrier initialization, flag updates, log messages, etc.
Some of it gets cleaned up by the task set abort; some of it results in confusing log messages.

Member Author:
I see, makes sense.

Member Author:
I also think that throwing an exception isn't the recommended way to stop the code flow within the scope of an RPC. The better way is to rewrite the code logic and get every branch under control.

Contributor:
I definitely agree!
I was referring specifically to the backport context: not throwing an exception results in a behavior change, which we should not have.
The ideal solution is to fix this properly; the require was a precondition we expected to be satisfied for barrier scheduling, and it gets violated when a locality preference is present (which the original authors probably did not expect/consider).

+ if (addressesWithDescs.size != taskSet.numTasks) {
+   dagScheduler.taskSetFailed(taskSet.taskSet,
+     s"Fail current round of resource offers for barrier stage ${taskSet.stageId} " +
      s"because only ${addressesWithDescs.size} out of a total number of " +
      s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
-     "been blacklisted or cannot fulfill task locality requirements.")
+     "been blacklisted or cannot fulfill task locality requirements.", None)
+ }

// materialize the barrier coordinator.
maybeInitBarrierCoordinator()
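Following the review comments above (abort, and still throw so that the barrier coordinator initialization below never runs for a partial launch), here is a sketch of how this block could look. The message text is taken from the diff, the throw is what the reviewers are asking for, and this is illustrative rather than necessarily the final merged code.

```scala
if (addressesWithDescs.size != taskSet.numTasks) {
  val errorMsg = s"Fail current round of resource offers for barrier stage ${taskSet.stageId} " +
    s"because only ${addressesWithDescs.size} out of a total number of " +
    s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
    "been blacklisted or cannot fulfill task locality requirements."
  // Abort the barrier task set so the job fails with a clear message ...
  dagScheduler.taskSetFailed(taskSet.taskSet, errorMsg, None)
  // ... and throw, as the old require did, so the code below (barrier coordinator
  // initialization, launched-task bookkeeping) does not run after a partial launch.
  throw new SparkException(errorMsg)
}
```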
@@ -26,11 +26,11 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY

class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {

- def initLocalClusterSparkContext(): Unit = {
+ def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
val conf = new SparkConf()
// Init local cluster here so each barrier task runs in a separated process, thus `barrier()`
// call is actually useful.
.setMaster("local-cluster[4, 1, 1024]")
.setMaster(s"local-cluster[$numWorker, 1, 1024]")
.setAppName("test-cluster")
.set(TEST_NO_STAGE_RETRY, true)
sc = new SparkContext(conf)
@@ -276,4 +276,20 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
initLocalClusterSparkContext()
testBarrierTaskKilled(interruptOnKill = true)
}

test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
Contributor:
This seems wrong to me: a locality preference is causing the failure? I thought we should be looking at all available slots and only erroring out if you didn't have enough. Here you have 2 workers with 1 core each, so you should be able to fit 2 tasks on there eventually.

Member Author:
> This seems wrong to me: a locality preference is causing the failure?

Yeah, I have to say so. We do have enough slots in this case, but that doesn't guarantee that all tasks in a task set can be scheduled in a single round of resource offers, because of delay scheduling. While partial scheduling is OK for a normal task set, it's unacceptable for a barrier task set.

Contributor:
OK, looking through some of the logic some more, I guess if the total number of slots is smaller, then it skips it higher up. It still seems very odd that you fail based on locality being set. Wouldn't you just want to ignore locality in this case? Dynamic allocation isn't on; if you have number of slots = number of tasks, you need to just schedule it.

Do we at least recommend people turn off locality when using barrier? I currently recommend turning it off for most people anyway on YARN because it can have other bad issues. Do we have a JIRA for this issue? It seems like it could be very confusing to users. The message says something about blacklisting, so if I'm going to try to debug why my stuff isn't scheduled, I think this would be very hard to figure out.

Contributor:
Note my last comment was posted before I saw your response. I get that you don't have enough slots here, but you are failing the task set; in the case of locality, if you just wait, you would likely get it. Or, like my last comment says, we could recognize it, or at the very least add locality to our failure message.

Member Author:
The require(addressesWithDescs.size == taskSet.numTasks) there is more like a safety check for the barrier task set, and it guards against any possible cause of partially launched tasks. Obviously, delay scheduling is one of the causes we can tell. I agree with improving the error message for delay scheduling if it's also the only cause.

Member Author:
Also cc @jiangxb1987

Contributor:
We can certainly do it separately from this, since that appears to be existing behavior. I was surprised by it because I was thinking of other cases like blacklisting, which make more sense to me.

Member Author:
Makes more sense? Or no? I think blacklisted offers have already been filtered out at that point?

Contributor:
@Ngone51 can you create a JIRA ticket? I think it's worth a discussion to see whether we should refuse to schedule barrier tasks due to locality.

+   // scheduling. So, one of tasks won't be scheduled in one round of resource offer.
+   val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
+   val errorMsg = intercept[SparkException] {
+     rdd.barrier().mapPartitions { iter =>
+       BarrierTaskContext.get().barrier()
+       iter
+     }.collect()
+   }.getMessage
+   assert(errorMsg.contains("Fail current round of resource offers for barrier stage"))
+ }
}