diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 6f51566f75..340cd45a1e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -1476,44 +1476,46 @@ private CookieContainerRequest getMatchingRequestWithoutPriority( return null; } CookieContainerRequest firstMatch = null; - for (Collection requests : pRequestsList) { - for (CookieContainerRequest cookieContainerRequest : requests) { - if (firstMatch == null || // we dont have a match. So look for one - // we have a match but are looking for a better container level match. - // skip the expensive canAssignTaskToContainer() if the request is - // not affinitized to the container - container.getId().equals(cookieContainerRequest.getAffinitizedContainer()) - ) { - if (canAssignTaskToContainer(cookieContainerRequest, container)) { - // request matched to container - if (!considerContainerAffinity) { - return cookieContainerRequest; - } - ContainerId affCId = cookieContainerRequest.getAffinitizedContainer(); - boolean canMatchTaskWithAffinity = true; - if (affCId == null || - !heldContainers.containsKey(affCId) || - inUseContainers.contains(affCId)) { - // affinity not specified - // affinitized container is no longer held - // affinitized container is in use - canMatchTaskWithAffinity = false; - } - if (canMatchTaskWithAffinity) { - if (container.getId().equals( - cookieContainerRequest.getAffinitizedContainer())) { - // container level match - LOG.debug("Matching with affinity for request: {} container: {}", - cookieContainerRequest, affCId); + synchronized (this) { + for (Collection requests : pRequestsList) { + for (CookieContainerRequest cookieContainerRequest : requests) { + if (firstMatch == null || // we dont have a match. So look for one + // we have a match but are looking for a better container level match. + // skip the expensive canAssignTaskToContainer() if the request is + // not affinitized to the container + container.getId().equals(cookieContainerRequest.getAffinitizedContainer()) + ) { + if (canAssignTaskToContainer(cookieContainerRequest, container)) { + // request matched to container + if (!considerContainerAffinity) { return cookieContainerRequest; } - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping request for container " + container.getId() - + " due to affinity. Request: " + cookieContainerRequest - + " affContainer: " + affCId); + ContainerId affCId = cookieContainerRequest.getAffinitizedContainer(); + boolean canMatchTaskWithAffinity = true; + if (affCId == null || + !heldContainers.containsKey(affCId) || + inUseContainers.contains(affCId)) { + // affinity not specified + // affinitized container is no longer held + // affinitized container is in use + canMatchTaskWithAffinity = false; + } + if (canMatchTaskWithAffinity) { + if (container.getId().equals( + cookieContainerRequest.getAffinitizedContainer())) { + // container level match + LOG.debug("Matching with affinity for request: {} container: {}", + cookieContainerRequest, affCId); + return cookieContainerRequest; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping request for container " + container.getId() + + " due to affinity. Request: " + cookieContainerRequest + + " affContainer: " + affCId); + } + } else { + firstMatch = cookieContainerRequest; } - } else { - firstMatch = cookieContainerRequest; } } } @@ -1633,7 +1635,7 @@ private void pushNewContainerToDelayed(List containers){ delayedContainerManager.triggerScheduling(false); } - private CookieContainerRequest removeTaskRequest(Object task) { + private synchronized CookieContainerRequest removeTaskRequest(Object task) { CookieContainerRequest request = taskRequests.remove(task); if(request != null) { // remove all references of the request from AMRMClient @@ -1642,7 +1644,7 @@ private CookieContainerRequest removeTaskRequest(Object task) { return request; } - private void addTaskRequest(Object task, + private synchronized void addTaskRequest(Object task, CookieContainerRequest request) { CookieContainerRequest oldRequest = taskRequests.put(task, request); if (oldRequest != null) {