@@ -759,20 +759,22 @@ private void logProgress() {
759759 }
760760 }
761761
762- public synchronized void copyFailed (InputAttemptFetchFailure fetchFailure , MapHost host ,
762+ public void copyFailed (InputAttemptFetchFailure fetchFailure , MapHost host ,
763763 boolean readError , boolean connectError ) {
764764 failedShuffleCounter .increment (1 );
765765 inputContext .notifyProgress ();
766- int failures = incrementAndGetFailureAttempt ( fetchFailure . getInputAttemptIdentifier ()) ;
766+ int failures ;
767767
768- if (!fetchFailure .isLocalFetch ()) {
769- /**
770- * Track the number of failures that has happened since last completion.
771- * This gets reset on a successful copy.
772- */
773- failedShufflesSinceLastCompletion ++;
768+ synchronized (this ) {
769+ failures = incrementAndGetFailureAttempt (fetchFailure .getInputAttemptIdentifier ());
770+ if (!fetchFailure .isLocalFetch ()) {
771+ /**
772+ * Track the number of failures that have happened since last completion.
773+ * This gets reset on a successful copy.
774+ */
775+ failedShufflesSinceLastCompletion ++;
776+ }
774777 }
775-
776778 /**
777779 * Inform AM:
778780 * - In case of read/connect error
@@ -794,14 +796,18 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHo
794796 }
795797
796798 //Restart consumer in case shuffle is not healthy
797- if (!isShuffleHealthy (fetchFailure )) {
799+ try {
800+ checkShuffleHealthy (fetchFailure );
801+ } catch (IOException e ) {
802+ // reportException should be called outside synchronized(this) due to TEZ-4334
803+ exceptionReporter .reportException (e );
798804 return ;
799805 }
800806
801807 penalizeHost (host , failures );
802808 }
803809
804- private boolean isAbortLimitExceeedFor (InputAttemptIdentifier srcAttempt ) {
810+ private void isAbortLimitExceeedFor (InputAttemptIdentifier srcAttempt ) throws IOException {
805811 int attemptFailures = getFailureCount (srcAttempt );
806812 if (attemptFailures >= abortFailureLimit ) {
807813 // This task has seen too many fetch failures - report it as failed. The
@@ -816,15 +822,11 @@ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
816822 inputContext .getSourceVertexName (),
817823 srcAttempt .getInputIdentifier (),
818824 srcAttempt .getAttemptNumber ()) + ". threshold=" + abortFailureLimit ;
819- IOException ioe = new IOException (errorMsg );
820- // Shuffle knows how to deal with failures post shutdown via the onFailure hook
821- exceptionReporter .reportException (ioe );
822- return true ;
825+ throw new IOException (errorMsg );
823826 }
824- return false ;
825827 }
826828
827- private void penalizeHost (MapHost host , int failures ) {
829+ private synchronized void penalizeHost (MapHost host , int failures ) {
828830 host .penalize ();
829831
830832 HostPort hostPort = new HostPort (host .getHost (), host .getPort ());
@@ -1008,14 +1010,15 @@ private boolean isFetcherHealthy(String logContext) {
10081010 return fetcherHealthy ;
10091011 }
10101012
1011- boolean isShuffleHealthy (InputAttemptFetchFailure fetchFailure ) {
1013+ /**
1014+ * Checks whether the current shuffle is healthy and throws an IOException if it is not;
1015+ * the caller is expected to handle that IOException.
1016+ */
1017+ private synchronized void checkShuffleHealthy (InputAttemptFetchFailure fetchFailure )
1018+ throws IOException {
10121019 InputAttemptIdentifier srcAttempt = fetchFailure .getInputAttemptIdentifier ();
1013- if (isAbortLimitExceeedFor (srcAttempt )) {
1014- return false ;
1015- }
1016-
1017- final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction ;
1018- final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction ;
1020+ // throws IOException if the abort failure limit has been reached for this attempt
1021+ isAbortLimitExceeedFor (srcAttempt );
10191022
10201023 int doneMaps = numInputs - remainingMaps .get ();
10211024
@@ -1025,7 +1028,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
10251028 // check if the reducer has progressed enough
10261029 boolean reducerProgressedEnough =
10271030 (((float )doneMaps / numInputs )
1028- >= MIN_REQUIRED_PROGRESS_PERCENT );
1031+ >= minReqProgressFraction );
10291032
10301033 // check if the reducer is stalled for a long time
10311034 // duration for which the reducer is stalled
@@ -1038,7 +1041,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
10381041
10391042 boolean reducerStalled = (shuffleProgressDuration > 0 ) &&
10401043 (((float )stallDuration / shuffleProgressDuration )
1041- >= MAX_ALLOWED_STALL_TIME_PERCENT );
1044+ >= maxStallTimeFraction );
10421045
10431046 // kill if not healthy and has insufficient progress
10441047 if ((failureCounts .size () >= maxFailedUniqueFetches ||
@@ -1059,10 +1062,8 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
10591062 LOG .debug ("Host failures=" + hostFailures .keySet ());
10601063 }
10611064 // Shuffle knows how to deal with failures post shutdown via the onFailure hook
1062- exceptionReporter .reportException (new IOException (errorMsg , fetchFailure .getCause ()));
1063- return false ;
1065+ throw new IOException (errorMsg , fetchFailure .getCause ());
10641066 }
1065- return true ;
10661067 }
10671068
10681069 public synchronized void addKnownMapOutput (String inputHostName ,
0 commit comments