1818package org .apache .hadoop .hbase .client ;
1919
2020import static org .apache .hadoop .hbase .CellUtil .createCellScanner ;
21- import static org .apache .hadoop .hbase .client .ConnectionUtils .SLEEP_DELTA_NS ;
2221import static org .apache .hadoop .hbase .client .ConnectionUtils .calcPriority ;
23- import static org .apache .hadoop .hbase .client .ConnectionUtils .getPauseTime ;
2422import static org .apache .hadoop .hbase .client .ConnectionUtils .resetController ;
2523import static org .apache .hadoop .hbase .client .ConnectionUtils .translateException ;
2624import static org .apache .hadoop .hbase .util .ConcurrentMapUtils .computeIfAbsent ;
3533import java .util .List ;
3634import java .util .Map ;
3735import java .util .Optional ;
36+ import java .util .OptionalLong ;
3837import java .util .concurrent .CompletableFuture ;
3938import java .util .concurrent .ConcurrentHashMap ;
4039import java .util .concurrent .ConcurrentLinkedQueue ;
5655import org .apache .hadoop .hbase .client .MultiResponse .RegionResult ;
5756import org .apache .hadoop .hbase .client .RetriesExhaustedException .ThrowableWithExtraContext ;
5857import org .apache .hadoop .hbase .client .backoff .ClientBackoffPolicy ;
58+ import org .apache .hadoop .hbase .client .backoff .HBaseServerExceptionPauseManager ;
5959import org .apache .hadoop .hbase .client .backoff .ServerStatistics ;
6060import org .apache .hadoop .hbase .ipc .HBaseRpcController ;
6161import org .apache .hadoop .hbase .util .Bytes ;
@@ -102,10 +102,6 @@ class AsyncBatchRpcRetryingCaller<T> {
102102
103103 private final IdentityHashMap <Action , List <ThrowableWithExtraContext >> action2Errors ;
104104
105- private final long pauseNs ;
106-
107- private final long pauseNsForServerOverloaded ;
108-
109105 private final int maxAttempts ;
110106
111107 private final long operationTimeoutNs ;
@@ -116,6 +112,8 @@ class AsyncBatchRpcRetryingCaller<T> {
116112
117113 private final long startNs ;
118114
115+ private final HBaseServerExceptionPauseManager pauseManager ;
116+
119117 // we can not use HRegionLocation as the map key because the hashCode and equals method of
120118 // HRegionLocation only consider serverName.
121119 private static final class RegionRequest {
@@ -155,8 +153,6 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
155153 this .retryTimer = retryTimer ;
156154 this .conn = conn ;
157155 this .tableName = tableName ;
158- this .pauseNs = pauseNs ;
159- this .pauseNsForServerOverloaded = pauseNsForServerOverloaded ;
160156 this .maxAttempts = maxAttempts ;
161157 this .operationTimeoutNs = operationTimeoutNs ;
162158 this .rpcTimeoutNs = rpcTimeoutNs ;
@@ -182,6 +178,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
182178 }
183179 this .action2Errors = new IdentityHashMap <>();
184180 this .startNs = System .nanoTime ();
181+ this .pauseManager =
182+ new HBaseServerExceptionPauseManager (pauseNs , pauseNsForServerOverloaded , operationTimeoutNs );
185183 }
186184
187185 private static boolean hasIncrementOrAppend (Row action ) {
@@ -204,10 +202,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
204202 return false ;
205203 }
206204
207- private long remainingTimeNs () {
208- return operationTimeoutNs - (System .nanoTime () - startNs );
209- }
210-
211205 private List <ThrowableWithExtraContext > removeErrors (Action action ) {
212206 synchronized (action2Errors ) {
213207 return action2Errors .remove (action );
@@ -360,14 +354,14 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
360354 }
361355 });
362356 if (!failedActions .isEmpty ()) {
363- tryResubmit (failedActions .stream (), tries , retryImmediately .booleanValue (), false );
357+ tryResubmit (failedActions .stream (), tries , retryImmediately .booleanValue (), null );
364358 }
365359 }
366360
367361 private void sendToServer (ServerName serverName , ServerRequest serverReq , int tries ) {
368362 long remainingNs ;
369363 if (operationTimeoutNs > 0 ) {
370- remainingNs = remainingTimeNs ();
364+ remainingNs = pauseManager . remainingTimeNs (startNs );
371365 if (remainingNs <= 0 ) {
372366 failAll (serverReq .actionsByRegion .values ().stream ().flatMap (r -> r .actions .stream ()),
373367 tries );
@@ -465,29 +459,22 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
465459 List <Action > copiedActions = actionsByRegion .values ().stream ().flatMap (r -> r .actions .stream ())
466460 .collect (Collectors .toList ());
467461 addError (copiedActions , error , serverName );
468- tryResubmit (copiedActions .stream (), tries , error instanceof RetryImmediatelyException ,
469- HBaseServerException .isServerOverloaded (error ));
462+ tryResubmit (copiedActions .stream (), tries , error instanceof RetryImmediatelyException , error );
470463 }
471464
472465 private void tryResubmit (Stream <Action > actions , int tries , boolean immediately ,
473- boolean isServerOverloaded ) {
466+ Throwable error ) {
474467 if (immediately ) {
475468 groupAndSend (actions , tries );
476469 return ;
477470 }
478- long delayNs ;
479- long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs ;
480- if (operationTimeoutNs > 0 ) {
481- long maxDelayNs = remainingTimeNs () - SLEEP_DELTA_NS ;
482- if (maxDelayNs <= 0 ) {
483- failAll (actions , tries );
484- return ;
485- }
486- delayNs = Math .min (maxDelayNs , getPauseTime (pauseNsToUse , tries - 1 ));
487- } else {
488- delayNs = getPauseTime (pauseNsToUse , tries - 1 );
471+ OptionalLong maybePauseNsToUse = pauseManager .getPauseNsFromException (error , tries , startNs );
472+ if (!maybePauseNsToUse .isPresent ()) {
473+ failAll (actions , tries );
474+ return ;
489475 }
490- if (isServerOverloaded ) {
476+ long delayNs = maybePauseNsToUse .getAsLong ();
477+ if (HBaseServerException .isServerOverloaded (error )) {
491478 Optional <MetricsConnection > metrics = conn .getConnectionMetrics ();
492479 metrics .ifPresent (m -> m .incrementServerOverloadedBackoffTime (delayNs , TimeUnit .NANOSECONDS ));
493480 }
@@ -497,7 +484,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
497484 private void groupAndSend (Stream <Action > actions , int tries ) {
498485 long locateTimeoutNs ;
499486 if (operationTimeoutNs > 0 ) {
500- locateTimeoutNs = remainingTimeNs ();
487+ locateTimeoutNs = pauseManager . remainingTimeNs (startNs );
501488 if (locateTimeoutNs <= 0 ) {
502489 failAll (actions , tries );
503490 return ;
@@ -528,7 +515,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
528515 sendOrDelay (actionsByServer , tries );
529516 }
530517 if (!locateFailed .isEmpty ()) {
531- tryResubmit (locateFailed .stream (), tries , false , false );
518+ tryResubmit (locateFailed .stream (), tries , false , null );
532519 }
533520 });
534521 }
0 commit comments