Closed
@@ -290,8 +290,18 @@ private[yarn] class YarnAllocator(
location: String,
containersToUse: ArrayBuffer[Container],
remaining: ArrayBuffer[Container]): Unit = {
// SPARK-6050: certain Yarn configurations return a resource configuration that doesn't match
// the request; for example, capacity scheduler + DefaultResourceCalculator. Allow users in
// those situations to disable resource matching.
val matchingResource =
if (sparkConf.getBoolean("spark.yarn.container.matchAnyResource", false)) {
Contributor

Seems like the config is named backwards? If I want to match any, then I want to use allocatedContainer.getResource.

Perhaps name it matchExactResource, or keep the name and switch which value you get. I would expect to match any by default so it's backwards compatible for now.

Contributor

Also, as @mridulm mentioned, we are basically just matching what we got back, which disables all checks. Before, we were checking that memory was at least big enough.

private def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (executorMemory + memoryOverhead)
}

Contributor Author

Huh, no, if you want to match any you want to use the resource field, not the value returned by yarn (allocatedContainer.getResource). That means the comparison will effectively be resource == resource which will always be true.

I can change the config name if you think that would be clearer.

Contributor Author

Re: memory, can YARN really allocate a container with fewer resources than you asked for?

The resource in this class is pretty much static during the Spark app's lifetime.

Contributor

Sorry I'm still not seeing it. resource is:

Resource.newInstance(executorMemory + memoryOverhead, executorCores)

so its vcores value is the executorCores passed in: for instance, 8 if I request 8.

That resource is then passed to amClient.getMatchingRequests which is going to find requests that have 8 vcores, which isn't what we want because the RM without cpu scheduling returns ones with 1.
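
A minimal model of the mismatch described above, assuming the matcher only returns outstanding requests that fit within the capability passed in. `Res`, `MatchModel`, and the memory figure are hypothetical stand-ins for illustration, not YARN or Spark types.

```scala
// Simplified stand-in for YARN's Resource (hypothetical, not the real class).
final case class Res(memoryMb: Int, vcores: Int)

object MatchModel {
  // Assumed rule: a request matches when it fits within the given capability.
  def fitsIn(request: Res, capability: Res): Boolean =
    request.memoryMb <= capability.memoryMb && request.vcores <= capability.vcores

  // Outstanding requests that the given capability can satisfy.
  def matching(outstanding: Seq[Res], capability: Res): Seq[Res] =
    outstanding.filter(fitsIn(_, capability))

  // What was asked for: 8 vcores (the memory value is illustrative).
  val requested: Res = Res(memoryMb = 4480, vcores = 8)
  // What an RM without CPU scheduling reports back: 1 vcore.
  val allocated: Res = Res(memoryMb = 4480, vcores = 1)
}
```

Under this model, `MatchModel.matching(Seq(MatchModel.requested), MatchModel.allocated)` is empty, while matching against `MatchModel.requested` itself returns the request.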

Contributor Author

Take a look at the latest version to see if it's any clearer.

But the gist is that amClient.getMatchingRequests() matches the resources you asked for (which is resource) against the parameter you're passing. What the option controls is whether you also pass resource as the resource to match against, which is the exact same structure that is already in the outstanding list of requests.

So I think you're reading the condition backwards. Or something.
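
The branch under discussion can be sketched in isolation. This is an illustration only: `Res` and `MatchingChoice.choose` are hypothetical names, and `matchAny` stands for the value read from spark.yarn.container.matchAnyResource.

```scala
// Hypothetical stand-in for a YARN resource.
final case class Res(memoryMb: Int, vcores: Int)

object MatchingChoice {
  // Mirrors the if/else in the diff: with the flag on, the requested resource
  // is passed back as the capability to match; otherwise the allocated one is.
  def choose(matchAny: Boolean, requested: Res, allocated: Res): Res =
    if (matchAny) requested else allocated
}
```

With `matchAny = true` the capability equals the outstanding request, so the lookup compares `resource` with itself; with `false` it compares against whatever YARN returned.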

Contributor

Oh, sorry I see now. I was reading it backwards and thinking it was matching what was actually allocated.

So I guess the question is whether we want the default of this to be true so that it's backwards compatible. Otherwise the behavior changes for anyone running now who upgrades.

resource
} else {
allocatedContainer.getResource
}

val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
- allocatedContainer.getResource)
+ matchingResource)

// Match the allocation to a request
if (!matchingRequests.isEmpty) {
@@ -318,7 +328,7 @@ private[yarn] class YarnAllocator(
assert(container.getResource.getMemory >= resource.getMemory)

logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
- executorIdToContainer(executorId) = container
+ executorIdToContainer(executorId) = container

val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])