@@ -448,30 +448,22 @@ void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
448448
449449 boolean isReplica = false ;
450450 List <Action > unknownReplicaActions = null ;
451+ List <Action > locateRegionFailedActions = null ;
451452 for (Action action : currentActions ) {
452453 if (isOperationTimeoutExceeded ()) {
453- String message = numAttempt == 1
454- ? "Operation timeout exceeded during resolution of region locations, "
455- + "prior to executing any actions."
456- : "Operation timeout exceeded during re-resolution of region locations on retry "
457- + (numAttempt - 1 ) + "." ;
458-
459- message += " Meta may be slow or operation timeout too short for batch size or retries." ;
460- OperationTimeoutExceededException exception =
461- new OperationTimeoutExceededException (message );
462-
463- // Clear any actions we already resolved, because none will have been executed yet
464- // We are going to fail all passed actions because there's no way we can execute any
465- // if operation timeout is exceeded.
466454 actionsByServer .clear ();
467- for (Action actionToFail : currentActions ) {
468- manageLocationError (actionToFail , exception );
469- }
455+ failIncompleteActionsWithOpTimeout (currentActions , locateRegionFailedActions , numAttempt );
470456 return ;
471457 }
472458
473459 RegionLocations locs = findAllLocationsOrFail (action , true );
474- if (locs == null ) continue ;
460+ if (locs == null ) {
461+ if (locateRegionFailedActions == null ) {
462+ locateRegionFailedActions = new ArrayList <>(1 );
463+ }
464+ locateRegionFailedActions .add (action );
465+ continue ;
466+ }
475467 boolean isReplicaAction = !RegionReplicaUtil .isDefaultReplica (action .getReplicaId ());
476468 if (isReplica && !isReplicaAction ) {
477469 // This is the property of the current implementation, not a requirement.
@@ -488,6 +480,10 @@ void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
488480 } else {
489481 // TODO: relies on primary location always being fetched
490482 manageLocationError (action , null );
483+ if (locateRegionFailedActions == null ) {
484+ locateRegionFailedActions = new ArrayList <>(1 );
485+ }
486+ locateRegionFailedActions .add (action );
491487 }
492488 } else {
493489 byte [] regionName = loc .getRegionInfo ().getRegionName ();
@@ -561,6 +557,39 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache)
561557 return loc ;
562558 }
563559
560+ /**
561+ * For failing all actions that were being grouped during a groupAndSendMultiAction when operation
562+ * timeout was exceeded and there is no time remaining to continue grouping/sending any of the
563+ * actions. We don't fail any actions which have already failed to completion during grouping due
564+ * to location error (they already have an error set and had action counter decremented for)
565+ * @param actions actions being processed by the groupAndSend when operation
566+ * timeout occurred
567+ * @param locateRegionFailedActions actions already failed to completion due to location error
568+ * @param numAttempt the number of attempts so far
569+ */
570+ private void failIncompleteActionsWithOpTimeout (List <Action > actions ,
571+ List <Action > locateRegionFailedActions , int numAttempt ) {
572+ String message = numAttempt == 1
573+ ? "Operation timeout exceeded during resolution of region locations, "
574+ + "prior to executing any actions."
575+ : "Operation timeout exceeded during re-resolution of region locations on retry "
576+ + (numAttempt - 1 ) + "." ;
577+ message += " Meta may be slow or operation timeout too short for batch size or retries." ;
578+ OperationTimeoutExceededException exception = new OperationTimeoutExceededException (message );
579+
580+ for (Action actionToFail : actions ) {
581+ // Action equality is implemented as row equality so we check action index equality
582+ // since we don't want two different actions for the same row to be considered equal here
583+ boolean actionAlreadyFailed =
584+ locateRegionFailedActions != null && locateRegionFailedActions .stream ().anyMatch (
585+ failedAction -> failedAction .getOriginalIndex () == actionToFail .getOriginalIndex ()
586+ && failedAction .getReplicaId () == actionToFail .getReplicaId ());
587+ if (!actionAlreadyFailed ) {
588+ manageLocationError (actionToFail , exception );
589+ }
590+ }
591+ }
592+
564593 /**
565594 * Send a multi action structure to the servers, after a delay depending on the attempt number.
566595 * Asynchronous.
0 commit comments