Skip to content

Commit 28f77ff

Browse files
author
Ray Mattingly
committed
simplify HBaseServerExceptionPauseManager interface
1 parent e2c9c1f commit 28f77ff

6 files changed

Lines changed: 33 additions & 37 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,6 @@ class AsyncBatchRpcRetryingCaller<T> {
104104

105105
private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
106106

107-
private final long pauseNs;
108-
109-
private final long pauseNsForServerOverloaded;
110-
111107
private final int maxAttempts;
112108

113109
private final long operationTimeoutNs;
@@ -118,6 +114,8 @@ class AsyncBatchRpcRetryingCaller<T> {
118114

119115
private final long startNs;
120116

117+
private final HBaseServerExceptionPauseManager pauseManager;
118+
121119
// we can not use HRegionLocation as the map key because the hashCode and equals method of
122120
// HRegionLocation only consider serverName.
123121
private static final class RegionRequest {
@@ -157,15 +155,14 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
157155
this.retryTimer = retryTimer;
158156
this.conn = conn;
159157
this.tableName = tableName;
160-
this.pauseNs = pauseNs;
161-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
162158
this.maxAttempts = maxAttempts;
163159
this.operationTimeoutNs = operationTimeoutNs;
164160
this.rpcTimeoutNs = rpcTimeoutNs;
165161
this.startLogErrorsCnt = startLogErrorsCnt;
166162
this.actions = new ArrayList<>(actions.size());
167163
this.futures = new ArrayList<>(actions.size());
168164
this.action2Future = new IdentityHashMap<>(actions.size());
165+
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
169166
for (int i = 0, n = actions.size(); i < n; i++) {
170167
Row rawAction = actions.get(i);
171168
Action action;
@@ -478,9 +475,8 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
478475
}
479476
long delayNs;
480477
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
481-
482-
OptionalLong maybePauseNsToUse = HBaseServerExceptionPauseManager.getPauseNsFromException(error,
483-
pauseNs, pauseNsForServerOverloaded, remainingTimeNs() - SLEEP_DELTA_NS);
478+
OptionalLong maybePauseNsToUse =
479+
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
484480
if (!maybePauseNsToUse.isPresent()) {
485481
failAll(actions, tries);
486482
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ public abstract class AsyncRpcRetryingCaller<T> {
5858

5959
private final long startNs;
6060

61-
private final long pauseNs;
62-
63-
private final long pauseNsForServerOverloaded;
64-
6561
private int tries = 1;
6662

6763
private final int maxAttempts;
@@ -80,14 +76,14 @@ public abstract class AsyncRpcRetryingCaller<T> {
8076

8177
protected final HBaseRpcController controller;
8278

79+
private final HBaseServerExceptionPauseManager pauseManager;
80+
8381
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
8482
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
8583
long rpcTimeoutNs, int startLogErrorsCnt) {
8684
this.retryTimer = retryTimer;
8785
this.conn = conn;
8886
this.priority = priority;
89-
this.pauseNs = pauseNs;
90-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
9187
this.maxAttempts = maxAttempts;
9288
this.operationTimeoutNs = operationTimeoutNs;
9389
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -97,6 +93,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
9793
this.controller.setPriority(priority);
9894
this.exceptions = new ArrayList<>();
9995
this.startNs = System.nanoTime();
96+
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
10097
}
10198

10299
private long elapsedMs() {
@@ -127,8 +124,8 @@ protected final void resetCallTimeout() {
127124
}
128125

129126
private void tryScheduleRetry(Throwable error) {
130-
OptionalLong maybePauseNsToUse = HBaseServerExceptionPauseManager.getPauseNsFromException(error,
131-
pauseNs, pauseNsForServerOverloaded, remainingTimeNs() - SLEEP_DELTA_NS);
127+
OptionalLong maybePauseNsToUse =
128+
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
132129
if (!maybePauseNsToUse.isPresent()) {
133130
completeExceptionally();
134131
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {
101101

102102
private final long scannerLeaseTimeoutPeriodNs;
103103

104-
private final long pauseNs;
105-
106-
private final long pauseNsForServerOverloaded;
107-
108104
private final int maxAttempts;
109105

110106
private final long scanTimeoutNs;
@@ -133,6 +129,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
133129

134130
private long nextCallSeq = -1L;
135131

132+
private final HBaseServerExceptionPauseManager pauseManager;
133+
136134
private enum ScanControllerState {
137135
INITIALIZED,
138136
SUSPENDED,
@@ -332,8 +330,6 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
332330
this.loc = loc;
333331
this.regionServerRemote = isRegionServerRemote;
334332
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
335-
this.pauseNs = pauseNs;
336-
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
337333
this.maxAttempts = maxAttempts;
338334
this.scanTimeoutNs = scanTimeoutNs;
339335
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -348,6 +344,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
348344
this.controller = conn.rpcControllerFactory.newController();
349345
this.controller.setPriority(priority);
350346
this.exceptions = new ArrayList<>();
347+
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
351348
}
352349

353350
private long elapsedMs() {
@@ -422,8 +419,8 @@ private void onError(Throwable error) {
422419
}
423420
long delayNs;
424421

425-
OptionalLong maybePauseNsToUse = HBaseServerExceptionPauseManager.getPauseNsFromException(error,
426-
pauseNs, pauseNsForServerOverloaded, remainingTimeNs() - SLEEP_DELTA_NS);
422+
OptionalLong maybePauseNsToUse =
423+
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
427424
if (!maybePauseNsToUse.isPresent()) {
428425
completeExceptionally(!scannerClosed);
429426
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929
public class HBaseServerExceptionPauseManager {
3030
private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class);
3131

32-
private HBaseServerExceptionPauseManager() {}
32+
private final long pauseNs;
33+
private final long pauseNsForServerOverloaded;
3334

34-
public static OptionalLong getPauseNsFromException(Throwable error, long pauseNs,
35-
long pauseNsForServerOverloaded, long remainingTimeNs) {
35+
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
36+
this.pauseNs = pauseNs;
37+
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
38+
}
39+
40+
public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
3641
long expectedSleepNs;
3742
if (error instanceof RpcThrottlingException) {
3843
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;

hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,38 +47,39 @@ public class TestHBaseServerExceptionPauseManager {
4747
private final Throwable OTHER_EXCEPTION = new RuntimeException("");
4848
private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
4949

50+
private final HBaseServerExceptionPauseManager pauseManager =
51+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED);
52+
5053
@ClassRule
5154
public static final HBaseClassTestRule CLASS_RULE =
5255
HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
5356

5457
@Test
5558
public void itSupportsRpcThrottlingNanos() {
56-
OptionalLong pauseNanos = HBaseServerExceptionPauseManager.getPauseNsFromException(
57-
RPC_THROTTLING_EXCEPTION, PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, Long.MAX_VALUE);
59+
OptionalLong pauseNanos =
60+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE);
5861
assertTrue(pauseNanos.isPresent());
5962
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
6063
}
6164

6265
@Test
6366
public void itSupportsServerOverloadedExceptionNanos() {
64-
OptionalLong pauseNanos = HBaseServerExceptionPauseManager.getPauseNsFromException(
65-
SERVER_OVERLOADED_EXCEPTION, PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, Long.MAX_VALUE);
67+
OptionalLong pauseNanos =
68+
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE);
6669
assertTrue(pauseNanos.isPresent());
6770
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED);
6871
}
6972

7073
@Test
7174
public void itSupportsOtherExceptionNanos() {
72-
OptionalLong pauseNanos = HBaseServerExceptionPauseManager.getPauseNsFromException(
73-
OTHER_EXCEPTION, PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, Long.MAX_VALUE);
75+
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE);
7476
assertTrue(pauseNanos.isPresent());
7577
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS);
7678
}
7779

7880
@Test
7981
public void itThrottledTimeoutFastFail() {
80-
OptionalLong pauseNanos = HBaseServerExceptionPauseManager.getPauseNsFromException(
81-
RPC_THROTTLING_EXCEPTION, PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0L);
82+
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L);
8283
assertFalse(pauseNanos.isPresent());
8384
}
8485

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
import org.apache.hadoop.hbase.TableExistsException;
4040
import org.apache.hadoop.hbase.TableName;
4141
import org.apache.hadoop.hbase.TableNotFoundException;
42-
import org.apache.hadoop.hbase.master.LoadBalancer;
4342
import org.apache.hadoop.hbase.master.HMaster;
43+
import org.apache.hadoop.hbase.master.LoadBalancer;
4444
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
4545
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
4646
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;

0 commit comments

Comments
 (0)