Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,13 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep
continue;
}

limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
long maxRequestsToEstimate = limiter.getRequestNumLimit();
long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());

limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), writeConsumed,
Math.min(maxReadsToEstimate, numReads), readConsumed, writeCapacityUnitConsumed,
readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ public long getWriteAvailable() {
throw new UnsupportedOperationException();
}

@Override
public long getRequestNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getReadNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getWriteNumLimit() {
return Long.MAX_VALUE;
}

@Override
public long getReadAvailable() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,14 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,

/** Returns the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();

/** Returns the maximum number of requests to allow per TimeUnit */
long getRequestNumLimit();

/** Returns the maximum number of reads to allow per TimeUnit */
long getReadNumLimit();

/** Returns the maximum number of writes to allow per TimeUnit */
long getWriteNumLimit();

}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,27 @@ public long getWriteAvailable() {
return writeSizeLimiter.getAvailable();
}

@Override
public long getRequestNumLimit() {
long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();

if (readAndWriteLimit < 0) { // handle overflow
readAndWriteLimit = Long.MAX_VALUE;
}

return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
}

@Override
public long getReadNumLimit() {
return readReqsLimiter.getLimit();
}

@Override
public long getWriteNumLimit() {
return writeReqsLimiter.getLimit();
}

@Override
public long getReadAvailable() {
return readSizeLimiter.getAvailable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;

@Category({ RegionServerTests.class, SmallTests.class })
public class TestDefaultOperationQuota {
@ClassRule
Expand Down Expand Up @@ -125,4 +130,67 @@ public void testScanEstimateShrinkingWorkload() {
// shrinking workload should only shrink estimate to maxBBS
assertEquals(maxBlockBytesScanned, estimate);
}

@Test
public void testLargeBatchSaturatesReadNumLimit()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use the whole limit
quota.checkBatchQuota(0, limit);

// the next request should be rejected
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

Thread.sleep(1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's preferable not to sleep. Instead usually you can influence the timestamp by adding 1s to it or whatever.

The reason not to sleep is that you're more subject to random Jenkins slowdowns or other problems. We do it in a lot of places of course, but iirc the quota stuff already can do some timestamp manipulation?

// after the TimeUnit, the limit should be refilled
quota.checkBatchQuota(0, limit);
}

@Test
public void testTooLargeReadBatchIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(0, 10 + limit);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));

Thread.sleep(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit));
}

@Test
public void testTooLargeWriteBatchIsNotBlocked()
throws RpcThrottlingException, InterruptedException {
int limit = 10;
QuotaProtos.Throttle throttle =
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);

// use more than the limit, which should succeed rather than being indefinitely blocked
quota.checkBatchQuota(10 + limit, 0);

// the next request should be blocked
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));

Thread.sleep(1000);
// even after the TimeUnit, the limit should not be refilled because we oversubscribed
assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
}
}