Skip to content

Commit 6c864e6

Browse files
author
Ray Mattingly
committed
pr feedback: isolate pause derivation logic, add unit tests
1 parent af76491 commit 6c864e6

6 files changed

Lines changed: 200 additions & 60 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
5757
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
5858
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
59+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
5960
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
6061
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
6162
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
@@ -476,18 +477,14 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
476477
return;
477478
}
478479
long delayNs;
479-
long pauseNsToUse;
480-
boolean isServerOverloaded = false;
481-
if (error instanceof RpcThrottlingException) {
482-
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
483-
pauseNsToUse = rpcThrottlingException.getWaitInterval() * 1000; // wait interval is in millis
484-
if (LOG.isDebugEnabled()) {
485-
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException",
486-
rpcThrottlingException.getWaitInterval(), rpcThrottlingException);
487-
}
488-
} else {
489-
isServerOverloaded = HBaseServerException.isServerOverloaded(error);
490-
pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
480+
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
481+
long pauseNsToUse =
482+
HBaseServerExceptionPauseManager.getPauseNanos(error, pauseNsForServerOverloaded, pauseNs);
483+
if (
484+
error instanceof RpcThrottlingException && pauseNsToUse > remainingTimeNs() - SLEEP_DELTA_NS
485+
) {
486+
failAll(actions, tries);
487+
return;
491488
}
492489
if (operationTimeoutNs > 0) {
493490
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.hbase.TableName;
3636
import org.apache.hadoop.hbase.TableNotEnabledException;
3737
import org.apache.hadoop.hbase.TableNotFoundException;
38+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
3839
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
3940
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
4041
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
@@ -126,17 +127,13 @@ protected final void resetCallTimeout() {
126127
}
127128

128129
private void tryScheduleRetry(Throwable error) {
129-
long pauseNsToUse;
130-
if (error instanceof RpcThrottlingException) {
131-
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
132-
pauseNsToUse = rpcThrottlingException.getWaitInterval() * 1000; // wait interval is in millis
133-
if (LOG.isDebugEnabled()) {
134-
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException",
135-
rpcThrottlingException.getWaitInterval(), rpcThrottlingException);
136-
}
137-
} else {
138-
pauseNsToUse =
139-
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
130+
long pauseNsToUse =
131+
HBaseServerExceptionPauseManager.getPauseNanos(error, pauseNsForServerOverloaded, pauseNs);
132+
if (
133+
error instanceof RpcThrottlingException && pauseNsToUse > remainingTimeNs() - SLEEP_DELTA_NS
134+
) {
135+
completeExceptionally();
136+
return;
140137
}
141138
long delayNs;
142139
if (operationTimeoutNs > 0) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.hadoop.hbase.NotServingRegionException;
4444
import org.apache.hadoop.hbase.UnknownScannerException;
4545
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
46+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
4647
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
4748
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
4849
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -420,21 +421,18 @@ private void onError(Throwable error) {
420421
return;
421422
}
422423
long delayNs;
423-
long pauseNsToUse;
424-
if (error instanceof RpcThrottlingException) {
425-
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
426-
pauseNsToUse = rpcThrottlingException.getWaitInterval() * 1000; // wait interval is in millis
427-
if (LOG.isDebugEnabled()) {
428-
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException",
429-
rpcThrottlingException.getWaitInterval(), rpcThrottlingException);
430-
}
431-
} else {
432-
pauseNsToUse =
433-
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
424+
long pauseNsToUse =
425+
HBaseServerExceptionPauseManager.getPauseNanos(error, pauseNsForServerOverloaded, pauseNs);
426+
if (
427+
error instanceof RpcThrottlingException && pauseNsToUse > remainingTimeNs() - SLEEP_DELTA_NS
428+
) {
429+
completeExceptionally(!scannerClosed);
430+
return;
434431
}
435432
if (scanTimeoutNs > 0) {
436-
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
437-
if (maxDelayNs <= 0) {
433+
long remainingTimeNs = remainingTimeNs();
434+
long maxDelayNs = remainingTimeNs - SLEEP_DELTA_NS;
435+
if (maxDelayNs <= 0 || maxDelayNs <= pauseNsToUse) {
438436
completeExceptionally(!scannerClosed);
439437
return;
440438
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@
2828
import java.util.List;
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.function.Function;
3132
import org.apache.hadoop.hbase.DoNotRetryIOException;
3233
import org.apache.hadoop.hbase.HBaseServerException;
34+
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
3335
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
34-
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
3536
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
3637
import org.apache.hadoop.hbase.util.ExceptionUtil;
3738
import org.apache.hadoop.ipc.RemoteException;
@@ -141,30 +142,23 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
141142
if (tries >= maxAttempts - 1) {
142143
throw new RetriesExhaustedException(tries, exceptions);
143144
}
144-
if (t instanceof RpcThrottlingException) {
145-
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) t;
146-
expectedSleep = rpcThrottlingException.getWaitInterval();
147-
if (LOG.isDebugEnabled()) {
148-
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleep,
149-
rpcThrottlingException);
150-
}
151-
} else {
152-
// If the server is dead, we need to wait a little before retrying, to give
153-
// a chance to the regions to be moved
154-
// get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
155-
// special when encountering an exception indicating the server is overloaded.
156-
// see #HBASE-17114 and HBASE-26807
157-
long pauseBase =
158-
HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause;
159-
expectedSleep = callable.sleep(pauseBase, tries);
160145

161-
// If, after the planned sleep, there won't be enough time left, we stop now.
162-
long duration = singleCallDuration(expectedSleep);
163-
if (duration > callTimeout) {
164-
String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + ": "
165-
+ t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
166-
throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
167-
}
146+
int finalTries = tries;
147+
// If the server is dead, we need to wait a little before retrying, to give
148+
// a chance to the regions to be moved
149+
// get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
150+
// special when encountering an exception indicating the server is overloaded.
151+
// see #HBASE-17114 and HBASE-26807
152+
Function<Long, Long> retryFunction = pauseBase -> callable.sleep(pauseBase, finalTries);
153+
expectedSleep = HBaseServerExceptionPauseManager.getPauseMillis(t, retryFunction,
154+
pauseForServerOverloaded, pause);
155+
156+
// If, after the planned sleep, there won't be enough time left, we stop now.
157+
long duration = singleCallDuration(expectedSleep);
158+
if (duration > callTimeout) {
159+
String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + ": "
160+
+ t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
161+
throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
168162
}
169163
if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
170164
metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client.backoff;
19+
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Function;
22+
import org.apache.hadoop.hbase.HBaseServerException;
23+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
@InterfaceAudience.Private
29+
public class HBaseServerExceptionPauseManager {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class);
32+
33+
public static long getPauseNanos(Throwable t, long pauseNsForServerOverloaded, long pauseNs) {
34+
long pauseMsForServerOverloaded = TimeUnit.NANOSECONDS.toMillis(pauseNsForServerOverloaded);
35+
long basePauseMs = TimeUnit.NANOSECONDS.toMillis(pauseNs);
36+
long pauseMillis = getPauseMillis(t, null, pauseMsForServerOverloaded, basePauseMs);
37+
return TimeUnit.MILLISECONDS.toNanos(pauseMillis);
38+
}
39+
40+
public static long getPauseMillis(Throwable t, Function<Long, Long> retryFunction,
41+
long pauseForServerOverloaded, long pause) {
42+
long expectedSleep;
43+
if (t instanceof RpcThrottlingException) {
44+
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) t;
45+
expectedSleep = rpcThrottlingException.getWaitInterval();
46+
if (LOG.isDebugEnabled()) {
47+
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleep,
48+
rpcThrottlingException);
49+
}
50+
} else {
51+
long pauseBase =
52+
HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause;
53+
if (retryFunction == null) {
54+
expectedSleep = pauseBase;
55+
} else {
56+
expectedSleep = retryFunction.apply(pauseBase);
57+
}
58+
}
59+
return expectedSleep;
60+
}
61+
62+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client.backoff;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Function;
24+
import org.apache.hadoop.hbase.HBaseClassTestRule;
25+
import org.apache.hadoop.hbase.HBaseServerException;
26+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
27+
import org.apache.hadoop.hbase.testclassification.ClientTests;
28+
import org.apache.hadoop.hbase.testclassification.SmallTests;
29+
import org.junit.ClassRule;
30+
import org.junit.Test;
31+
import org.junit.experimental.categories.Category;
32+
33+
@Category({ ClientTests.class, SmallTests.class })
34+
public class TestHBaseServerExceptionPauseManager {
35+
36+
private static final long WAIT_INTERVAL_MILLIS = 1L;
37+
private static final long WAIT_INTERVAL_NANOS =
38+
TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MILLIS);
39+
40+
private static final long PAUSE_MILLIS_FOR_SERVER_OVERLOADED = 2L;
41+
private static final long PAUSE_NANOS_FOR_SERVER_OVERLOADED =
42+
TimeUnit.MILLISECONDS.toNanos(PAUSE_MILLIS_FOR_SERVER_OVERLOADED);
43+
44+
private static final long PAUSE_MILLIS = 3L;
45+
private static final long PAUSE_NANOS = TimeUnit.MILLISECONDS.toNanos(PAUSE_MILLIS);
46+
47+
private final RpcThrottlingException RPC_THROTTLING_EXCEPTION = new RpcThrottlingException(
48+
RpcThrottlingException.Type.NumRequestsExceeded, WAIT_INTERVAL_MILLIS, "doot");
49+
private final Throwable OTHER_EXCEPTION = new RuntimeException("");
50+
private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
51+
52+
@ClassRule
53+
public static final HBaseClassTestRule CLASS_RULE =
54+
HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
55+
56+
@Test
57+
public void itSupportsRpcThrottlingNanos() {
58+
long pauseNanos = HBaseServerExceptionPauseManager.getPauseNanos(RPC_THROTTLING_EXCEPTION,
59+
PAUSE_NANOS_FOR_SERVER_OVERLOADED, PAUSE_NANOS);
60+
assertEquals(pauseNanos, WAIT_INTERVAL_NANOS);
61+
}
62+
63+
@Test
64+
public void itSupportsRpcThrottlingMillis() {
65+
long pauseMillis = HBaseServerExceptionPauseManager.getPauseMillis(RPC_THROTTLING_EXCEPTION,
66+
r -> r, PAUSE_MILLIS_FOR_SERVER_OVERLOADED, PAUSE_MILLIS);
67+
assertEquals(pauseMillis, WAIT_INTERVAL_MILLIS);
68+
}
69+
70+
@Test
71+
public void itSupportsRetryFunction() {
72+
Function<Long, Long> retryFunction = r -> 2 * r;
73+
Long pauseMillis = HBaseServerExceptionPauseManager.getPauseMillis(OTHER_EXCEPTION,
74+
retryFunction, PAUSE_MILLIS_FOR_SERVER_OVERLOADED, PAUSE_MILLIS);
75+
assertEquals(pauseMillis, retryFunction.apply(PAUSE_MILLIS));
76+
}
77+
78+
@Test
79+
public void itSupportsServerOverloadedExceptionMillis() {
80+
long pauseMillis = HBaseServerExceptionPauseManager.getPauseMillis(SERVER_OVERLOADED_EXCEPTION,
81+
r -> r, PAUSE_MILLIS_FOR_SERVER_OVERLOADED, PAUSE_MILLIS);
82+
assertEquals(pauseMillis, PAUSE_MILLIS_FOR_SERVER_OVERLOADED);
83+
}
84+
85+
@Test
86+
public void itSupportsServerOverloadedExceptionNanos() {
87+
long pauseNanos = HBaseServerExceptionPauseManager.getPauseNanos(SERVER_OVERLOADED_EXCEPTION,
88+
PAUSE_NANOS_FOR_SERVER_OVERLOADED, PAUSE_NANOS);
89+
assertEquals(pauseNanos, PAUSE_NANOS_FOR_SERVER_OVERLOADED);
90+
}
91+
92+
}

0 commit comments

Comments
 (0)