diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 540d44f409..cbdb0bdad1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -757,18 +757,22 @@ private void logProgress() { } } - public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host, + public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host, boolean readError, boolean connectError) { failedShuffleCounter.increment(1); inputContext.notifyProgress(); - int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier()); + int failures; - if (!fetchFailure.isLocalFetch()) { - /** - * Track the number of failures that has happened since last completion. - * This gets reset on a successful copy. - */ - failedShufflesSinceLastCompletion++; + synchronized (this) { + failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier()); + + if (!fetchFailure.isLocalFetch()) { + /** + * Track the number of failures that has happened since last completion. + * This gets reset on a successful copy. + */ + failedShufflesSinceLastCompletion++; + } } /** @@ -822,7 +826,7 @@ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) { return false; } - private void penalizeHost(MapHost host, int failures) { + private synchronized void penalizeHost(MapHost host, int failures) { host.penalize(); HostPort hostPort = new HostPort(host.getHost(), host.getPort()); @@ -842,7 +846,7 @@ private void penalizeHost(MapHost host, int failures) { penalties.add(new Penalty(host, penaltyDelay)); } - private int getFailureCount(InputAttemptIdentifier srcAttempt) { + private synchronized int getFailureCount(InputAttemptIdentifier srcAttempt) { IntWritable failureCount = failureCounts.get(srcAttempt); return (failureCount == null) ? 0 : failureCount.get(); } @@ -1015,51 +1019,63 @@ boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) { final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction; final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction; - int doneMaps = numInputs - remainingMaps.get(); + String errorMsg = null; + boolean result; - String logContext = "srcAttempt=" + srcAttempt.toString(); - boolean fetcherHealthy = isFetcherHealthy(logContext); - - // check if the reducer has progressed enough - boolean reducerProgressedEnough = - (((float)doneMaps / numInputs) - >= MIN_REQUIRED_PROGRESS_PERCENT); - - // check if the reducer is stalled for a long time - // duration for which the reducer is stalled - int stallDuration = - (int)(System.currentTimeMillis() - lastProgressTime); - - // duration for which the reducer ran with progress - int shuffleProgressDuration = - (int)(lastProgressTime - startTime); - - boolean reducerStalled = (shuffleProgressDuration > 0) && - (((float)stallDuration / shuffleProgressDuration) - >= MAX_ALLOWED_STALL_TIME_PERCENT); - - // kill if not healthy and has insufficient progress - if ((failureCounts.size() >= maxFailedUniqueFetches || - failureCounts.size() == (numInputs - doneMaps)) - && !fetcherHealthy - && (!reducerProgressedEnough || reducerStalled)) { - String errorMsg = (srcNameTrimmed + ": " - + "Shuffle failed with too many fetch failures and insufficient progress: " - + "[failureCounts=" + failureCounts.size() - + ", pendingInputs=" + (numInputs - doneMaps) - + ", fetcherHealthy=" + fetcherHealthy - + ", reducerProgressedEnough=" + reducerProgressedEnough - + ", reducerStalled=" + reducerStalled) - + "]"; - LOG.error(errorMsg); - if (LOG.isDebugEnabled()) { - LOG.debug("Host failures=" + hostFailures.keySet()); + synchronized (this) { + + int doneMaps = numInputs - remainingMaps.get(); + + String logContext = "srcAttempt=" + srcAttempt.toString(); + boolean fetcherHealthy = isFetcherHealthy(logContext); + + // check if the reducer has progressed enough + boolean reducerProgressedEnough = + (((float)doneMaps / numInputs) + >= MIN_REQUIRED_PROGRESS_PERCENT); + + // check if the reducer is stalled for a long time + // duration for which the reducer is stalled + int stallDuration = + (int)(System.currentTimeMillis() - lastProgressTime); + + // duration for which the reducer ran with progress + int shuffleProgressDuration = + (int)(lastProgressTime - startTime); + + boolean reducerStalled = (shuffleProgressDuration > 0) && + (((float)stallDuration / shuffleProgressDuration) + >= MAX_ALLOWED_STALL_TIME_PERCENT); + + // kill if not healthy and has insufficient progress + if ((failureCounts.size() >= maxFailedUniqueFetches || + failureCounts.size() == (numInputs - doneMaps)) + && !fetcherHealthy + && (!reducerProgressedEnough || reducerStalled)) { + errorMsg = (srcNameTrimmed + ": " + + "Shuffle failed with too many fetch failures and insufficient progress: " + + "[failureCounts=" + failureCounts.size() + + ", pendingInputs=" + (numInputs - doneMaps) + + ", fetcherHealthy=" + fetcherHealthy + + ", reducerProgressedEnough=" + reducerProgressedEnough + + ", reducerStalled=" + reducerStalled) + + "]"; + LOG.error(errorMsg); + if (LOG.isDebugEnabled()) { + LOG.debug("Host failures=" + hostFailures.keySet()); + } + result = false; + } else { + result = true; } + } + + if (!result) { // Shuffle knows how to deal with failures post shutdown via the onFailure hook + // reportException() should be called outside synchronized(this) exceptionReporter.reportException(new IOException(errorMsg)); - return false; } - return true; + return result; } public synchronized void addKnownMapOutput(String inputHostName,