Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,16 @@ public class DefaultOperationQuota implements OperationQuota {
protected long readDiff = 0;
protected long writeCapacityUnitDiff = 0;
protected long readCapacityUnitDiff = 0;
private long blockBytesScanned = 0;
private boolean useBlockBytesScanned;
private long blockSizeBytes;

public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
final QuotaLimiter... limiters) {
this(conf, Arrays.asList(limiters));
this.useBlockBytesScanned =
conf.getBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, USE_BLOCK_BYTES_SCANNED_DEFAULT);
this.blockSizeBytes = blockSizeBytes;
}

/**
Expand Down Expand Up @@ -94,8 +101,15 @@ public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThro
public void close() {
// Adjust the quota consumed for the specified operation
writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
readDiff = operationSize[OperationType.GET.ordinal()]
+ operationSize[OperationType.SCAN.ordinal()] - readConsumed;

if (useBlockBytesScanned) {
readDiff = blockBytesScanned - readConsumed;
} else {
long resultSize =
operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
readDiff = resultSize - readConsumed;
}

writeCapacityUnitDiff =
calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
readCapacityUnitDiff = calculateReadCapacityUnitDiff(
Expand Down Expand Up @@ -132,16 +146,28 @@ public void addMutation(final Mutation mutation) {
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
}

@Override
public void addBlockBytesScanned(long blockBytesScanned) {
this.blockBytesScanned += blockBytesScanned;
}

/**
* Update estimate quota(read/write size/capacityUnits) which will be consumed
* @param numWrites the number of write requests
* @param numReads the number of read requests
* @param numScans the number of scan requests
*/
protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
if (useBlockBytesScanned) {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nit, but can you move the writeConsumed out of the if blocks so we don't duplicate?

// assume 1 block required for reads. this is probably a low estimate, which is okay
readConsumed = numReads > 0 ? blockSizeBytes : 0;
readConsumed += numScans > 0 ? blockSizeBytes : 0;
} else {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
}

writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota {
private static final Logger LOG = LoggerFactory.getLogger(ExceedOperationQuota.class);
private QuotaLimiter regionServerLimiter;

public ExceedOperationQuota(final Configuration conf, QuotaLimiter regionServerLimiter,
final QuotaLimiter... limiters) {
super(conf, limiters);
public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) {
super(conf, blockSizeBytes, limiters);
this.regionServerLimiter = regionServerLimiter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public void addMutation(final Mutation mutation) {
// no-op
}

@Override
public void addBlockBytesScanned(long blockBytesScanned) {
// no-op
}

@Override
public long getReadAvailable() {
return Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public enum OperationType {
SCAN
}

String USE_BLOCK_BYTES_SCANNED_KEY = "hbase.quota.use.block.bytes.scanned";
boolean USE_BLOCK_BYTES_SCANNED_DEFAULT = false;

/**
* Checks if it is possible to execute the specified operation. The quota will be estimated based
* on the number of operations to perform and the average size accumulated during time.
Expand Down Expand Up @@ -67,6 +70,13 @@ public enum OperationType {
*/
void addMutation(Mutation mutation);

/**
* Add the block bytes scanned for the given call. This may be used to calculate the exact quota,
* and can be a better representation of workload than result sizes. Set
* {@link #USE_BLOCK_BYTES_SCANNED_KEY} to true to prefer this metric over result size.
*/
void addBlockBytesScanned(long blockBytesScanned);

/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region;
Expand Down Expand Up @@ -113,7 +114,8 @@ QuotaCache getQuotaCache() {
* @param table the table where the operation will be executed
* @return the OperationQuota
*/
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
final int blockSizeBytes) {
if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
Expand All @@ -123,7 +125,8 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
}
if (!useNoop) {
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
userLimiter);
}
} else {
QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
Expand All @@ -139,11 +142,11 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
}
if (!useNoop) {
if (exceedThrottleQuotaEnabled) {
return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
userLimiter, tableLimiter, nsLimiter);
return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
rsLimiter, userLimiter, tableLimiter, nsLimiter);
} else {
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
tableLimiter, nsLimiter, rsLimiter);
return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
userLimiter, tableLimiter, nsLimiter, rsLimiter);
}
}
}
Expand Down Expand Up @@ -213,9 +216,10 @@ private OperationQuota checkQuota(final Region region, final int numWrites, fina
} else {
ugi = User.getCurrent().getUGI();
}
TableName table = region.getTableDescriptor().getTableName();
TableDescriptor tableDescriptor = region.getTableDescriptor();
TableName table = tableDescriptor.getTableName();

OperationQuota quota = getQuota(ugi, table);
OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
try {
quota.checkQuota(numWrites, numReads, numScans);
} catch (RpcThrottlingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ public MetricsTableRequests getMetricsTableRequests() {

private final CellComparator cellComparator;

private final int minBlockSizeBytes;

/**
* @return The smallest mvcc readPoint across all the scanners in this region. Writes older than
* this readPoint, are included in every read operation.
Expand Down Expand Up @@ -916,6 +918,9 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
.remove(getRegionInfo().getEncodedName());
}
}

minBlockSizeBytes = Arrays.stream(this.htableDescriptor.getColumnFamilies())
.mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE);
}

private void setHTableSpecificConf() {
Expand Down Expand Up @@ -2047,6 +2052,11 @@ public Configuration getReadOnlyConfiguration() {
return new ReadOnlyConfiguration(this.conf);
}

@Override
public int getMinBlockSizeBytes() {
return minBlockSizeBytes;
}

private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());
int maxThreads = Math.min(numStores, conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ private Result increment(final HRegion region, final OperationQuota quota,
if (metricsRegionServer != null) {
long blockBytesScanned =
context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
quota.addBlockBytesScanned(blockBytesScanned);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may not be a huge issue, but i think this shouldn't be within the metricsRegionServer != null check. Maybe move this and the above line up outside

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you missed adding this to append

metricsRegionServer.updateIncrement(region, EnvironmentEdgeManager.currentTime() - before,
blockBytesScanned);
}
Expand Down Expand Up @@ -2506,6 +2507,9 @@ public GetResponse get(final RpcController controller, final GetRequest request)
if (r != null && r.rawCells() != null) {
quota.addGetResult(r);
}
if (context != null) {
quota.addBlockBytesScanned(context.getBlockBytesScanned());
}
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
Expand Down Expand Up @@ -2841,6 +2845,9 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
spaceQuotaEnforcement);
}
} finally {
if (context != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think this works out so well, because of how some of the methods are abstracted. For example, increment can be called from the single mutate method and also from doNonAtomicRegionMutation (which is called from multi). So for increments, we'd double account them.

I might recommend only adding calls to addBlockBytesScanned wherever there are existing calls to getBlockBytesScanned() for metrics. As I mentioned in another comment, you'll want to move the getBlockBytesScanned() outside the if (metrics != null) checks, but otherwise those are probably the best places to add the quota calls if possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm actually maybe it's better to keep this here and to instead handle avoiding double counting append/increment, i.e. by passing in a isMulti boolean or something

quota.addBlockBytesScanned(context.getBlockBytesScanned());
}
quota.close();
}

Expand Down Expand Up @@ -3041,6 +3048,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota
long after = EnvironmentEdgeManager.currentTime();
long blockBytesScanned =
context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
quota.addBlockBytesScanned(blockBytesScanned);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned);

MutationType type = mutation.getMutateType();
Expand Down Expand Up @@ -3645,6 +3653,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}

quota.addScanResult(results);
if (rpcCall != null) {
quota.addBlockBytesScanned(rpcCall.getBlockBytesScanned());
}
addResults(builder, results, (HBaseRpcController) controller,
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
isClientCellBlockSupport(rpcCall));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,4 +571,10 @@ void requestCompaction(byte[] family, String why, int priority, boolean major,
* if you try to set a configuration.
*/
Configuration getReadOnlyConfiguration();

/**
* The minimum block size configuration from all relevant column families. This is used when
* estimating quota consumption.
*/
int getMinBlockSizeBytes();
}
Loading