Conversation

@Ngone51 (Member) commented Apr 19, 2020

What changes were proposed in this pull request?

Use dagScheduler.taskSetFailed to abort a barrier stage instead of throwing an exception within resourceOffers.

Why are the changes needed?

Any non-fatal exception thrown within the Spark RPC framework can be swallowed:

```scala
private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
  try action catch {
    case NonFatal(e) =>
      try endpoint.onError(e) catch {
        case NonFatal(ee) =>
          if (stopped) {
            logDebug("Ignoring error", ee)
          } else {
            logError("Ignoring error", ee)
          }
      }
  }
}
```

The method TaskSchedulerImpl.resourceOffers is also within the scope of the Spark RPC framework. Thus, throwing an exception inside resourceOffers won't fail the application.

As a result, if a barrier stage fails the require check at require(addressesWithDescs.size == taskSet.numTasks, ...), it will fail the check again and again until all tasks from the TaskSetManager are dequeued. But since the barrier stage is never actually executed, the application will hang.
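A minimal sketch of the new behavior described above (simplified; the actual patch may differ in details) - abort the task set through the scheduler instead of relying on an exception that the RPC layer swallows:

```scala
// Sketch only: replace the bare require with an explicit abort so the failure
// reaches the DAGScheduler, then throw to stop the local control flow
// (the thrown exception itself is swallowed by the RPC layer).
if (addressesWithDescs.size != taskSet.numTasks) {
  val errorMsg = s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
    s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks} tasks got " +
    "resource offers. The resource offers may have been blacklisted or cannot fulfill " +
    "task locality requirements."
  taskSet.abort(errorMsg)
  throw new SparkException(errorMsg)
}
```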

The issue can be reproduced by the following test:

```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
  BarrierTaskContext.get().barrier()
  iter
}.collect()
```

Does this PR introduce any user-facing change?

Yes. Previously the application would hang; after this fix it fails fast.

How was this patch tested?

Added a regression test.

@Ngone51 (Member, Author) commented Apr 19, 2020

ping @jiangxb1987 @cloud-fan please take a look, thanks!

val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
.getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" +
s" $resourceProfile doesn't actually contain that task resource!")
)
Member Author:

this has the same problem cc @tgravescs

// synchronized. We don't remove the exact addresses allocated because the current
// approach produces the identical result with less time complexity.
availableResources(i).getOrElse(rName,
throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
Member Author:

This should never happen since taskResAssignmentsOpt is non-empty.

Contributor:

taskResAssignmentsOpt isn't directly related to availableResources; can you explain more to help others review?

Member Author:

task.resources (L399) comes from taskResAssignmentsOpt, and taskResAssignmentsOpt comes from availableResources(i) (see resourcesMeetTaskRequirements).

Contributor:

Yeah, this shouldn't happen; I think it was there just in case a bug or a later change would get caught here. I'm OK with this.

Contributor:

Agree, this can be removed and simplified as below.
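A possible shape of that simplification (hypothetical sketch, not necessarily the actual diff) - since the key is known to be present, drop the getOrElse/throw guard and look it up directly:

```scala
// Hypothetical sketch: rName is guaranteed to exist in availableResources(i)
// because task.resources was derived from it, so a plain lookup suffices.
val resourceAddresses = availableResources(i)(rName)
```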

@SparkQA commented Apr 19, 2020

Test build #121475 has finished for PR 28257 at commit 29be3f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
Contributor:

this seems wrong to me, locality preference is causing failure? I thought we should be looking at all available slots and only erroring if you didn't have enough. Here you have 2 workers with 1 core each, so you should be able to fit 2 tasks on there eventually.

Member Author:

this seems wrong to me, locality preference is causing failure?

Yeah, I have to say so. We do have enough slots in this case, but that doesn't guarantee that all tasks in a task set can be scheduled in a single round of resource offers, because of delay scheduling. While partial scheduling is OK for a normal task set, it's unacceptable for a barrier task set.

Contributor:

OK, looking through some of the logic some more, I guess if the total slots are fewer than the tasks, it skips it higher up. It still seems very odd that you fail based on locality being set. Wouldn't you just want to ignore locality in this case? Dynamic allocation isn't on; if you have number of slots = number of tasks, you need to just schedule it.

Do we at least recommend people turn off locality when using barrier? I recommend turning it off to most people anyway on YARN because it can have other bad issues. Do we have a JIRA for this issue? It seems like it could be very confusing to users. The message says something about blacklisting, so if I'm going to try to debug why my stuff isn't scheduled, I think this would be very hard to figure out.

Contributor:

Note my last comment was posted before I saw your response. I get that you don't have enough slots here, but you are failing the task set; in the case of locality, if you just wait you would likely get it. Or, like my last comment says, we could recognize it, or at the very least add locality to the message for our failure.

Member Author:

The require(addressesWithDescs.size == taskSet.numTasks) there is more of a safety check for the barrier task set and guards against any possible cause of partially launched tasks. Obviously, delay scheduling is one of the causes we can identify. I agree with improving the error message around delay scheduling if it's indeed the only cause.

Member Author:

Also cc @jiangxb1987

Contributor:

We can certainly do it separately from this, since that appears to be existing behavior. I was surprised by it because I was thinking of other cases, like blacklisting, which makes more sense to me.

Member Author:

Makes more sense? Or does it? I think blacklisted offers have already been filtered out at that point?

Contributor:

@Ngone51 can you create a JIRA ticket? I think it's worth discussing whether we should refuse to schedule barrier tasks due to locality.

@tgravescs (Contributor) left a comment

I think there is another place we throw in statusUpdate that should be replaced.

@SparkQA commented Apr 21, 2020

Test build #121591 has finished for PR 28257 at commit 911f792.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

taskSet: TaskSetManager): Int = {
val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
taskSet.taskSet.resourceProfileId)
val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
Contributor:

Any particular reason to change this?

Contributor:

We don't need to pass the resource profile id in anymore since it's in the task set, which we need now.

Contributor:

True, but I was trying to make sense of whether it was relevant to the fix or not.
Looks like an unrelated cleanup

Member Author:

We need taskSet: TaskSetManager now because we'll use it to abort the task set below.

Contributor:

Ah ! Yes, I knew I missed something :-)
Thanks

.getOrElse {
taskSet.abort("limitingResource returns from ResourceProfile " +
s"$resourceProfile doesn't actually contain that task resource!")
return -1
Contributor:

We still need to throw the exception after the abort - else subsequent code will continue.
This is a repeated pattern in this PR - when replacing an exception/require/etc (which would have thrown an exception earlier) with a task set abort, please ensure that the exception continues to be thrown.
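For example, the snippet quoted above could keep an exception after the abort, roughly along these lines (sketch only; names follow the quoted diff):

```scala
.getOrElse {
  // Abort so the failure reaches the DAGScheduler, then throw so the rest of
  // the method does not keep executing (exact wording/structure may differ).
  val errorMsg = "limitingResource returns from ResourceProfile " +
    s"$resourceProfile doesn't actually contain that task resource!"
  taskSet.abort(errorMsg)
  throw new SparkException(errorMsg)
}
```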

s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
"been blacklisted or cannot fulfill task locality requirements.")
"been blacklisted or cannot fulfill task locality requirements."
logError(errorMsg)
Contributor:

nit: logWarning

// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
// requirements are not fulfilled, and we should revert the launched tasks).
require(addressesWithDescs.size == taskSet.numTasks,
s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
Contributor:

This is the primary issue causing the bug - and it is overly strict.
Unfortunately, the way barrier scheduling has been written, the only way to get it working currently is to disable delay scheduling entirely.
We will need to revisit how to get barrier scheduling working more gracefully with task locality and executor failures (and, in the future, dynamic allocation).
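For reference, one possible workaround (a deployment-level assumption, not something this PR mandates) is to disable delay scheduling via configuration:

```scala
import org.apache.spark.SparkConf

// Setting the locality wait to 0 effectively disables delay scheduling, so
// barrier tasks are not held back waiting for preferred locations.
val conf = new SparkConf()
  .setAppName("barrier-job") // hypothetical app name
  .set("spark.locality.wait", "0")
```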

Contributor:

Could we instead have a counter inside the taskSet or other mechanism to allow for X retries? It seems like turning it off is a bit of a behaviour change from the point of view of considering backporting.

Contributor:

The proposal was to do that in a separate JIRA: https://issues.apache.org/jira/browse/SPARK-31509

Contributor:

I agree with @holdenk.
require would have ended up throwing an exception in this case - we should do the same after taskSet.abort to prevent change in behavior - particularly for backport

Member Author:

Could we instead have a counter inside the taskSet or other mechanism to allow for X retries?

I believe barrier retry is the next step we plan to do in a future release, but not in 2.4.

It seems like turning it off is a bit of a behaviour change from the point of view of considering backporting.

What's the behavior change? Previously the application hung, and now it fails fast as we expected in the first place.

require would have ended up throwing an exception in this case - we should do the same after taskSet.abort to prevent change in behavior - particularly for backport

To be honest, I'm fine with keeping the exception there, but I disagree that throwing the exception is expected behavior we cannot change. Actually, no one handles the exception thrown here, and I believe our expected behavior is to fail the application with a clear error message.

Contributor:

@Ngone51 It is not about whether user code handles the exception or not - throwing the exception stops the flow of code here. If the exception is not thrown, the rest of the code will execute: barrier initialization, flag updates, log messages, etc.
Some of it gets cleaned up by the task set abort; some of it results in confusing log messages.

Member Author:

I see, makes sense.

Member Author:

I also think that throwing an exception isn't the recommended way to stop code flow within the scope of RPC. The better way is to rewrite the code logic and get every branch under control.

Contributor:

I definitely agree!
I was referring specifically to the backport context - not throwing an exception results in a behavior change, which we should not have.
The ideal solution is to fix this properly - the require was a precondition we expected to be satisfied for barrier scheduling, and it gets violated when a locality preference is present (which the original authors probably did not expect/consider).

@Ngone51 (Member, Author) commented Apr 22, 2020

Since we all think that we should disable delay scheduling with barrier for now, I'm going to fold SPARK-31509 (which only requires updating the error message) into this PR for simplicity.

@SparkQA commented Apr 22, 2020

Test build #121632 has finished for PR 28257 at commit 92497c9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val errorMsg =
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
taskSet.abort(errorMsg)
throw new SparkException(errorMsg)
Contributor:

The exception change is fine here? +CC @tgravescs

Member Author:

Throwing the exception should be fine. But on second thought, I'm afraid it might lead future developers into the same trap - they may not be aware of taskSet.abort and only throw the exception.

@Ngone51 (Member, Author) Apr 23, 2020

Let me add comments to TaskSchedulerImpl as a precaution.
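Something along these lines, for example (illustrative wording only, not the exact comment added in the PR):

```scala
// NOTE: TaskSchedulerImpl's entry points (e.g. resourceOffers, statusUpdate) run
// inside the RPC framework, where non-fatal exceptions are swallowed by
// Inbox.safelyCall and will NOT fail the application. To fail a stage from here,
// call the TaskSetManager's abort(...) (which goes through
// dagScheduler.taskSetFailed) and then throw to stop the local control flow.
```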

Contributor:

Yeah, that should be fine, as it will just be caught and then logged.

@SparkQA commented Apr 23, 2020

Test build #121645 has finished for PR 28257 at commit 6495a9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member, Author) commented Apr 23, 2020

BTW, I personally think this fix should be backported to 2.4. What's your opinion? @mridulm @tgravescs

@SparkQA commented Apr 23, 2020

Test build #121690 has finished for PR 28257 at commit ac9015d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

I'm fine with backporting the barrier pieces to Spark 2.4.

@tgravescs (Contributor) left a comment

Changes look good to me. @mridulm, any other comments?

@mridulm (Contributor) commented Apr 23, 2020

Looks good to me, Tom. Thanks for fixing this, @Ngone51!

@cloud-fan (Contributor) commented Apr 24, 2020

thanks, merging to master!

@cloud-fan cloud-fan closed this in 263f04d Apr 24, 2020
@cloud-fan (Contributor)

@Ngone51 can you send a backport PR for 3.0 and 2.4?

@Ngone51 (Member, Author) commented Apr 24, 2020

Sure, thanks all!!

Ngone51 added a commit to Ngone51/spark that referenced this pull request Apr 27, 2020
…sks launched

Use `dagScheduler.taskSetFailed` to abort a barrier stage instead of throwing an exception within `resourceOffers`.

Any non-fatal exception thrown within the Spark RPC framework can be swallowed:

https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211

The method `TaskSchedulerImpl.resourceOffers` is also within the scope of the Spark RPC framework. Thus, throwing an exception inside `resourceOffers` won't fail the application.

As a result, if a barrier stage fails the require check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, it will fail the check again and again until all tasks from the `TaskSetManager` are dequeued. But since the barrier stage is never actually executed, the application will hang.

The issue can be reproduced by the following test:

```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"),Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
  BarrierTaskContext.get().barrier()
  iter
}.collect()
```

Yes. Previously the application would hang; after this fix it fails fast.

Added a regression test.

Closes apache#28257 from Ngone51/fix_barrier_abort.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Ngone51 added a commit to Ngone51/spark that referenced this pull request Apr 27, 2020
…sks launched
