From 1ffa127be8abf2b4ab0be1a6ebb148a539e7a73f Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 19 Jun 2024 08:38:53 -0400 Subject: [PATCH 1/4] HBASE-28672 ensure large batches are not indefinitely blocked by quotas --- .../hbase/quotas/DefaultOperationQuota.java | 9 ++- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 15 ++++ .../hadoop/hbase/quotas/QuotaLimiter.java | 10 +++ .../hadoop/hbase/quotas/TimeBasedLimiter.java | 21 ++++++ .../quotas/TestDefaultOperationQuota.java | 68 +++++++++++++++++++ 5 files changed, 121 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 2e26765a6a19..41f8f5f4918f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -111,8 +111,13 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep continue; } - limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + long maxRequestsToEstimate = limiter.getRequestNumLimit(); + long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()); + long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()); + + limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), writeConsumed, + Math.min(maxReadsToEstimate, numReads), readConsumed, writeCapacityUnitConsumed, + readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index cf1e49c12e5c..659f08b9fbcc 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -65,6 +65,21 @@ public long getWriteAvailable() { throw new UnsupportedOperationException(); } + @Override + public long getRequestNumLimit() { + return Long.MAX_VALUE; + } + + @Override + public long getReadNumLimit() { + return Long.MAX_VALUE; + } + + @Override + public long getWriteNumLimit() { + return Long.MAX_VALUE; + } + @Override public long getReadAvailable() { throw new UnsupportedOperationException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 8d00a702e253..d215e6a832ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -81,4 +81,14 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, /** Returns the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); + + /** Returns the maximum number of requests to allow per TimeUnit */ + long getRequestNumLimit(); + + /** Returns the maximum number of reads to allow per TimeUnit */ + long getReadNumLimit(); + + /** Returns the maximum number of writes to allow per TimeUnit */ + long getWriteNumLimit(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index e6e143343f72..f0296e0752cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -240,6 +240,27 @@ public long getWriteAvailable() { return writeSizeLimiter.getAvailable(); } + @Override + public long getRequestNumLimit() { + long 
readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit(); + + if (readAndWriteLimit < 0) { // handle overflow + readAndWriteLimit = Long.MAX_VALUE; + } + + return Math.min(reqsLimiter.getLimit(), readAndWriteLimit); + } + + @Override + public long getReadNumLimit() { + return readReqsLimiter.getLimit(); + } + + @Override + public long getWriteNumLimit() { + return writeReqsLimiter.getLimit(); + } + @Override public long getReadAvailable() { return readSizeLimiter.getAvailable(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index 4684be02d69d..e6345b94a203 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -27,6 +29,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; + @Category({ RegionServerTests.class, SmallTests.class }) public class TestDefaultOperationQuota { @ClassRule @@ -125,4 +130,67 @@ public void testScanEstimateShrinkingWorkload() { // shrinking workload should only shrink estimate to maxBBS assertEquals(maxBlockBytesScanned, estimate); } + + @Test + public void testLargeBatchSaturatesReadNumLimit() + throws RpcThrottlingException, InterruptedException { + int 
limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use the whole limit + quota.checkBatchQuota(0, limit); + + // the next request should be rejected + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + Thread.sleep(1000); + // after the TimeUnit, the limit should be refilled + quota.checkBatchQuota(0, limit); + } + + @Test + public void testTooLargeReadBatchIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use more than the limit, which should succeed rather than being indefinitely blocked + quota.checkBatchQuota(0, 10 + limit); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + Thread.sleep(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); + } + + @Test + public void testTooLargeWriteBatchIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = 
TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use more than the limit, which should succeed rather than being indefinitely blocked + quota.checkBatchQuota(10 + limit, 0); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + Thread.sleep(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + } } From 89feac8195fbe7dd1052fb70d727b99acb9fc7f9 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 21 Jun 2024 11:01:44 -0400 Subject: [PATCH 2/4] speed up TestDefaultOperationQuota --- .../hbase/quotas/TestDefaultOperationQuota.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index e6345b94a203..a59d3f720932 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -38,6 +41,14 @@ public class TestDefaultOperationQuota { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); + 
private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); + static { + envEdge.setValue(EnvironmentEdgeManager.currentTime()); + // only activate the envEdge for quotas package + EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, + ThrottleQuotaTestUtil.class.getPackage().getName()); + } + @Test public void testScanEstimateNewScanner() { long blockSize = 64 * 1024; @@ -147,7 +158,7 @@ public void testLargeBatchSaturatesReadNumLimit() // the next request should be rejected assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); - Thread.sleep(1000); + envEdge.incValue(1000); // after the TimeUnit, the limit should be refilled quota.checkBatchQuota(0, limit); } @@ -168,7 +179,7 @@ public void testTooLargeReadBatchIsNotBlocked() // the next request should be blocked assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); - Thread.sleep(1000); + envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); } @@ -189,7 +200,7 @@ public void testTooLargeWriteBatchIsNotBlocked() // the next request should be blocked assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); - Thread.sleep(1000); + envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); } From c5cccfc5dae5b1d7c4737a8f723530baf9f49509 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 24 Jun 2024 12:12:17 -0400 Subject: [PATCH 3/4] Prevent quota blocking based on read, write, or request size estimates --- .../hbase/quotas/DefaultOperationQuota.java | 7 +- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 5 ++ .../hadoop/hbase/quotas/QuotaLimiter.java | 3 + .../hadoop/hbase/quotas/TimeBasedLimiter.java | 5 ++ 
.../quotas/TestDefaultOperationQuota.java | 67 +++++++++++++++++++ 5 files changed, 85 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 41f8f5f4918f..a387c04e4e51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -114,9 +114,12 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep long maxRequestsToEstimate = limiter.getRequestNumLimit(); long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()); long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()); + long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit()); + long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit()); - limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), writeConsumed, - Math.min(maxReadsToEstimate, numReads), readConsumed, writeCapacityUnitConsumed, + limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), + Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), + Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 659f08b9fbcc..5ece0be2b5aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -90,6 +90,11 @@ public long getReadLimit() { return Long.MAX_VALUE; } + @Override + public long 
getWriteLimit() { + return Long.MAX_VALUE; + } + @Override public String toString() { return "NoopQuotaLimiter"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index d215e6a832ca..12e4c4a7c6a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -79,6 +79,9 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, /** Returns the maximum number of bytes ever available to read */ long getReadLimit(); + /** Returns the maximum number of bytes ever available to write */ + long getWriteLimit(); + /** Returns the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index f0296e0752cf..f5170b09c83e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -271,6 +271,11 @@ public long getReadLimit() { return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); } + @Override + public long getWriteLimit() { + return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index a59d3f720932..f83224f091b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -204,4 +204,71 @@ public void testTooLargeWriteBatchIsNotBlocked() // even after the TimeUnit, the limit should not be refilled because we oversubscribed assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); } + + @Test + public void testTooLargeWriteSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 50; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // writes are estimated at 100 bytes, so this will use 2x the limit but should not be blocked + quota.checkBatchQuota(1, 0); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + } + + @Test + public void testTooLargeReadSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + long blockSize = 65536; + long limit = blockSize / 2; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + + // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked + quota.checkBatchQuota(0, 1); + + // the next request should be 
blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + } + + @Test + public void testTooLargeRequestSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + long blockSize = 65536; + long limit = blockSize / 2; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + + // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked + quota.checkBatchQuota(0, 1); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + } } From 15b58667defe17d1ade6f177079da01d19b7e82b Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 28 Jun 2024 11:54:27 -0400 Subject: [PATCH 4/4] additional test --- .../quotas/TestDefaultOperationQuota.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index f83224f091b8..a6b7ba6fee59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -163,6 +163,27 @@ public void testLargeBatchSaturatesReadNumLimit() quota.checkBatchQuota(0, limit); } + @Test + public void testLargeBatchSaturatesReadWriteLimit() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use the whole limit + quota.checkBatchQuota(limit, 0); + + // the next request should be rejected + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // after the TimeUnit, the limit should be refilled + quota.checkBatchQuota(limit, 0); + } + @Test public void testTooLargeReadBatchIsNotBlocked() throws RpcThrottlingException, InterruptedException {