@@ -487,6 +487,10 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
if (isServerOverloaded) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
}

@@ -747,6 +747,12 @@ private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttemp
backOffTime = errorsByServer.calculateBackoffTime(oldServer,
asyncProcess.connectionConfiguration.getPauseMillis());
}

MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
}

if (numAttempt > asyncProcess.startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
@@ -139,6 +139,11 @@ private void tryScheduleRetry(Throwable error) {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
tries++;

if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
}

@@ -113,6 +113,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {

private final Runnable completeWhenNoMoreResultsInRegion;

protected final AsyncConnectionImpl conn;

private final CompletableFuture<Boolean> future;

private final HBaseRpcController controller;
@@ -318,6 +320,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan;
this.scanMetrics = scanMetrics;
this.scannerId = scannerId;
@@ -441,6 +444,10 @@ private void onError(Throwable error) {
return;
}
tries++;
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
}

@@ -299,10 +299,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {

this.stats = ServerStatisticTracker.create(conf);
this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);

this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

boolean shouldListen =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -330,6 +328,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.metaCache = new MetaCache(this.metrics);

this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory =
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

// Do we publish the status?
if (shouldListen) {
@@ -1056,6 +1058,11 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou
// Only relocate the parent region if necessary
relocateMeta =
!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);

if (metrics != null && HBaseServerException.isServerOverloaded(e)) {
metrics.incrementServerOverloadedBackoffTime(
ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS);
}
} finally {
userRegionLock.unlock();
}
@@ -2183,8 +2190,8 @@ public TableState getTableState(TableName tableName) throws IOException {

@Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor,
this.getStatisticsTracker());
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
metrics);
}

@Override
@@ -315,6 +315,7 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;
protected final Timer overloadedBackoffTimer;

// dynamic metrics

@@ -377,6 +378,9 @@ protected Ratio getRatio() {
this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

this.overloadedBackoffTimer =
registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start();
}
@@ -449,6 +453,10 @@ public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
this.runnerStats.updateDelayInterval(interval);
}

public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}

/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
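For context on the metric itself: `overloadedBackoffTimer` is a Dropwizard metrics Timer registered on the connection's MetricRegistry and published through the existing JmxReporter shown above. Below is a minimal standalone sketch of how such a Timer accumulates backoff durations; the registry setup, metric name reuse, and printed stats are simplified assumptions for illustration, not the HBase wiring.

```java
// Standalone illustration of the Dropwizard Timer pattern used above; the registry
// lifecycle and reporting are deliberately simplified assumptions for this sketch.
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class OverloadedBackoffTimerSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Timer backoffTimer = registry.timer("overloadedBackoffDurationMs");

    // Equivalent of incrementServerOverloadedBackoffTime(delay, unit):
    backoffTimer.update(250, TimeUnit.MILLISECONDS);          // sync paths report millis
    backoffTimer.update(500_000_000L, TimeUnit.NANOSECONDS);  // async paths report nanos

    // The Timer tracks both the event count and the latency distribution (in nanoseconds).
    System.out.println("backoff events: " + backoffTimer.getCount());
    System.out.println("mean backoff (ns): " + backoffTimer.getSnapshot().getMean());
  }
}
```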
@@ -33,17 +33,20 @@ public class RpcRetryingCallerFactory {
private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
private final MetricsConnection metrics;

public RpcRetryingCallerFactory(Configuration conf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
}

public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
MetricsConnection metrics) {
this.conf = conf;
this.connectionConf = new ConnectionConfiguration(conf);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor;
this.metrics = metrics;
}

/**
@@ -54,7 +57,7 @@ public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
// is cheap as it does not require parsing a complex structure.
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, rpcTimeout);
interceptor, startLogErrorsCnt, rpcTimeout, metrics);
}

/**
@@ -65,26 +68,29 @@ public <T> RpcRetryingCaller<T> newCaller() {
// is cheap as it does not require parsing a complex structure.
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout());
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
[Review comment (Contributor) — an illustrative sketch follows this file's diff]
Ok, sorry for the delay here but I've been meaning to look into the usages of these methods. We basically want to ensure that our metrics object is getting in there as often as possible.

So I opened up branch-2 in intellij and looked at the usages of these instantiate overloads:

  • `instantiate(Configuration)` -- all of the callers to this method have a ClusterConnection in the call context, which we can pull our metrics from (`connection.getConnectionMetrics()`)
  • `instantiate(Configuration, ServerStatisticTracker)` -- all but the SecureBulkLoadClient and LoadIncrementalHFiles have similar.

So I think we should try to add ConnectionMetrics to all of the methods here, and we can pass null in for the 2 cases above.

return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
null);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ServerStatisticTracker stats) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats);
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
null);
}

public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) {
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) {
factory = new RpcRetryingCallerFactory(configuration, interceptor);
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
} else {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration });
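Following up on the review comment above: a minimal, hypothetical sketch of how a call site that already has a ClusterConnection in scope could thread its MetricsConnection into these instantiate overloads, while metrics-less callers (e.g. SecureBulkLoadClient, LoadIncrementalHFiles) pass null. The wrapper class and method names below are illustrative and not part of this change; the accessors used are the ones that appear elsewhere in this diff.

```java
// Illustrative only: wiring the connection's metrics into the factory, per the review comment.
// CallerFactoryWiring and its methods are hypothetical helpers, not part of the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.RetryingCallerInterceptorFactory;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;

final class CallerFactoryWiring {

  // Caller has a ClusterConnection: pull metrics from it (may be null if client metrics are disabled).
  static RpcRetryingCallerFactory buildCaller(Configuration conf, ClusterConnection conn) {
    MetricsConnection metrics = conn.getConnectionMetrics();
    return RpcRetryingCallerFactory.instantiate(conf,
      RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, conn.getStatisticsTracker(), metrics);
  }

  // Caller has no connection in scope (e.g. bulk-load tooling): nothing to report metrics to.
  static RpcRetryingCallerFactory buildCallerWithoutMetrics(Configuration conf) {
    return RpcRetryingCallerFactory.instantiate(conf,
      RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null);
  }
}
```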
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
@@ -63,15 +64,11 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context;
private final RetryingTimeTracker tracker;
private final MetricsConnection metrics;

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
int startLogErrorsCnt) {
this(pause, pauseForServerOverloaded, retries,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
}

public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
MetricsConnection metricsConnection) {
this.pause = pause;
this.pauseForServerOverloaded = pauseForServerOverloaded;
this.maxAttempts = retries2Attempts(retries);
@@ -80,6 +77,7 @@ public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retr
this.startLogErrorsCnt = startLogErrorsCnt;
this.tracker = new RetryingTimeTracker();
this.rpcTimeout = rpcTimeout;
this.metrics = metricsConnection;
}

@Override
@@ -158,6 +156,9 @@ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+ t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
}
if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
}
} finally {
interceptor.updateFailureInfo(context);
}
@@ -246,7 +246,8 @@ public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
}
});

return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) throws IOException, RuntimeException {
@@ -307,7 +308,7 @@ static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse> {
private final IOException e;

public CallerWithFailure(IOException e) {
super(100, 500, 100, 9);
super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
this.e = e;
}

@@ -412,7 +413,8 @@ public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
replicaCalls.incrementAndGet();
}

return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) throws IOException, RuntimeException {
@@ -222,7 +222,8 @@ public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
});
});
mr.addException(REGION_INFO.getRegionName(), IOE);
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) {
@@ -58,8 +58,8 @@ private void itUsesSpecialPauseForServerOverloaded(
long pauseMillis = 1;
long specialPauseMillis = 2;

RpcRetryingCallerImpl<Void> caller =
new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0);
RpcRetryingCallerImpl<Void> caller = new RpcRetryingCallerImpl<>(pauseMillis,
specialPauseMillis, 2, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 0, 0, null);

RetryingCallable<Void> callable =
new ThrowingCallable(CallQueueTooBigException.class, specialPauseMillis);
@@ -126,7 +126,7 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati
RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf));
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory
.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
Table t = Mockito.mock(Table.class);
Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);