From 140878e8733aef45a8281b4bb2bacb36fd13bc21 Mon Sep 17 00:00:00 2001 From: Briana Augenreich Date: Tue, 16 Aug 2022 16:08:24 -0400 Subject: [PATCH 1/6] HBASE-26809 Report client backoff time for server overloaded in ConnectionMetrics --- .../client/AsyncBatchRpcRetryingCaller.java | 4 ++++ .../hbase/client/AsyncRequestFutureImpl.java | 6 +++++ .../hbase/client/AsyncRpcRetryingCaller.java | 5 ++++ ...syncScanSingleRegionRpcRetryingCaller.java | 7 ++++++ .../client/ConnectionImplementation.java | 17 +++++++++---- .../hbase/client/MetricsConnection.java | 8 +++++++ .../client/RpcRetryingCallerFactory.java | 24 +++++++++++++------ .../hbase/client/RpcRetryingCallerImpl.java | 11 +++++++-- 8 files changed, 68 insertions(+), 14 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 0798915c08de..7bee885586bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -487,6 +487,10 @@ private void tryResubmit(Stream actions, int tries, boolean immediately, } else { delayNs = getPauseTime(pauseNsToUse, tries - 1); } + if (isServerOverloaded) { + Optional metrics = conn.getConnectionMetrics(); + metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); + } retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 987d86bd8aae..a91fd5af6af2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -747,6 +747,12 @@ private void resubmit(ServerName oldServer, List toReplay, int numAttemp backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.connectionConfiguration.getPauseMillis()); } + + MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics(); + if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) { + metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS); + } + if (numAttempt > asyncProcess.startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 95ed97e18115..d0bbe4b5fa3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -139,6 +139,11 @@ private void tryScheduleRetry(Throwable error) { delayNs = getPauseTime(pauseNsToUse, tries - 1); } tries++; + + if (HBaseServerException.isServerOverloaded(error)) { + Optional metrics = conn.getConnectionMetrics(); + metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); + } retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dbaae5c26e2e..2653b3c75b3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -113,6 +113,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final Runnable completeWhenNoMoreResultsInRegion; + protected final AsyncConnectionImpl conn; + private final CompletableFuture future; private final HBaseRpcController controller; @@ -318,6 +320,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; + this.conn = conn; this.scan = scan; this.scanMetrics = scanMetrics; this.scannerId = scannerId; @@ -441,6 +444,10 @@ private void onError(Throwable error) { return; } tries++; + if (HBaseServerException.isServerOverloaded(error)) { + Optional metrics = conn.getConnectionMetrics(); + metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); + } retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 4c828036fd03..7460d5bd7616 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -299,10 +299,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.stats = ServerStatisticTracker.create(conf); this.interceptor = new RetryingCallerInterceptorFactory(conf).build(); - this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); @@ -330,6 +328,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.metaCache = new MetaCache(this.metrics); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.rpcCallerFactory = + RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics); + this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); // Do we publish the status? if (shouldListen) { @@ -1056,6 +1058,11 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou // Only relocate the parent region if necessary relocateMeta = !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException); + + if (metrics != null && HBaseServerException.isServerOverloaded(e)) { + metrics.incrementServerOverloadedBackoffTime( + ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS); + } } finally { userRegionLock.unlock(); } @@ -2183,8 +2190,8 @@ public TableState getTableState(TableName tableName) throws IOException { @Override public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, - this.getStatisticsTracker()); + return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(), + metrics); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index dc452bcd9d9a..2ad2944cdddd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -315,6 +315,7 @@ public Counter newMetric(Class clazz, String name, String scope) { protected final Histogram numActionsPerServerHist; protected final Counter nsLookups; protected final Counter nsLookupsFailed; + protected final Timer overloadedBackoffTimer; // dynamic metrics @@ -377,6 +378,9 @@ protected Ratio getRatio() { this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope)); this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope)); + this.overloadedBackoffTimer = + registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope)); + this.reporter = JmxReporter.forRegistry(this.registry).build(); this.reporter.start(); } @@ -449,6 +453,10 @@ public void incrDelayRunnersAndUpdateDelayInterval(long interval) { this.runnerStats.updateDelayInterval(interval); } + public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) { + overloadedBackoffTimer.update(time, timeUnit); + } + /** * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index a19913782285..ae17699af81d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -33,17 +33,24 @@ public class RpcRetryingCallerFactory { private final ConnectionConfiguration connectionConf; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + private final MetricsConnection metrics; public RpcRetryingCallerFactory(Configuration conf) { - this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { + this(conf, interceptor, null); + } + + public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor, + MetricsConnection metrics) { this.conf = conf; this.connectionConf = new ConnectionConfiguration(conf); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; + this.metrics = metrics; } /** @@ -54,7 +61,7 @@ public RpcRetryingCaller newCaller(int rpcTimeout) { // is cheap as it does not require parsing a complex structure. return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), - interceptor, startLogErrorsCnt, rpcTimeout); + interceptor, startLogErrorsCnt, rpcTimeout, metrics); } /** @@ -65,26 +72,29 @@ public RpcRetryingCaller newCaller() { // is cheap as it does not require parsing a complex structure. return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), - interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout()); + interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics); } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, + null); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, ServerStatisticTracker stats) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, + null); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, - RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { + RetryingCallerInterceptor interceptor, ServerStatisticTracker stats, + MetricsConnection metrics) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); RpcRetryingCallerFactory factory; if (rpcCallerFactoryClazz.equals(clazzName)) { - factory = new RpcRetryingCallerFactory(configuration, interceptor); + factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics); } else { factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 67b12b8a34b1..6c881033af27 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -26,6 +26,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseServerException; @@ -63,15 +64,17 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; private final RetryingTimeTracker tracker; + private final MetricsConnection metrics; public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, int startLogErrorsCnt) { this(pause, pauseForServerOverloaded, retries, - RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0, null); } public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, - RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { + RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout, + MetricsConnection metricsConnection) { this.pause = pause; this.pauseForServerOverloaded = pauseForServerOverloaded; this.maxAttempts = retries2Attempts(retries); @@ -80,6 +83,7 @@ public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retr this.startLogErrorsCnt = startLogErrorsCnt; this.tracker = new RetryingTimeTracker(); this.rpcTimeout = rpcTimeout; + this.metrics = metricsConnection; } @Override @@ -158,6 +162,9 @@ public T callWithRetries(RetryingCallable callable, int callTimeout) + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t); } + if (metrics != null && HBaseServerException.isServerOverloaded(t)) { + metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS); + } } finally { interceptor.updateFailureInfo(context); } From 30aa833f51cdb35b179fa045776efe3b2f2cca4f Mon Sep 17 00:00:00 2001 From: Briana Augenreich Date: Thu, 15 Sep 2022 09:50:25 -0400 Subject: [PATCH 2/6] Fix test in hbase-server --- .../apache/hadoop/hbase/client/HConnectionTestingUtility.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index bdbef295f318..1aefb5e13206 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -126,7 +126,7 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory - .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); + .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, asyncProcess.connection.getConnectionMetrics())); Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class)); Table t = Mockito.mock(Table.class); Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t); From 45ad7457e6e5ba7631283600c0a31f46929d5b80 Mon Sep 17 00:00:00 2001 From: Briana Augenreich Date: Thu, 15 Sep 2022 10:46:54 -0400 Subject: [PATCH 3/6] Remmediate failing test --- .../apache/hadoop/hbase/client/HConnectionTestingUtility.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 1aefb5e13206..0a49d94068ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -126,7 +126,7 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory - .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, asyncProcess.connection.getConnectionMetrics())); + .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null)); Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class)); Table t = Mockito.mock(Table.class); Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t); From 5b1835e224e6694c05e7ad98b1e98ea4e15664e9 Mon Sep 17 00:00:00 2001 From: Briana Augenreich Date: Thu, 29 Sep 2022 20:25:37 -0400 Subject: [PATCH 4/6] remove RpcRetryingCallerImpl overloaded constructor --- .../hadoop/hbase/client/RpcRetryingCallerFactory.java | 4 ---- .../apache/hadoop/hbase/client/RpcRetryingCallerImpl.java | 6 ------ .../org/apache/hadoop/hbase/client/TestAsyncProcess.java | 8 +++++--- .../hbase/client/TestAsyncProcessWithRegionException.java | 3 ++- .../hadoop/hbase/client/TestRpcRetryingCallerImpl.java | 4 ++-- 5 files changed, 9 insertions(+), 16 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index ae17699af81d..f5b743ac24a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -39,10 +39,6 @@ public RpcRetryingCallerFactory(Configuration conf) { this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } - public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { - this(conf, interceptor, null); - } - public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor, MetricsConnection metrics) { this.conf = conf; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 6c881033af27..4d88e34ff656 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -66,12 +66,6 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final RetryingTimeTracker tracker; private final MetricsConnection metrics; - public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, - int startLogErrorsCnt) { - this(pause, pauseForServerOverloaded, retries, - RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0, null); - } - public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout, MetricsConnection metricsConnection) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index de6d6012322e..a55001e16279 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -246,7 +246,8 @@ public void addResponse(MultiResponse mr, byte[] regionName, Action a) { } }); - return new RpcRetryingCallerImpl(100, 500, 10, 9) { + return new RpcRetryingCallerImpl(100, 500, 10, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { @Override public AbstractResponse callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { @@ -307,7 +308,7 @@ static class CallerWithFailure extends RpcRetryingCallerImpl { private final IOException e; public CallerWithFailure(IOException e) { - super(100, 500, 100, 9); + super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null); this.e = e; } @@ -412,7 +413,8 @@ public void addResponse(MultiResponse mr, byte[] regionName, Action a) { replicaCalls.incrementAndGet(); } - return new RpcRetryingCallerImpl(100, 500, 10, 9) { + return new RpcRetryingCallerImpl(100, 500, 10, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java index ca9aabfef042..98c137612620 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java @@ -222,7 +222,8 @@ public AsyncRequestFuture submit(TableName tableName, List rows) }); }); mr.addException(REGION_INFO.getRegionName(), IOE); - return new RpcRetryingCallerImpl(100, 500, 0, 9) { + return new RpcRetryingCallerImpl(100, 500, 0, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { @Override public AbstractResponse callWithoutRetries(RetryingCallable callable, int callTimeout) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java index 3d3d64f4c212..a01771900351 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java @@ -58,8 +58,8 @@ private void itUsesSpecialPauseForServerOverloaded( long pauseMillis = 1; long specialPauseMillis = 2; - RpcRetryingCallerImpl caller = - new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0); + RpcRetryingCallerImpl caller = new RpcRetryingCallerImpl<>(pauseMillis, + specialPauseMillis, 2, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 0, 0, null); RetryingCallable callable = new ThrowingCallable(CallQueueTooBigException.class, specialPauseMillis); From d362c482b32138528d5e03182e18619745b444a8 Mon Sep 17 00:00:00 2001 From: Briana Augenreich Date: Fri, 14 Oct 2022 16:05:35 -0400 Subject: [PATCH 5/6] add metricsConnection to rpcretryingcallerfactory --- .../java/org/apache/hadoop/hbase/client/HTable.java | 5 +++-- .../apache/hadoop/hbase/client/HTableMultiplexer.java | 3 ++- .../hadoop/hbase/client/RpcRetryingCallerFactory.java | 9 +++++---- .../hbase/client/ScannerCallableWithReplicas.java | 11 ++++++----- .../hadoop/hbase/client/SecureBulkLoadClient.java | 4 ++-- .../hadoop/hbase/regionserver/HRegionServer.java | 3 ++- .../RegionReplicaReplicationEndpoint.java | 4 ++-- .../hadoop/hbase/tool/LoadIncrementalHFiles.java | 2 +- .../hbase/client/HConnectionTestingUtility.java | 3 ++- .../TestRegionReplicaReplicationEndpointNoMaster.java | 4 ++-- 10 files changed, 27 insertions(+), 21 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f36c4af0de07..80325abd7f95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1305,8 +1305,9 @@ public void batchCoprocessorService( final List callbackErrorServers = new ArrayList<>(); Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, - RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + AsyncProcess asyncProcess = new AsyncProcess( + connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, + connection.getStatisticsTracker(), connection.getConnectionMetrics()), RpcControllerFactory.instantiate(configuration)); Batch.Callback resultsCallback = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index ddfe7ca439ae..a9835cb0b426 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -422,7 +422,8 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a this.addr = addr; this.multiplexer = htableMultiplexer; this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); - RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory rpcCallerFactory = + RpcRetryingCallerFactory.instantiate(conf, conn.getConnectionMetrics()); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index f5b743ac24a8..3e8545f6a388 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -71,15 +71,16 @@ public RpcRetryingCaller newCaller() { interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics); } - public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + MetricsConnection metrics) { return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, - null); + metrics); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, - ServerStatisticTracker stats) { + ServerStatisticTracker stats, MetricsConnection metrics) { return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, - null); + metrics); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index fe155136cb22..46977447ed64 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -178,9 +178,9 @@ public Result[] call(int timeout) throws IOException { // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = - new ResultBoundedCompletionService<>( - RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, - regionReplication * 5); + new ResultBoundedCompletionService<>(RpcRetryingCallerFactory + .instantiate(ScannerCallableWithReplicas.this.conf, cConnection.getConnectionMetrics()), + pool, regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); replicaSwitched.set(false); @@ -381,8 +381,9 @@ class RetryingRPC implements RetryingCallable>, // and we can't invoke it multiple times at the same time) this.caller = ScannerCallableWithReplicas.this.caller; if (scan.getConsistency() == Consistency.TIMELINE) { - this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf).< - Result[]> newCaller(); + this.caller = RpcRetryingCallerFactory + .instantiate(ScannerCallableWithReplicas.this.conf, cConnection.getConnectionMetrics()) + . newCaller(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index aeca91e5bc9d..5ffbb5d9e453 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -69,7 +69,7 @@ protected String rpcCall() throws Exception { return response.getBulkToken(); } }; - return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) + return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null) . newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { throw new IOException(throwable); @@ -91,7 +91,7 @@ protected Void rpcCall() throws Exception { return null; } }; - RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null). newCaller() + RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null). newCaller() .callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { throw new IOException(throwable); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6980003b6c13..435c3018baee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -645,7 +645,8 @@ public HRegionServer(final Configuration conf) throws IOException { serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); - rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + rpcRetryingCallerFactory = + RpcRetryingCallerFactory.instantiate(this.conf, clusterConnection.getConnectionMetrics()); // login the zookeeper client principal (if using security) ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 21a2d51326ab..cf0f69372d3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -390,8 +390,8 @@ public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection c this.sink = sink; this.connection = connection; this.operationTimeout = operationTimeout; - this.rpcRetryingCallerFactory = - RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); + this.rpcRetryingCallerFactory = RpcRetryingCallerFactory + .instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); this.pool = pool; this.tableDescriptors = tableDescriptors; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 5e3f2e9468f4..54adfd22a36b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -871,7 +871,7 @@ protected List tryAtomicRegionLoad(ClientServiceCallable List toRetry = new ArrayList<>(); try { Configuration conf = getConf(); - byte[] region = RpcRetryingCallerFactory.instantiate(conf, null). newCaller() + byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null). newCaller() .callWithRetries(serviceCallable, Integer.MAX_VALUE); if (region == null) { LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 0a49d94068ea..06050af37b5b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -123,7 +123,8 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); AsyncProcess asyncProcess = new AsyncProcess(c, conf, - RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); + RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()), + RpcControllerFactory.instantiate(conf)); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index da95c6fa7d8b..7d588712c33b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -209,8 +209,8 @@ private void replicateUsingCallable(ClusterConnection connection, Queue e locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), new AtomicLong()); - RpcRetryingCallerFactory factory = - RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory + .instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); factory. newCaller().callWithRetries(callable, 10000); } } From 76c763d18007d528b964eca2795f7aff4a018510 Mon Sep 17 00:00:00 2001 From: Briana Augenreich Date: Mon, 17 Oct 2022 11:10:34 -0400 Subject: [PATCH 6/6] Add null check for connection metrics --- .../hadoop/hbase/client/HTableMultiplexer.java | 4 ++-- .../hbase/client/ScannerCallableWithReplicas.java | 13 ++++++++----- .../hadoop/hbase/regionserver/HRegionServer.java | 4 ++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index a9835cb0b426..bbbc27980485 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -422,8 +422,8 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a this.addr = addr; this.multiplexer = htableMultiplexer; this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); - RpcRetryingCallerFactory rpcCallerFactory = - RpcRetryingCallerFactory.instantiate(conf, conn.getConnectionMetrics()); + RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, + conn == null ? null : conn.getConnectionMetrics()); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 46977447ed64..27cc4d151264 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -178,8 +178,9 @@ public Result[] call(int timeout) throws IOException { // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = - new ResultBoundedCompletionService<>(RpcRetryingCallerFactory - .instantiate(ScannerCallableWithReplicas.this.conf, cConnection.getConnectionMetrics()), + new ResultBoundedCompletionService<>( + RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf, + cConnection == null ? null : cConnection.getConnectionMetrics()), pool, regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); @@ -381,9 +382,11 @@ class RetryingRPC implements RetryingCallable>, // and we can't invoke it multiple times at the same time) this.caller = ScannerCallableWithReplicas.this.caller; if (scan.getConsistency() == Consistency.TIMELINE) { - this.caller = RpcRetryingCallerFactory - .instantiate(ScannerCallableWithReplicas.this.conf, cConnection.getConnectionMetrics()) - . newCaller(); + this.caller = + RpcRetryingCallerFactory + .instantiate(ScannerCallableWithReplicas.this.conf, + cConnection == null ? null : cConnection.getConnectionMetrics()) + . newCaller(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 435c3018baee..f6522adaabe0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -645,8 +645,8 @@ public HRegionServer(final Configuration conf) throws IOException { serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); - rpcRetryingCallerFactory = - RpcRetryingCallerFactory.instantiate(this.conf, clusterConnection.getConnectionMetrics()); + rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf, + clusterConnection == null ? null : clusterConnection.getConnectionMetrics()); // login the zookeeper client principal (if using security) ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,