Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1476,44 +1476,46 @@ private CookieContainerRequest getMatchingRequestWithoutPriority(
return null;
}
CookieContainerRequest firstMatch = null;
for (Collection<CookieContainerRequest> requests : pRequestsList) {
for (CookieContainerRequest cookieContainerRequest : requests) {
if (firstMatch == null || // we dont have a match. So look for one
// we have a match but are looking for a better container level match.
// skip the expensive canAssignTaskToContainer() if the request is
// not affinitized to the container
container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
) {
if (canAssignTaskToContainer(cookieContainerRequest, container)) {
// request matched to container
if (!considerContainerAffinity) {
return cookieContainerRequest;
}
ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
boolean canMatchTaskWithAffinity = true;
if (affCId == null ||
!heldContainers.containsKey(affCId) ||
inUseContainers.contains(affCId)) {
// affinity not specified
// affinitized container is no longer held
// affinitized container is in use
canMatchTaskWithAffinity = false;
}
if (canMatchTaskWithAffinity) {
if (container.getId().equals(
cookieContainerRequest.getAffinitizedContainer())) {
// container level match
LOG.debug("Matching with affinity for request: {} container: {}",
cookieContainerRequest, affCId);
synchronized (this) {
for (Collection<CookieContainerRequest> requests : pRequestsList) {
for (CookieContainerRequest cookieContainerRequest : requests) {
if (firstMatch == null || // we dont have a match. So look for one
// we have a match but are looking for a better container level match.
// skip the expensive canAssignTaskToContainer() if the request is
// not affinitized to the container
container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
) {
if (canAssignTaskToContainer(cookieContainerRequest, container)) {
// request matched to container
if (!considerContainerAffinity) {
return cookieContainerRequest;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping request for container " + container.getId()
+ " due to affinity. Request: " + cookieContainerRequest
+ " affContainer: " + affCId);
ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
boolean canMatchTaskWithAffinity = true;
if (affCId == null ||
!heldContainers.containsKey(affCId) ||
inUseContainers.contains(affCId)) {
// affinity not specified
// affinitized container is no longer held
// affinitized container is in use
canMatchTaskWithAffinity = false;
}
if (canMatchTaskWithAffinity) {
if (container.getId().equals(
cookieContainerRequest.getAffinitizedContainer())) {
// container level match
LOG.debug("Matching with affinity for request: {} container: {}",
cookieContainerRequest, affCId);
return cookieContainerRequest;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping request for container " + container.getId()
+ " due to affinity. Request: " + cookieContainerRequest
+ " affContainer: " + affCId);
}
} else {
firstMatch = cookieContainerRequest;
}
} else {
firstMatch = cookieContainerRequest;
}
}
}
Expand Down Expand Up @@ -1633,7 +1635,7 @@ private void pushNewContainerToDelayed(List<Container> containers){
delayedContainerManager.triggerScheduling(false);
}

private CookieContainerRequest removeTaskRequest(Object task) {
private synchronized CookieContainerRequest removeTaskRequest(Object task) {
CookieContainerRequest request = taskRequests.remove(task);
if(request != null) {
// remove all references of the request from AMRMClient
Expand All @@ -1642,7 +1644,7 @@ private CookieContainerRequest removeTaskRequest(Object task) {
return request;
}

private void addTaskRequest(Object task,
private synchronized void addTaskRequest(Object task,
CookieContainerRequest request) {
CookieContainerRequest oldRequest = taskRequests.put(task, request);
if (oldRequest != null) {
Expand Down