diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7bee885586bc..f8363b87e70f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; @@ -102,10 +104,6 @@ class AsyncBatchRpcRetryingCaller { private final IdentityHashMap> action2Errors; - private final long pauseNs; - - private final long pauseNsForServerOverloaded; - private final int maxAttempts; private final long operationTimeoutNs; @@ -116,6 +114,8 @@ class AsyncBatchRpcRetryingCaller { private final long startNs; + private final HBaseServerExceptionPauseManager pauseManager; + // we can not use HRegionLocation as the map key because the hashCode and equals method of // HRegionLocation only consider serverName. private static final class RegionRequest { @@ -155,8 +155,6 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; - this.pauseNs = pauseNs; - this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -164,6 +162,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, this.actions = new ArrayList<>(actions.size()); this.futures = new ArrayList<>(actions.size()); this.action2Future = new IdentityHashMap<>(actions.size()); + this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded); for (int i = 0, n = actions.size(); i < n; i++) { Row rawAction = actions.get(i); Action action; @@ -360,7 +359,7 @@ private void onComplete(Map actionsByRegion, int tries, } }); if (!failedActions.isEmpty()) { - tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false); + tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null); } } @@ -465,18 +464,25 @@ private void onError(Map actionsByRegion, int tries, Thro List copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) .collect(Collectors.toList()); addError(copiedActions, error, serverName); - tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, - HBaseServerException.isServerOverloaded(error)); + tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error); } private void tryResubmit(Stream actions, int tries, boolean immediately, - boolean isServerOverloaded) { + Throwable error) { if (immediately) { groupAndSend(actions, tries); return; } long delayNs; - long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs; + boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error); + OptionalLong maybePauseNsToUse = + pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS); + if (!maybePauseNsToUse.isPresent()) { + failAll(actions, tries); + return; + } + long pauseNsToUse = maybePauseNsToUse.getAsLong(); + if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { @@ -528,7 +534,7 @@ private void groupAndSend(Stream actions, int tries) { sendOrDelay(actionsByServer, tries); } if (!locateFailed.isEmpty()) { - tryResubmit(locateFailed.stream(), tries, false, false); + tryResubmit(locateFailed.stream(), tries, false, null); } }); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index d0bbe4b5fa3b..7ded355cfd9e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -56,10 +58,6 @@ public abstract class AsyncRpcRetryingCaller { private final long startNs; - private final long pauseNs; - - private final long pauseNsForServerOverloaded; - private int tries = 1; private final int maxAttempts; @@ -78,14 +76,14 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; + private final HBaseServerExceptionPauseManager pauseManager; + public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; - this.pauseNs = pauseNs; - this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -95,6 +93,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); + this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded); } private long elapsedMs() { @@ -125,8 +124,14 @@ protected final void resetCallTimeout() { } private void tryScheduleRetry(Throwable error) { - long pauseNsToUse = - HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs; + OptionalLong maybePauseNsToUse = + pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS); + if (!maybePauseNsToUse.isPresent()) { + completeExceptionally(); + return; + } + long pauseNsToUse = maybePauseNsToUse.getAsLong(); + long delayNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 2653b3c75b3e..949ea07107b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; +import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; @@ -99,10 +101,6 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final long scannerLeaseTimeoutPeriodNs; - private final long pauseNs; - - private final long pauseNsForServerOverloaded; - private final int maxAttempts; private final long scanTimeoutNs; @@ -131,6 +129,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private long nextCallSeq = -1L; + private final HBaseServerExceptionPauseManager pauseManager; + private enum ScanControllerState { INITIALIZED, SUSPENDED, @@ -330,8 +330,6 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.loc = loc; this.regionServerRemote = isRegionServerRemote; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; - this.pauseNs = pauseNs; - this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -346,6 +344,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.controller = conn.rpcControllerFactory.newController(); this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); + this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded); } private long elapsedMs() { @@ -419,8 +418,15 @@ private void onError(Throwable error) { return; } long delayNs; - long pauseNsToUse = - HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs; + + OptionalLong maybePauseNsToUse = + pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS); + if (!maybePauseNsToUse.isPresent()) { + completeExceptionally(!scannerClosed); + return; + } + long pauseNsToUse = maybePauseNsToUse.getAsLong(); + if (scanTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 4d88e34ff656..c692a2757d6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; @@ -140,14 +141,28 @@ public T callWithRetries(RetryingCallable callable, int callTimeout) if (tries >= maxAttempts - 1) { throw new RetriesExhaustedException(tries, exceptions); } - // If the server is dead, we need to wait a little before retrying, to give - // a chance to the regions to be moved - // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be - // special when encountering an exception indicating the server is overloaded. - // see #HBASE-17114 and HBASE-26807 - long pauseBase = - HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause; - expectedSleep = callable.sleep(pauseBase, tries); + + if (t instanceof RpcThrottlingException) { + RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) t; + expectedSleep = rpcThrottlingException.getWaitInterval(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleep, + rpcThrottlingException); + } + } else { + expectedSleep = + HBaseServerException.isServerOverloaded(t) ? pauseForServerOverloaded : pause; + + // only factor in retry adjustment for non-RpcThrottlingExceptions + // because RpcThrottlingExceptions tell you how long to wait + + // If the server is dead, we need to wait a little before retrying, to give + // a chance to the regions to be moved + // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be + // special when encountering an exception indicating the server is overloaded. + // see #HBASE-17114 and HBASE-26807 + expectedSleep = callable.sleep(expectedSleep, tries); + } // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration(expectedSleep); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java new file mode 100644 index 000000000000..235d1d1d20e4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import java.util.OptionalLong; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseServerException; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class HBaseServerExceptionPauseManager { + private static final Logger LOG = LoggerFactory.getLogger(HBaseServerExceptionPauseManager.class); + + private final long pauseNs; + private final long pauseNsForServerOverloaded; + + public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) { + this.pauseNs = pauseNs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; + } + + public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) { + long expectedSleepNs; + if (error instanceof RpcThrottlingException) { + RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error; + expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval()); + if (expectedSleepNs > remainingTimeNs) { + return OptionalLong.empty(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs, + rpcThrottlingException); + } + } else { + expectedSleepNs = + HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs; + } + return OptionalLong.of(expectedSleepNs); + } + +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java new file mode 100644 index 000000000000..793fa9f6d218 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.OptionalLong; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseServerException; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestHBaseServerExceptionPauseManager { + + private static final long WAIT_INTERVAL_MILLIS = 1L; + private static final long WAIT_INTERVAL_NANOS = + TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MILLIS); + private static final long PAUSE_NANOS_FOR_SERVER_OVERLOADED = WAIT_INTERVAL_NANOS * 3; + + private static final long PAUSE_NANOS = WAIT_INTERVAL_NANOS * 2; + + private final RpcThrottlingException RPC_THROTTLING_EXCEPTION = new RpcThrottlingException( + RpcThrottlingException.Type.NumRequestsExceeded, WAIT_INTERVAL_MILLIS, "doot"); + private final Throwable OTHER_EXCEPTION = new RuntimeException(""); + private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true); + + private final HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class); + + @Test + public void itSupportsRpcThrottlingNanos() { + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE); + assertTrue(pauseNanos.isPresent()); + assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS); + } + + @Test + public void itSupportsServerOverloadedExceptionNanos() { + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE); + assertTrue(pauseNanos.isPresent()); + assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED); + } + + @Test + public void itSupportsOtherExceptionNanos() { + OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE); + assertTrue(pauseNanos.isPresent()); + assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS); + } + + @Test + public void itThrottledTimeoutFastFail() { + OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L); + assertFalse(pauseNanos.isPresent()); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java new file mode 100644 index 000000000000..ab63a9cb3c39 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncClientPauseForRpcThrottling { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncClientPauseForRpcThrottling.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("RpcThrottling"); + + private static byte[] FAMILY = Bytes.toBytes("Family"); + + private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); + + private static AsyncConnection CONN; + private static final AtomicBoolean THROTTLE = new AtomicBoolean(false); + private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1); + + public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices { + + public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) + throws ServiceException { + maybeThrottle(); + return super.get(controller, request); + } + + @Override + public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) + throws ServiceException { + maybeThrottle(); + return super.multi(rpcc, request); + } + + @Override + public ClientProtos.ScanResponse scan(RpcController controller, + ClientProtos.ScanRequest request) throws ServiceException { + maybeThrottle(); + return super.scan(controller, request); + } + + private void maybeThrottle() throws ServiceException { + if (THROTTLE.get()) { + THROTTLE.set(false); + throw new ServiceException(new RpcThrottlingException("number of requests exceeded - wait " + + TimeUnit.NANOSECONDS.toMillis(WAIT_INTERVAL_NANOS) + "ms")); + } + } + } + + public static final class ThrottlingRegionServerForTest extends HRegionServer { + + public ThrottlingRegionServerForTest(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new ThrottlingRSRpcServicesForTest(this); + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + UTIL.startMiniCluster(1); + UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, + ThrottlingRegionServerForTest.class, HRegionServer.class); + HRegionServer regionServer = UTIL.getMiniHBaseCluster().startRegionServer().getRegionServer(); + + try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { + UTIL.waitTableAvailable(TABLE_NAME); + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); + } + } + + UTIL.getAdmin().move(UTIL.getAdmin().getRegions(TABLE_NAME).get(0).getEncodedNameAsBytes(), + regionServer.getServerName()); + Configuration conf = new Configuration(UTIL.getConfiguration()); + CONN = ConnectionFactory.createAsyncConnection(conf).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.getAdmin().disableTable(TABLE_NAME); + UTIL.getAdmin().deleteTable(TABLE_NAME); + Closeables.close(CONN, true); + UTIL.shutdownMiniCluster(); + } + + private void assertTime(Callable callable, long time, boolean isGreater) throws Exception { + long startNs = System.nanoTime(); + callable.call(); + long costNs = System.nanoTime() - startNs; + if (isGreater) { + assertTrue(costNs > time); + } else { + assertTrue(costNs <= time); + } + } + + @Test + public void itWaitsForThrottledGet() throws Exception { + boolean isThrottled = true; + THROTTLE.set(isThrottled); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertTime(() -> { + table.get(new Get(Bytes.toBytes(0))).get(); + return null; + }, WAIT_INTERVAL_NANOS, isThrottled); + } + + @Test + public void itDoesNotWaitForUnthrottledGet() throws Exception { + boolean isThrottled = false; + THROTTLE.set(isThrottled); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertTime(() -> { + table.get(new Get(Bytes.toBytes(0))).get(); + return null; + }, WAIT_INTERVAL_NANOS, isThrottled); + } + + @Test + public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception { + AsyncTable table = + CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MILLISECONDS).build(); + boolean isThrottled = true; + THROTTLE.set(isThrottled); + assertTime(() -> { + assertThrows(ExecutionException.class, () -> table.get(new Get(Bytes.toBytes(0))).get()); + return null; + }, WAIT_INTERVAL_NANOS, false); + } + + @Test + public void itWaitsForThrottledBatch() throws Exception { + boolean isThrottled = true; + THROTTLE.set(isThrottled); + assertTime(() -> { + List> futures = new ArrayList<>(); + try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { + for (int i = 100; i < 110; i++) { + futures.add(mutator + .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); + } + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + }, WAIT_INTERVAL_NANOS, isThrottled); + } + + @Test + public void itDoesNotWaitForUnthrottledBatch() throws Exception { + boolean isThrottled = false; + THROTTLE.set(isThrottled); + assertTime(() -> { + List> futures = new ArrayList<>(); + try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { + for (int i = 100; i < 110; i++) { + futures.add(mutator + .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); + } + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + }, WAIT_INTERVAL_NANOS, isThrottled); + } + + @Test + public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception { + boolean isThrottled = true; + THROTTLE.set(isThrottled); + assertTime(() -> { + List> futures = new ArrayList<>(); + try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME) + .setOperationTimeout(1, TimeUnit.MILLISECONDS).build()) { + for (int i = 100; i < 110; i++) { + futures.add(mutator + .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); + } + } + assertThrows(ExecutionException.class, + () -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get()); + return null; + }, WAIT_INTERVAL_NANOS, false); + } + + @Test + public void itWaitsForThrottledScan() throws Exception { + boolean isThrottled = true; + THROTTLE.set(isThrottled); + assertTime(() -> { + try ( + ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { + for (int i = 0; i < 100; i++) { + Result result = scanner.next(); + assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); + } + } + return null; + }, WAIT_INTERVAL_NANOS, isThrottled); + } + + @Test + public void itDoesNotWaitForUnthrottledScan() throws Exception { + boolean isThrottled = false; + THROTTLE.set(isThrottled); + assertTime(() -> { + try ( + ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { + for (int i = 0; i < 100; i++) { + Result result = scanner.next(); + assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); + } + } + return null; + }, WAIT_INTERVAL_NANOS, isThrottled); + } + + @Test + public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception { + AsyncTable table = + CONN.getTableBuilder(TABLE_NAME).setScanTimeout(1, TimeUnit.MILLISECONDS).build(); + boolean isThrottled = true; + THROTTLE.set(isThrottled); + assertTime(() -> { + try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) { + for (int i = 0; i < 100; i++) { + assertThrows(RetriesExhaustedException.class, scanner::next); + } + } + return null; + }, WAIT_INTERVAL_NANOS, false); + } +}