Skip to content

Commit 9f8bc93

Browse files
authored
HBASE-27798: Client side should back off based on wait interval in RpcThrottlingException (#5226)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
1 parent d43dfcc commit 9f8bc93

7 files changed

Lines changed: 507 additions & 36 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.Optional;
38+
import java.util.OptionalLong;
3839
import java.util.concurrent.CompletableFuture;
3940
import java.util.concurrent.ConcurrentHashMap;
4041
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -56,6 +57,7 @@
5657
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
5758
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
5859
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
60+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
5961
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
6062
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
6163
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,10 +104,6 @@ class AsyncBatchRpcRetryingCaller<T> {
102104

103105
private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
104106

105-
private final long pauseNs;
106-
107-
private final long pauseNsForServerOverloaded;
108-
109107
private final int maxAttempts;
110108

111109
private final long operationTimeoutNs;
@@ -116,6 +114,8 @@ class AsyncBatchRpcRetryingCaller<T> {
116114

117115
private final long startNs;
118116

117+
private final HBaseServerExceptionPauseManager pauseManager;
118+
119119
// we can not use HRegionLocation as the map key because the hashCode and equals method of
120120
// HRegionLocation only consider serverName.
121121
private static final class RegionRequest {
@@ -155,15 +155,14 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
155155
this.retryTimer = retryTimer;
156156
this.conn = conn;
157157
this.tableName = tableName;
158-
this.pauseNs = pauseNs;
159-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
160158
this.maxAttempts = maxAttempts;
161159
this.operationTimeoutNs = operationTimeoutNs;
162160
this.rpcTimeoutNs = rpcTimeoutNs;
163161
this.startLogErrorsCnt = startLogErrorsCnt;
164162
this.actions = new ArrayList<>(actions.size());
165163
this.futures = new ArrayList<>(actions.size());
166164
this.action2Future = new IdentityHashMap<>(actions.size());
165+
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
167166
for (int i = 0, n = actions.size(); i < n; i++) {
168167
Row rawAction = actions.get(i);
169168
Action action;
@@ -360,7 +359,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
360359
}
361360
});
362361
if (!failedActions.isEmpty()) {
363-
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
362+
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
364363
}
365364
}
366365

@@ -465,18 +464,25 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
465464
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
466465
.collect(Collectors.toList());
467466
addError(copiedActions, error, serverName);
468-
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
469-
HBaseServerException.isServerOverloaded(error));
467+
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
470468
}
471469

472470
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473-
boolean isServerOverloaded) {
471+
Throwable error) {
474472
if (immediately) {
475473
groupAndSend(actions, tries);
476474
return;
477475
}
478476
long delayNs;
479-
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
477+
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
478+
OptionalLong maybePauseNsToUse =
479+
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
480+
if (!maybePauseNsToUse.isPresent()) {
481+
failAll(actions, tries);
482+
return;
483+
}
484+
long pauseNsToUse = maybePauseNsToUse.getAsLong();
485+
480486
if (operationTimeoutNs > 0) {
481487
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
482488
if (maxDelayNs <= 0) {
@@ -528,7 +534,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
528534
sendOrDelay(actionsByServer, tries);
529535
}
530536
if (!locateFailed.isEmpty()) {
531-
tryResubmit(locateFailed.stream(), tries, false, false);
537+
tryResubmit(locateFailed.stream(), tries, false, null);
532538
}
533539
});
534540
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.ArrayList;
2626
import java.util.List;
2727
import java.util.Optional;
28+
import java.util.OptionalLong;
2829
import java.util.concurrent.CompletableFuture;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.function.Consumer;
@@ -35,6 +36,7 @@
3536
import org.apache.hadoop.hbase.TableName;
3637
import org.apache.hadoop.hbase.TableNotEnabledException;
3738
import org.apache.hadoop.hbase.TableNotFoundException;
39+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
3840
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
3941
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
4042
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -56,10 +58,6 @@ public abstract class AsyncRpcRetryingCaller<T> {
5658

5759
private final long startNs;
5860

59-
private final long pauseNs;
60-
61-
private final long pauseNsForServerOverloaded;
62-
6361
private int tries = 1;
6462

6563
private final int maxAttempts;
@@ -78,14 +76,14 @@ public abstract class AsyncRpcRetryingCaller<T> {
7876

7977
protected final HBaseRpcController controller;
8078

79+
private final HBaseServerExceptionPauseManager pauseManager;
80+
8181
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
8282
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
8383
long rpcTimeoutNs, int startLogErrorsCnt) {
8484
this.retryTimer = retryTimer;
8585
this.conn = conn;
8686
this.priority = priority;
87-
this.pauseNs = pauseNs;
88-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
8987
this.maxAttempts = maxAttempts;
9088
this.operationTimeoutNs = operationTimeoutNs;
9189
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -95,6 +93,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
9593
this.controller.setPriority(priority);
9694
this.exceptions = new ArrayList<>();
9795
this.startNs = System.nanoTime();
96+
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
9897
}
9998

10099
private long elapsedMs() {
@@ -125,8 +124,14 @@ protected final void resetCallTimeout() {
125124
}
126125

127126
private void tryScheduleRetry(Throwable error) {
128-
long pauseNsToUse =
129-
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
127+
OptionalLong maybePauseNsToUse =
128+
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
129+
if (!maybePauseNsToUse.isPresent()) {
130+
completeExceptionally();
131+
return;
132+
}
133+
long pauseNsToUse = maybePauseNsToUse.getAsLong();
134+
130135
long delayNs;
131136
if (operationTimeoutNs > 0) {
132137
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.ArrayList;
3535
import java.util.List;
3636
import java.util.Optional;
37+
import java.util.OptionalLong;
3738
import java.util.concurrent.CompletableFuture;
3839
import java.util.concurrent.TimeUnit;
3940
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -43,6 +44,7 @@
4344
import org.apache.hadoop.hbase.NotServingRegionException;
4445
import org.apache.hadoop.hbase.UnknownScannerException;
4546
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
47+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
4648
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
4749
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
4850
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -99,10 +101,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
99101

100102
private final long scannerLeaseTimeoutPeriodNs;
101103

102-
private final long pauseNs;
103-
104-
private final long pauseNsForServerOverloaded;
105-
106104
private final int maxAttempts;
107105

108106
private final long scanTimeoutNs;
@@ -131,6 +129,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
131129

132130
private long nextCallSeq = -1L;
133131

132+
private final HBaseServerExceptionPauseManager pauseManager;
133+
134134
private enum ScanControllerState {
135135
INITIALIZED,
136136
SUSPENDED,
@@ -330,8 +330,6 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
330330
this.loc = loc;
331331
this.regionServerRemote = isRegionServerRemote;
332332
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
333-
this.pauseNs = pauseNs;
334-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
335333
this.maxAttempts = maxAttempts;
336334
this.scanTimeoutNs = scanTimeoutNs;
337335
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -346,6 +344,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
346344
this.controller = conn.rpcControllerFactory.newController();
347345
this.controller.setPriority(priority);
348346
this.exceptions = new ArrayList<>();
347+
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
349348
}
350349

351350
private long elapsedMs() {
@@ -419,8 +418,15 @@ private void onError(Throwable error) {
419418
return;
420419
}
421420
long delayNs;
422-
long pauseNsToUse =
423-
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
421+
422+
OptionalLong maybePauseNsToUse =
423+
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
424+
if (!maybePauseNsToUse.isPresent()) {
425+
completeExceptionally(!scannerClosed);
426+
return;
427+
}
428+
long pauseNsToUse = maybePauseNsToUse.getAsLong();
429+
424430
if (scanTimeoutNs > 0) {
425431
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
426432
if (maxDelayNs <= 0) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.hbase.DoNotRetryIOException;
3232
import org.apache.hadoop.hbase.HBaseServerException;
3333
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
34+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
3435
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
3536
import org.apache.hadoop.hbase.util.ExceptionUtil;
3637
import org.apache.hadoop.ipc.RemoteException;
@@ -140,14 +141,28 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
140141
if (tries >= maxAttempts - 1) {
141142
throw new RetriesExhaustedException(tries, exceptions);
142143
}
143-
// If the server is dead, we need to wait a little before retrying, to give
144-
// a chance to the regions to be moved
145-
// get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
146-
// special when encountering an exception indicating the server is overloaded.
147-
// see #HBASE-17114 and HBASE-26807
148-
long pauseBase =
149-
HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause;
150-
expectedSleep = callable.sleep(pauseBase, tries);
144+
145+
if (t instanceof RpcThrottlingException) {
146+
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) t;
147+
expectedSleep = rpcThrottlingException.getWaitInterval();
148+
if (LOG.isDebugEnabled()) {
149+
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleep,
150+
rpcThrottlingException);
151+
}
152+
} else {
153+
expectedSleep =
154+
HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause;
155+
156+
// only factor in retry adjustment for non-RpcThrottlingExceptions
157+
// because RpcThrottlingExceptions tell you how long to wait
158+
159+
// If the server is dead, we need to wait a little before retrying, to give
160+
// a chance to the regions to be moved
161+
// get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
162+
// special when encountering an exception indicating the server is overloaded.
163+
// see #HBASE-17114 and HBASE-26807
164+
expectedSleep = callable.sleep(expectedSleep, tries);
165+
}
151166

152167
// If, after the planned sleep, there won't be enough time left, we stop now.
153168
long duration = singleCallDuration(expectedSleep);
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client.backoff;
19+
20+
import java.util.OptionalLong;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.hadoop.hbase.HBaseServerException;
23+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
@InterfaceAudience.Private
29+
public class HBaseServerExceptionPauseManager {
30+
private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class);
31+
32+
private final long pauseNs;
33+
private final long pauseNsForServerOverloaded;
34+
35+
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
36+
this.pauseNs = pauseNs;
37+
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
38+
}
39+
40+
public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
41+
long expectedSleepNs;
42+
if (error instanceof RpcThrottlingException) {
43+
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
44+
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
45+
if (expectedSleepNs > remainingTimeNs) {
46+
return OptionalLong.empty();
47+
}
48+
if (LOG.isDebugEnabled()) {
49+
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs,
50+
rpcThrottlingException);
51+
}
52+
} else {
53+
expectedSleepNs =
54+
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
55+
}
56+
return OptionalLong.of(expectedSleepNs);
57+
}
58+
59+
}

0 commit comments

Comments
 (0)