Skip to content

Commit 13728dc

Browse files
author
Briana Augenreich
committed
Report client backoff time for server overloaded
1 parent 7e140e5 commit 13728dc

8 files changed

Lines changed: 68 additions & 14 deletions

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,10 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
487487
} else {
488488
delayNs = getPauseTime(pauseNsToUse, tries - 1);
489489
}
490+
if (isServerOverloaded) {
491+
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
492+
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
493+
}
490494
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
491495
}
492496

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,12 @@ private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttemp
747747
backOffTime = errorsByServer.calculateBackoffTime(oldServer,
748748
asyncProcess.connectionConfiguration.getPauseMillis());
749749
}
750+
751+
MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
752+
if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
753+
metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
754+
}
755+
750756
if (numAttempt > asyncProcess.startLogErrorsCnt) {
751757
// We use this value to have some logs when we have multiple failures, but not too many
752758
// logs, as errors are to be expected when a region moves, splits and so on

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ private void tryScheduleRetry(Throwable error) {
139139
delayNs = getPauseTime(pauseNsToUse, tries - 1);
140140
}
141141
tries++;
142+
143+
if (HBaseServerException.isServerOverloaded(error)) {
144+
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
145+
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
146+
}
142147
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
143148
}
144149

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
113113

114114
private final Runnable completeWhenNoMoreResultsInRegion;
115115

116+
protected final AsyncConnectionImpl conn;
117+
116118
private final CompletableFuture<Boolean> future;
117119

118120
private final HBaseRpcController controller;
@@ -318,6 +320,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
318320
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
319321
int startLogErrorsCnt) {
320322
this.retryTimer = retryTimer;
323+
this.conn = conn;
321324
this.scan = scan;
322325
this.scanMetrics = scanMetrics;
323326
this.scannerId = scannerId;
@@ -441,6 +444,10 @@ private void onError(Throwable error) {
441444
return;
442445
}
443446
tries++;
447+
if (HBaseServerException.isServerOverloaded(error)) {
448+
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
449+
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
450+
}
444451
retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
445452
}
446453

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
299299

300300
this.stats = ServerStatisticTracker.create(conf);
301301
this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
302-
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
303-
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
302+
304303
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
305-
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
306304

307305
boolean shouldListen =
308306
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -330,6 +328,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
330328
this.metaCache = new MetaCache(this.metrics);
331329

332330
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
331+
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
332+
this.rpcCallerFactory =
333+
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
334+
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
333335

334336
// Do we publish the status?
335337
if (shouldListen) {
@@ -1056,6 +1058,11 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou
10561058
// Only relocate the parent region if necessary
10571059
relocateMeta =
10581060
!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
1061+
1062+
if (metrics != null && HBaseServerException.isServerOverloaded(e)) {
1063+
metrics.incrementServerOverloadedBackoffTime(
1064+
ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS);
1065+
}
10591066
} finally {
10601067
userRegionLock.unlock();
10611068
}
@@ -2183,8 +2190,8 @@ public TableState getTableState(TableName tableName) throws IOException {
21832190

21842191
@Override
21852192
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2186-
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor,
2187-
this.getStatisticsTracker());
2193+
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
2194+
metrics);
21882195
}
21892196

21902197
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
315315
protected final Histogram numActionsPerServerHist;
316316
protected final Counter nsLookups;
317317
protected final Counter nsLookupsFailed;
318+
protected final Timer overloadedBackoffTimer;
318319

319320
// dynamic metrics
320321

@@ -377,6 +378,9 @@ protected Ratio getRatio() {
377378
this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
378379
this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
379380

381+
this.overloadedBackoffTimer =
382+
registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));
383+
380384
this.reporter = JmxReporter.forRegistry(this.registry).build();
381385
this.reporter.start();
382386
}
@@ -449,6 +453,10 @@ public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
449453
this.runnerStats.updateDelayInterval(interval);
450454
}
451455

456+
public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
457+
overloadedBackoffTimer.update(time, timeUnit);
458+
}
459+
452460
/**
453461
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
454462
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,24 @@ public class RpcRetryingCallerFactory {
3333
private final ConnectionConfiguration connectionConf;
3434
private final RetryingCallerInterceptor interceptor;
3535
private final int startLogErrorsCnt;
36+
private final MetricsConnection metrics;
3637

3738
public RpcRetryingCallerFactory(Configuration conf) {
38-
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
39+
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
3940
}
4041

4142
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
43+
this(conf, interceptor, null);
44+
}
45+
46+
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
47+
MetricsConnection metrics) {
4248
this.conf = conf;
4349
this.connectionConf = new ConnectionConfiguration(conf);
4450
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
4551
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
4652
this.interceptor = interceptor;
53+
this.metrics = metrics;
4754
}
4855

4956
/**
@@ -54,7 +61,7 @@ public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
5461
// is cheap as it does not require parsing a complex structure.
5562
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
5663
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
57-
interceptor, startLogErrorsCnt, rpcTimeout);
64+
interceptor, startLogErrorsCnt, rpcTimeout, metrics);
5865
}
5966

6067
/**
@@ -65,26 +72,29 @@ public <T> RpcRetryingCaller<T> newCaller() {
6572
// is cheap as it does not require parsing a complex structure.
6673
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
6774
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
68-
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout());
75+
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
6976
}
7077

7178
public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
72-
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
79+
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
80+
null);
7381
}
7482

7583
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
7684
ServerStatisticTracker stats) {
77-
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
85+
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
86+
null);
7887
}
7988

8089
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
81-
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
90+
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
91+
MetricsConnection metrics) {
8292
String clazzName = RpcRetryingCallerFactory.class.getName();
8393
String rpcCallerFactoryClazz =
8494
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
8595
RpcRetryingCallerFactory factory;
8696
if (rpcCallerFactoryClazz.equals(clazzName)) {
87-
factory = new RpcRetryingCallerFactory(configuration, interceptor);
97+
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
8898
} else {
8999
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
90100
new Class[] { Configuration.class }, new Object[] { configuration });

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.Instant;
2727
import java.util.ArrayList;
2828
import java.util.List;
29+
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.atomic.AtomicBoolean;
3031
import org.apache.hadoop.hbase.DoNotRetryIOException;
3132
import org.apache.hadoop.hbase.HBaseServerException;
@@ -63,15 +64,17 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
6364
private final RetryingCallerInterceptor interceptor;
6465
private final RetryingCallerInterceptorContext context;
6566
private final RetryingTimeTracker tracker;
67+
private final MetricsConnection metrics;
6668

6769
public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
6870
int startLogErrorsCnt) {
6971
this(pause, pauseForServerOverloaded, retries,
70-
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
72+
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0, null);
7173
}
7274

7375
public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
74-
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
76+
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
77+
MetricsConnection metricsConnection) {
7578
this.pause = pause;
7679
this.pauseForServerOverloaded = pauseForServerOverloaded;
7780
this.maxAttempts = retries2Attempts(retries);
@@ -80,6 +83,7 @@ public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retr
8083
this.startLogErrorsCnt = startLogErrorsCnt;
8184
this.tracker = new RetryingTimeTracker();
8285
this.rpcTimeout = rpcTimeout;
86+
this.metrics = metricsConnection;
8387
}
8488

8589
@Override
@@ -158,6 +162,9 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
158162
+ t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
159163
throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
160164
}
165+
if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
166+
metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
167+
}
161168
} finally {
162169
interceptor.updateFailureInfo(context);
163170
}

0 commit comments

Comments
 (0)