From 47b18efe088c0c077cd80efbf046fbf3e1c190b8 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Mon, 6 Feb 2023 12:35:04 -0500 Subject: [PATCH 1/3] HBASE-27532 Add block bytes scanned metrics Rename server side counter to blockBytesScanned Add to ScanMetrics as countOfBlockBytesScanned, and incorporate into TestScannerBlockSizeLimits --- .../client/metrics/ServerSideScanMetrics.java | 5 +++ .../hadoop/hbase/ipc/RpcCallContext.java | 13 +++++- .../apache/hadoop/hbase/ipc/RpcServer.java | 2 +- .../apache/hadoop/hbase/ipc/ServerCall.java | 4 +- .../hbase/regionserver/RSRpcServices.java | 10 +++-- .../hbase/regionserver/StoreScanner.java | 2 +- .../namequeues/TestNamedQueueRecorder.java | 4 +- .../region/TestRegionProcedureStore.java | 4 +- .../TestScannerBlockSizeLimits.java | 44 +++++++++++++------ 9 files changed, 62 insertions(+), 26 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java index c705463b62c5..ca2e0d1b6e04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java @@ -47,6 +47,8 @@ protected AtomicLong createCounter(String counterName) { public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED"; public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED"; + public static final String BLOCK_BYTES_SCANNED_KEY_METRIC_NAME = "BLOCK_BYTES_SCANNED"; + /** * number of rows filtered during scan RPC */ @@ -59,6 +61,9 @@ protected AtomicLong createCounter(String counterName) { */ public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); + public final AtomicLong countOfBlockBytesScanned = + createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME); + public void setCounter(String counterName, long value) { AtomicLong c = this.counters.get(counterName); if (c != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 479a83f914a4..4f299b4a85d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -87,9 +87,18 @@ default Optional getRequestUserName() { */ void incrementResponseCellSize(long cellSize); - long getResponseBlockSize(); + /** + * Get the number of block bytes scanned by the current call. In order to serve a response, 1 or + * more lower level blocks must be loaded (from disk or cache) and scanned for the requested + * cells. This value includes the total block size for each block loaded for the request. + */ + long getBlockBytesScanned(); - void incrementResponseBlockSize(long blockSize); + /** + * Increment the number of block bytes scanned by the current call. See + * {@link #getBlockBytesScanned()} for details. 
+ */ + void incrementBlockBytesScanned(long blockSize); long getResponseExceptionSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2780b8fcfb71..19b5327844a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -428,7 +428,7 @@ public Pair call(RpcCall call, MonitoredRPCHandler status) // Use the raw request call size for now. long requestSize = call.getSize(); long responseSize = result.getSerializedSize(); - long responseBlockSize = call.getResponseBlockSize(); + long responseBlockSize = call.getBlockBytesScanned(); if (call.isClientCellBlockSupported()) { // Include the payload size in HBaseRpcController responseSize += call.getResponseCellSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index bdd3593cf2d5..2188795914db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -424,12 +424,12 @@ public void incrementResponseCellSize(long cellSize) { } @Override - public long getResponseBlockSize() { + public long getBlockBytesScanned() { return responseBlockSize; } @Override - public void incrementResponseBlockSize(long blockSize) { + public void incrementBlockBytesScanned(long blockSize) { responseBlockSize += blockSize; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 1a6da6fd5575..30434226223a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -718,7 +718,7 @@ private List doNonAtomicRegionMutation(final HRegion region, if ( context != null && context.isRetryImmediatelySupported() && (context.getResponseCellSize() > maxQuotaResultSize - || context.getResponseBlockSize() + context.getResponseExceptionSize() + || context.getBlockBytesScanned() + context.getResponseExceptionSize() > maxQuotaResultSize) ) { @@ -730,7 +730,7 @@ private List doNonAtomicRegionMutation(final HRegion region, // // Instead just create the exception and then store it. sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " - + context.getResponseCellSize() + " BlockSize: " + context.getResponseBlockSize()); + + context.getResponseCellSize() + " BlockSize: " + context.getBlockBytesScanned()); // Only report the exception once since there's only one request that // caused the exception. Otherwise this number will dominate the exceptions count. @@ -3313,7 +3313,7 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan long maxCellSize = maxResultSize; long maxBlockSize = maxQuotaResultSize; if (rpcCall != null) { - maxBlockSize -= rpcCall.getResponseBlockSize(); + maxBlockSize -= rpcCall.getBlockBytesScanned(); maxCellSize -= rpcCall.getResponseCellSize(); } @@ -3431,6 +3431,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan // Check to see if the client requested that we track metrics server side. If the // client requested metrics, retrieve the metrics from the scanner context. 
if (trackMetrics) { + // rather than increment yet another counter in StoreScanner, just set the value here + // from block size progress before writing into the response + scannerContext.getMetrics().countOfBlockBytesScanned + .set(scannerContext.getBlockSizeProgress()); Map metrics = scannerContext.getMetrics().getMetricsMap(); ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 949cb9f54b55..2b6ac583ff66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -621,7 +621,7 @@ public boolean next(List outResult, ScannerContext scannerContext) throws heap.recordBlockSize(blockSize -> { if (rpcCall.isPresent()) { - rpcCall.get().incrementResponseBlockSize(blockSize); + rpcCall.get().incrementBlockBytesScanned(blockSize); } scannerContext.incrementBlockProgress(blockSize); }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 84e12d026ae1..5d2b1c6e561c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -669,12 +669,12 @@ public void incrementResponseCellSize(long cellSize) { } @Override - public long getResponseBlockSize() { + public long getBlockBytesScanned() { return 0; } @Override - public void incrementResponseBlockSize(long blockSize) { + public void incrementBlockBytesScanned(long blockSize) { } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index 76c84cef9a3a..d26870b77dfd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -287,12 +287,12 @@ public void incrementResponseCellSize(long cellSize) { } @Override - public long getResponseBlockSize() { + public long getBlockBytesScanned() { return 0; } @Override - public void incrementResponseBlockSize(long blockSize) { + public void incrementBlockBytesScanned(long blockSize) { } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java index 97a964696096..c70c7a35133a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.filter.SkipFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -69,11 +70,8 @@ public class TestScannerBlockSizeLimits { private static final byte[] 
COLUMN1 = Bytes.toBytes(0); private static final byte[] COLUMN2 = Bytes.toBytes(1); private static final byte[] COLUMN3 = Bytes.toBytes(2); - private static final byte[] COLUMN4 = Bytes.toBytes(4); private static final byte[] COLUMN5 = Bytes.toBytes(5); - private static final byte[][] COLUMNS = new byte[][] { COLUMN1, COLUMN2 }; - @BeforeClass public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); @@ -83,6 +81,15 @@ public static void setUp() throws Exception { createTestData(); } + @Before + public void setupEach() throws Exception { + HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + for (HRegion region : regionServer.getRegions(TABLE)) { + System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName()); + regionServer.clearRegionBlockCache(region); + } + } + private static void createTestData() throws IOException, InterruptedException { RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE); String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); @@ -91,12 +98,13 @@ private static void createTestData() throws IOException, InterruptedException { for (int i = 1; i < 10; i++) { // 5 columns per row, in 2 families // Each column value is 1000 bytes, which is enough to fill a full block with row and header. - // So 5 blocks per row. + // So 5 blocks per row in FAMILY1 Put put = new Put(Bytes.toBytes(i)); for (int j = 0; j < 6; j++) { put.addColumn(FAMILY1, Bytes.toBytes(j), DATA); } + // Additional block in FAMILY2 (notably smaller than block size) put.addColumn(FAMILY2, COLUMN1, DATA); region.put(put); @@ -128,6 +136,8 @@ public void testSingleBlock() throws IOException { scanner.next(100); + // we fetch 2 columns from 1 row, so about 2 blocks + assertEquals(4120, metrics.countOfBlockBytesScanned.get()); assertEquals(1, metrics.countOfRowsScanned.get()); assertEquals(1, metrics.countOfRPCcalls.get()); } @@ -150,6 +160,8 @@ public void testCheckLimitAfterFilterRowKey() throws IOException { .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2) .setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2))))); + ScanMetrics metrics = scanner.getScanMetrics(); + boolean foundRow3 = false; for (Result result : scanner) { Set rows = new HashSet<>(); @@ -163,10 +175,11 @@ public void testCheckLimitAfterFilterRowKey() throws IOException { foundRow3 = true; } } - ScanMetrics metrics = scanner.getScanMetrics(); - // 4 blocks per row, so 36 blocks. We can scan 3 blocks per RPC, which is 12 RPCs. But we can - // skip 1 row, so skip 2 RPCs. 
+ // 22 blocks, last one is 1030 bytes (approx 3 per row for 8 rows, but some compaction happens + // in family2 since each row only has 1 cell there and 2 can fit per block) + assertEquals(44290, metrics.countOfBlockBytesScanned.get()); + // We can return 22 blocks in 9 RPCs, but need an extra one to check for more rows at end assertEquals(10, metrics.countOfRPCcalls.get()); } @@ -192,6 +205,7 @@ public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOExcept ScanMetrics metrics = scanner.getScanMetrics(); // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total + assertEquals(18540, metrics.countOfBlockBytesScanned.get()); // limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd) // so that's 3 RPC and the last RPC pulls the cells loaded by the last block assertEquals(4, metrics.countOfRPCcalls.get()); @@ -216,9 +230,11 @@ public void testCheckLimitAfterFilteringCell() throws IOException { } } ScanMetrics metrics = scanner.getScanMetrics(); + System.out.println(metrics.countOfBlockBytesScanned.get()); - // We will return 9 rows, but also 2 cursors because we exceed the scan size limit partway - // through. So that accounts for 11 rpcs. + // 9 rows, total of 32 blocks (last one is 1030) + assertEquals(64890, metrics.countOfBlockBytesScanned.get()); + // We can return 32 blocks in approx 11 RPCs but we need 2 cursors due to the narrow filter assertEquals(2, cursors); assertEquals(11, metrics.countOfRPCcalls.get()); } @@ -247,8 +263,9 @@ public void testCheckLimitAfterFilteringRowCells() throws IOException { ScanMetrics metrics = scanner.getScanMetrics(); // Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW. - // So we load 1 block per row, and there are 9 rows. Our max scan size is large enough to - // return 2 full blocks, with some overflow. So we are able to squeeze this all into 4 RPCs. + // So we load 1 block per row, and there are 9 rows. So 9 blocks + assertEquals(18540, metrics.countOfBlockBytesScanned.get()); + // We can return 9 blocks in 3 RPCs, but need 1 more to check for more results (returns 0) assertEquals(4, metrics.countOfRPCcalls.get()); } @@ -266,8 +283,9 @@ public void testSeekNextUsingHint() throws IOException { ScanMetrics metrics = scanner.getScanMetrics(); // We have to read the first cell/block of each row, then can skip to the last block. So that's - // 2 blocks per row to read (18 total). Our max scan size is enough to read 3 blocks per RPC, - // plus one final RPC to finish region. + // 2 blocks per row to read (18 blocks total) + assertEquals(37080, metrics.countOfBlockBytesScanned.get()); + // Our max scan size is enough to read 3 blocks per RPC, plus one final RPC to finish region. 
assertEquals(7, metrics.countOfRPCcalls.get());
  }
}

From 6dc20c7fe96f560e1664b9c6ec3527f2feb7ad46 Mon Sep 17 00:00:00 2001
From: Bryan Beaudreault
Date: Wed, 8 Feb 2023 11:41:50 -0500
Subject: [PATCH 2/3] Add overall counter and per-request histogram to table and regionserver metrics in jmx

---
.../MetricsRegionServerSource.java | 41 +++++----
.../MetricsRegionServerSourceImpl.java | 57 ++++++++++---
.../hbase/regionserver/MetricsUserSource.java | 10 ++-
.../regionserver/MetricsUserSourceImpl.java | 33 ++++++--
.../regionserver/MetricsRegionServer.java | 53 ++++++------
.../regionserver/MetricsUserAggregate.java | 11 ++-
.../MetricsUserAggregateFactory.java | 13 ++-
.../MetricsUserAggregateImpl.java | 25 ++++--
.../hbase/regionserver/RSRpcServices.java | 70 ++++++++++------
.../metrics/MetricsTableRequests.java | 84 +++++++++++++------
.../regionserver/TestMetricsRegionServer.java | 19 +++--
.../TestMetricsTableRequests.java | 10 ++-
.../TestMetricsUserAggregate.java | 31 +++----
13 files changed, 293 insertions(+), 164 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index f89263a9b32d..120e4655eeda 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -85,27 +85,32 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo

/**
* Update checkAndMutate histogram
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned how many block bytes were scanned for the check portion of the request
*/
- void updateCheckAndMutate(long t);
+ void updateCheckAndMutate(long time, long blockBytesScanned);

/**
* Update the Get time histogram .
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned how many block bytes were scanned for the request
*/
- void updateGet(long t);
+ void updateGet(long time, long blockBytesScanned);

/**
* Update the Increment time histogram.
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned how many block bytes were scanned fetching the current value to
+ * increment
*/
- void updateIncrement(long t);
+ void updateIncrement(long time, long blockBytesScanned);

/**
* Update the Append time histogram.
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned how many block bytes were scanned fetching the current value to append
*/
- void updateAppend(long t);
+ void updateAppend(long time, long blockBytesScanned);

/**
* Update the Replay time histogram.
@@ -114,15 +119,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
void updateReplay(long t);

/**
- * Update the scan size.
- * @param scanSize size of the scan
- */
- void updateScanSize(long scanSize);
-
- /**
- * Update the scan time.
+ * Update the scan metrics.
+ * @param time response time of scan
+ * @param responseCellSize size of the scan response
+ * @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
- void updateScanTime(long t);
+ void updateScan(long time, long responseCellSize, long blockBytesScanned);

/**
* Increment the number of slow Puts that have happened. 
@@ -445,6 +447,13 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo String SCAN_SIZE_KEY = "scanSize"; String SCAN_TIME_KEY = "scanTime"; + String BLOCK_BYTES_SCANNED_KEY = "blockBytesScannedCount"; + String BLOCK_BYTES_SCANNED_DESC = "Count of block bytes scanned by read requests"; + String GET_BLOCK_BYTES_SCANNED_KEY = "getBlockBytesScanned"; + String SCAN_BLOCK_BYTES_SCANNED_KEY = "scanBlockBytesScanned"; + String CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY = "checkAndMutateBlockBytesScanned"; + String INCREMENT_BLOCK_BYTES_SCANNED_KEY = "incrementBlockBytesScanned"; + String APPEND_BLOCK_BYTES_SCANNED_KEY = "appendBlockBytesScanned"; String SLOW_PUT_KEY = "slowPutCount"; String SLOW_GET_KEY = "slowGetCount"; String SLOW_DELETE_KEY = "slowDeleteCount"; diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index bbae0587eb83..ee5ce34f6b60 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -48,6 +48,13 @@ public class MetricsRegionServerSourceImpl extends BaseSourceImpl private final MetricHistogram scanSizeHisto; private final MetricHistogram scanTimeHisto; + private final MutableFastCounter blockBytesScannedCount; + private final MetricHistogram checkAndMutateBlockBytesScanned; + private final MetricHistogram getBlockBytesScanned; + private final MetricHistogram incrementBlockBytesScanned; + private final MetricHistogram appendBlockBytesScanned; + private final MetricHistogram scanBlockBytesScanned; + private final MutableFastCounter slowPut; private final MutableFastCounter slowDelete; private final MutableFastCounter slowGet; @@ -125,6 +132,16 @@ public MetricsRegionServerSourceImpl(String metricsName, String metricsDescripti scanSizeHisto = getMetricsRegistry().newSizeHistogram(SCAN_SIZE_KEY); scanTimeHisto = getMetricsRegistry().newTimeHistogram(SCAN_TIME_KEY); + blockBytesScannedCount = + getMetricsRegistry().newCounter(BLOCK_BYTES_SCANNED_KEY, BLOCK_BYTES_SCANNED_DESC, 0L); + checkAndMutateBlockBytesScanned = + getMetricsRegistry().newSizeHistogram(CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY); + getBlockBytesScanned = getMetricsRegistry().newSizeHistogram(GET_BLOCK_BYTES_SCANNED_KEY); + incrementBlockBytesScanned = + getMetricsRegistry().newSizeHistogram(INCREMENT_BLOCK_BYTES_SCANNED_KEY); + appendBlockBytesScanned = getMetricsRegistry().newSizeHistogram(APPEND_BLOCK_BYTES_SCANNED_KEY); + scanBlockBytesScanned = getMetricsRegistry().newSizeHistogram(SCAN_BLOCK_BYTES_SCANNED_KEY); + flushTimeHisto = getMetricsRegistry().newTimeHistogram(FLUSH_TIME, FLUSH_TIME_DESC); flushMemstoreSizeHisto = getMetricsRegistry().newSizeHistogram(FLUSH_MEMSTORE_SIZE, FLUSH_MEMSTORE_SIZE_DESC); @@ -192,18 +209,30 @@ public void updateDelete(long t) { } @Override - public void updateGet(long t) { + public void updateGet(long t, long blockBytesScanned) { getHisto.add(t); + if (blockBytesScanned > 0) { + blockBytesScannedCount.incr(blockBytesScanned); + getBlockBytesScanned.add(blockBytesScanned); + } } @Override - public void updateIncrement(long t) { + public void updateIncrement(long t, long blockBytesScanned) { incrementHisto.add(t); + if (blockBytesScanned > 0) { + blockBytesScannedCount.incr(blockBytesScanned); + 
incrementBlockBytesScanned.add(blockBytesScanned); + } } @Override - public void updateAppend(long t) { + public void updateAppend(long t, long blockBytesScanned) { appendHisto.add(t); + if (blockBytesScanned > 0) { + blockBytesScannedCount.incr(blockBytesScanned); + appendBlockBytesScanned.add(blockBytesScanned); + } } @Override @@ -212,13 +241,13 @@ public void updateReplay(long t) { } @Override - public void updateScanSize(long scanSize) { - scanSizeHisto.add(scanSize); - } - - @Override - public void updateScanTime(long t) { - scanTimeHisto.add(t); + public void updateScan(long time, long responseSize, long blockBytesScanned) { + scanTimeHisto.add(time); + scanSizeHisto.add(responseSize); + if (blockBytesScanned > 0) { + blockBytesScannedCount.incr(blockBytesScanned); + scanBlockBytesScanned.add(blockBytesScanned); + } } @Override @@ -646,8 +675,12 @@ public void updateCheckAndPut(long t) { } @Override - public void updateCheckAndMutate(long t) { - checkAndMutateHisto.add(t); + public void updateCheckAndMutate(long time, long blockBytesScanned) { + checkAndMutateHisto.add(time); + if (blockBytesScanned > 0) { + blockBytesScannedCount.incr(blockBytesScanned); + checkAndMutateBlockBytesScanned.add(blockBytesScanned); + } } @Override diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java index 2d75c9246ba2..feb173a94afd 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java @@ -51,15 +51,17 @@ interface ClientMetrics { void updateDelete(long t); - void updateGet(long t); + void updateGet(long time, long blockBytesScanned); - void updateIncrement(long t); + void updateIncrement(long time, long blockBytesScanned); - void updateAppend(long t); + void updateAppend(long time, long blockBytesScanned); void updateReplay(long t); - void updateScanTime(long t); + void updateScan(long time, long blockBytesScanned); + + void updateCheckAndMutate(long blockBytesScanned); void getMetrics(MetricsCollector metricsCollector, boolean all); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java index 95cb36239546..7985967c0473 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource { private final String userIncrementKey; private final String userAppendKey; private final String userReplayKey; + private final String userBlockBytesScannedKey; private MetricHistogram getHisto; private MetricHistogram scanTimeHisto; @@ -53,6 +55,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource { private MetricHistogram incrementHisto; private MetricHistogram appendHisto; 
private MetricHistogram replayHisto; + private MutableFastCounter blockBytesScannedCount; private final int hashCode; @@ -116,7 +119,7 @@ public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) { this.user = user; this.registry = agg.getMetricsRegistry(); - this.userNamePrefix = "user_" + user + "_metric_"; + this.userNamePrefix = "User_" + user + "_metric_"; hashCode = userNamePrefix.hashCode(); @@ -127,6 +130,7 @@ public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) { userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY; userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY; userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY; + userBlockBytesScannedKey = userNamePrefix + MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY; clientMetricsMap = new ConcurrentHashMap<>(); agg.register(this); } @@ -141,6 +145,7 @@ public void register() { incrementHisto = registry.newTimeHistogram(userIncrementKey); appendHisto = registry.newTimeHistogram(userAppendKey); replayHisto = registry.newTimeHistogram(userReplayKey); + blockBytesScannedCount = registry.newCounter(userBlockBytesScannedKey, "", 0); } } @@ -165,6 +170,7 @@ public void deregister() { registry.removeMetric(userIncrementKey); registry.removeMetric(userAppendKey); registry.removeMetric(userReplayKey); + registry.removeMetric(userBlockBytesScannedKey); } } @@ -231,18 +237,21 @@ public void updateDelete(long t) { } @Override - public void updateGet(long t) { - getHisto.add(t); + public void updateGet(long time, long blockBytesScanned) { + getHisto.add(time); + blockBytesScannedCount.incr(blockBytesScanned); } @Override - public void updateIncrement(long t) { - incrementHisto.add(t); + public void updateIncrement(long time, long blockBytesScanned) { + incrementHisto.add(time); + blockBytesScannedCount.incr(blockBytesScanned); } @Override - public void updateAppend(long t) { - appendHisto.add(t); + public void updateAppend(long time, long blockBytesScanned) { + appendHisto.add(time); + blockBytesScannedCount.incr(blockBytesScanned); } @Override @@ -251,8 +260,14 @@ public void updateReplay(long t) { } @Override - public void updateScanTime(long t) { - scanTimeHisto.add(t); + public void updateScan(long time, long blockBytesScanned) { + scanTimeHisto.add(time); + blockBytesScannedCount.incr(blockBytesScanned); + } + + @Override + public void updateCheckAndMutate(long blockBytesScanned) { + blockBytesScannedCount.incr(blockBytesScanned); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index f07ed38493ee..8afb3f31e39c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -150,44 +150,45 @@ public void updateCheckAndPut(HRegion region, long t) { serverSource.updateCheckAndPut(t); } - public void updateCheckAndMutate(HRegion region, long t) { + public void updateCheckAndMutate(HRegion region, long time, long blockBytesScanned) { if (region.getMetricsTableRequests() != null) { - region.getMetricsTableRequests().updateCheckAndMutate(t); + region.getMetricsTableRequests().updateCheckAndMutate(time, blockBytesScanned); } - serverSource.updateCheckAndMutate(t); + serverSource.updateCheckAndMutate(time, blockBytesScanned); + 
userAggregate.updateCheckAndMutate(blockBytesScanned); } - public void updateGet(HRegion region, long t) { + public void updateGet(HRegion region, long time, long blockBytesScanned) { if (region.getMetricsTableRequests() != null) { - region.getMetricsTableRequests().updateGet(t); + region.getMetricsTableRequests().updateGet(time, blockBytesScanned); } - if (t > slowMetricTime) { + if (time > slowMetricTime) { serverSource.incrSlowGet(); } - serverSource.updateGet(t); - userAggregate.updateGet(t); + serverSource.updateGet(time, blockBytesScanned); + userAggregate.updateGet(time, blockBytesScanned); } - public void updateIncrement(HRegion region, long t) { + public void updateIncrement(HRegion region, long time, long blockBytesScanned) { if (region.getMetricsTableRequests() != null) { - region.getMetricsTableRequests().updateIncrement(t); + region.getMetricsTableRequests().updateIncrement(time, blockBytesScanned); } - if (t > slowMetricTime) { + if (time > slowMetricTime) { serverSource.incrSlowIncrement(); } - serverSource.updateIncrement(t); - userAggregate.updateIncrement(t); + serverSource.updateIncrement(time, blockBytesScanned); + userAggregate.updateIncrement(time, blockBytesScanned); } - public void updateAppend(HRegion region, long t) { + public void updateAppend(HRegion region, long time, long blockBytesScanned) { if (region.getMetricsTableRequests() != null) { - region.getMetricsTableRequests().updateAppend(t); + region.getMetricsTableRequests().updateAppend(time, blockBytesScanned); } - if (t > slowMetricTime) { + if (time > slowMetricTime) { serverSource.incrSlowAppend(); } - serverSource.updateAppend(t); - userAggregate.updateAppend(t); + serverSource.updateAppend(time, blockBytesScanned); + userAggregate.updateAppend(time, blockBytesScanned); } public void updateReplay(long t) { @@ -195,19 +196,12 @@ public void updateReplay(long t) { userAggregate.updateReplay(t); } - public void updateScanSize(HRegion region, long scanSize) { - if (region.getMetricsTableRequests() != null) { - region.getMetricsTableRequests().updateScanSize(scanSize); - } - serverSource.updateScanSize(scanSize); - } - - public void updateScanTime(HRegion region, long t) { + public void updateScan(HRegion region, long time, long responseCellSize, long blockBytesScanned) { if (region.getMetricsTableRequests() != null) { - region.getMetricsTableRequests().updateScanTime(t); + region.getMetricsTableRequests().updateScan(time, responseCellSize, blockBytesScanned); } - serverSource.updateScanTime(t); - userAggregate.updateScanTime(t); + serverSource.updateScan(time, responseCellSize, blockBytesScanned); + userAggregate.updateScan(time, blockBytesScanned); } public void updateSplitTime(long t) { @@ -300,4 +294,5 @@ public void updateWriteQueryMeter(HRegion region) { public void incrScannerLeaseExpired() { serverSource.incrScannerLeaseExpired(); } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java index 14de79da67cf..c673efdae350 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java @@ -29,17 +29,20 @@ public interface MetricsUserAggregate { void updateDelete(long t); - void updateGet(long t); + void updateGet(long time, long blockBytesScanned); - void updateIncrement(long t); + void updateIncrement(long time, long 
blockBytesScanned); - void updateAppend(long t); + void updateAppend(long time, long blockBytesScanned); void updateReplay(long t); - void updateScanTime(long t); + void updateScan(long time, long blockBytesScanned); + + void updateCheckAndMutate(long blockBytesScanned); void updateFilteredReadRequests(); void updateReadRequestCount(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java index 2d5c94c1a665..2b566a708d92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java @@ -51,17 +51,17 @@ public void updateDelete(long t) { } @Override - public void updateGet(long t) { + public void updateGet(long time, long blockBytesScanned) { } @Override - public void updateIncrement(long t) { + public void updateIncrement(long time, long blockBytesScanned) { } @Override - public void updateAppend(long t) { + public void updateAppend(long time, long blockBytesScanned) { } @@ -71,7 +71,12 @@ public void updateReplay(long t) { } @Override - public void updateScanTime(long t) { + public void updateScan(long time, long blockBytesScanned) { + + } + + @Override + public void updateCheckAndMutate(long blockBytesScanned) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java index 963cd6ee03a8..4856105f4e8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java @@ -114,30 +114,30 @@ public void updateDelete(long t) { } @Override - public void updateGet(long t) { + public void updateGet(long time, long blockBytesScanned) { String user = getActiveUser(); if (user != null) { MetricsUserSource userSource = getOrCreateMetricsUser(user); - userSource.updateGet(t); + userSource.updateGet(time, blockBytesScanned); } } @Override - public void updateIncrement(long t) { + public void updateIncrement(long time, long blockBytesScanned) { String user = getActiveUser(); if (user != null) { MetricsUserSource userSource = getOrCreateMetricsUser(user); - userSource.updateIncrement(t); + userSource.updateIncrement(time, blockBytesScanned); incrementClientWriteMetrics(userSource); } } @Override - public void updateAppend(long t) { + public void updateAppend(long time, long blockBytesScanned) { String user = getActiveUser(); if (user != null) { MetricsUserSource userSource = getOrCreateMetricsUser(user); - userSource.updateAppend(t); + userSource.updateAppend(time, blockBytesScanned); incrementClientWriteMetrics(userSource); } } @@ -153,11 +153,20 @@ public void updateReplay(long t) { } @Override - public void updateScanTime(long t) { + public void updateScan(long time, long blockBytesScanned) { String user = getActiveUser(); if (user != null) { MetricsUserSource userSource = getOrCreateMetricsUser(user); - userSource.updateScanTime(t); + userSource.updateScan(time, blockBytesScanned); + } + } + + @Override + public void updateCheckAndMutate(long blockBytesScanned) { + String user = getActiveUser(); + if (user != null) { + MetricsUserSource userSource = getOrCreateMetricsUser(user); + 
userSource.updateCheckAndMutate(blockBytesScanned); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 30434226223a..896b8e7180ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -650,16 +650,20 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List doNonAtomicRegionMutation(final HRegion region, resultOrExceptionBuilder.clear(); try { Result r = null; - + long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; if ( context != null && context.isRetryImmediatelySupported() && (context.getResponseCellSize() > maxQuotaResultSize - || context.getBlockBytesScanned() + context.getResponseExceptionSize() - > maxQuotaResultSize) + || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize) ) { // We're storing the exception since the exception and reason string won't @@ -730,7 +737,7 @@ private List doNonAtomicRegionMutation(final HRegion region, // // Instead just create the exception and then store it. sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: " - + context.getResponseCellSize() + " BlockSize: " + context.getBlockBytesScanned()); + + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore); // Only report the exception once since there's only one request that // caused the exception. Otherwise this number will dominate the exceptions count. @@ -771,7 +778,10 @@ private List doNonAtomicRegionMutation(final HRegion region, } finally { final MetricsRegionServer metricsRegionServer = server.getMetrics(); if (metricsRegionServer != null) { - metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before); + long blockBytesScanned = + context != null ? 
context.getBlockBytesScanned() - blockBytesScannedBefore : 0; + metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before, + blockBytesScanned); } } } else if (action.hasServiceCall()) { @@ -797,11 +807,11 @@ private List doNonAtomicRegionMutation(final HRegion region, switch (type) { case APPEND: r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, - spaceQuotaEnforcement); + spaceQuotaEnforcement, context); break; case INCREMENT: r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, - spaceQuotaEnforcement); + spaceQuotaEnforcement, context); break; case PUT: case DELETE: @@ -2425,6 +2435,7 @@ public GetResponse get(final RpcController controller, final GetRequest request) long before = EnvironmentEdgeManager.currentTime(); OperationQuota quota = null; HRegion region = null; + RpcCallContext context = RpcServer.getCurrentCall().orElse(null); try { checkOpen(); requestCount.increment(); @@ -2445,7 +2456,6 @@ public GetResponse get(final RpcController controller, final GetRequest request) } Boolean existence = null; Result r = null; - RpcCallContext context = RpcServer.getCurrentCall().orElse(null); quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); Get clientGet = ProtobufUtil.toGet(get); @@ -2495,11 +2505,10 @@ public GetResponse get(final RpcController controller, final GetRequest request) throw new ServiceException(ie); } finally { final MetricsRegionServer metricsRegionServer = server.getMetrics(); - if (metricsRegionServer != null) { - TableDescriptor td = region != null ? region.getTableDescriptor() : null; - if (td != null) { - metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before); - } + if (metricsRegionServer != null && region != null) { + long blockBytesScanned = context != null ? context.getBlockBytesScanned() : 0; + metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before, + blockBytesScanned); } if (quota != null) { quota.close(); @@ -2751,7 +2760,7 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) if (regionAction.getActionCount() == 1) { CheckAndMutateResult result = checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner, - regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement); + regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context); regionActionResultBuilder.setProcessed(result.isSuccess()); resultOrExceptionOrBuilder.setIndex(0); if (result.getResult() != null) { @@ -2916,7 +2925,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque if (request.hasCondition()) { CheckAndMutateResult result = checkAndMutate(region, quota, mutation, cellScanner, - request.getCondition(), nonceGroup, spaceQuotaEnforcement); + request.getCondition(), nonceGroup, spaceQuotaEnforcement, context); builder.setProcessed(result.isSuccess()); boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, result.getResult(), controller, clientCellBlockSupported); @@ -2930,11 +2939,13 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque switch (type) { case APPEND: // TODO: this doesn't actually check anything. - r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, + context); break; case INCREMENT: // TODO: this doesn't actually check anything. 
- r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, + context); break; case PUT: put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); @@ -3001,8 +3012,9 @@ private void delete(HRegion region, OperationQuota quota, MutationProto mutation private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota, MutationProto mutation, CellScanner cellScanner, Condition condition, long nonceGroup, - ActivePolicyEnforcement spaceQuota) throws IOException { + ActivePolicyEnforcement spaceQuota, RpcCallContext context) throws IOException { long before = EnvironmentEdgeManager.currentTime(); + long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0; CheckAndMutate checkAndMutate = ProtobufUtil.toCheckAndMutate(condition, mutation, cellScanner); long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; checkCellSizeLimit(region, (Mutation) checkAndMutate.getAction()); @@ -3022,7 +3034,9 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota MetricsRegionServer metricsRegionServer = server.getMetrics(); if (metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); - metricsRegionServer.updateCheckAndMutate(region, after - before); + long blockBytesScanned = + context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0; + metricsRegionServer.updateCheckAndMutate(region, after - before, blockBytesScanned); MutationType type = mutation.getMutateType(); switch (type) { @@ -3452,12 +3466,16 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan region.closeRegionOperation(); // Update serverside metrics, even on error. long end = EnvironmentEdgeManager.currentTime(); - long responseCellSize = rpcCall != null ? 
rpcCall.getResponseCellSize() : 0;
+ long responseCellSize = 0;
+ long blockBytesScanned = 0;
+ if (rpcCall != null) {
+ responseCellSize = rpcCall.getResponseCellSize();
+ blockBytesScanned = rpcCall.getBlockBytesScanned();
+ }
region.getMetrics().updateScanTime(end - before);
final MetricsRegionServer metricsRegionServer = server.getMetrics();
if (metricsRegionServer != null) {
- metricsRegionServer.updateScanSize(region, responseCellSize);
- metricsRegionServer.updateScanTime(region, end - before);
+ metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsTableRequests.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsTableRequests.java
index 5adea96269dd..357a379357fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsTableRequests.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/MetricsTableRequests.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
import org.apache.hadoop.hbase.metrics.Meter;
import org.apache.hadoop.hbase.metrics.MetricRegistries;
@@ -72,6 +73,12 @@ public class MetricsTableRequests {
private final static String CHECK_AND_DELETE_TIME = "checkAndDeleteTime";
private final static String CHECK_AND_PUT_TIME = "checkAndPutTime";
private final static String CHECK_AND_MUTATE_TIME = "checkAndMutateTime";
+ private final static String BLOCK_BYTES_SCANNED_KEY = "blockBytesScannedCount";
+ private final static String GET_BLOCK_BYTES_SCANNED_KEY = "getBlockBytesScanned";
+ private final static String SCAN_BLOCK_BYTES_SCANNED_KEY = "scanBlockBytesScanned";
+ private final static String CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY = "checkAndMutateBlockBytesScanned";
+ private final static String INCREMENT_BLOCK_BYTES_SCANNED_KEY = "incrementBlockBytesScanned";
+ private final static String APPEND_BLOCK_BYTES_SCANNED_KEY = "appendBlockBytesScanned";
private final static String TABLE_READ_QUERY_PER_SECOND = "tableReadQueryPerSecond";
private final static String TABLE_WRITE_QUERY_PER_SECOND = "tableWriteQueryPerSecond";
@@ -87,6 +94,12 @@ public class MetricsTableRequests {
private Histogram checkAndDeleteTimeHistogram;
private Histogram checkAndPutTimeHistogram;
private Histogram checkAndMutateTimeHistogram;
+ private Counter blockBytesScannedCount;
+ private Histogram checkAndMutateBlockBytesScanned;
+ private Histogram getBlockBytesScanned;
+ private Histogram incrementBlockBytesScanned;
+ private Histogram appendBlockBytesScanned;
+ private Histogram scanBlockBytesScanned;
private Meter readMeter;
private Meter writeMeter;
@@ -133,6 +146,13 @@ private void init(TableName tableName, Configuration conf) {
checkAndDeleteTimeHistogram = registry.histogram(CHECK_AND_DELETE_TIME);
checkAndPutTimeHistogram = registry.histogram(CHECK_AND_PUT_TIME);
checkAndMutateTimeHistogram = registry.histogram(CHECK_AND_MUTATE_TIME);
+ blockBytesScannedCount = registry.counter(BLOCK_BYTES_SCANNED_KEY);
+ checkAndMutateBlockBytesScanned =
+ registry.histogram(CHECK_AND_MUTATE_BLOCK_BYTES_SCANNED_KEY);
+ getBlockBytesScanned = registry.histogram(GET_BLOCK_BYTES_SCANNED_KEY);
+ incrementBlockBytesScanned = registry.histogram(INCREMENT_BLOCK_BYTES_SCANNED_KEY);
+ appendBlockBytesScanned = registry.histogram(APPEND_BLOCK_BYTES_SCANNED_KEY);
+ scanBlockBytesScanned = 
registry.histogram(SCAN_BLOCK_BYTES_SCANNED_KEY);
}

if (enabTableQueryMeterMetrics) {
@@ -208,51 +228,63 @@ public void updateDeleteBatch(long t) {

/**
* Update the Get time histogram .
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
- public void updateGet(long t) {
+ public void updateGet(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
- getTimeHistogram.update(t);
+ getTimeHistogram.update(time);
+ if (blockBytesScanned > 0) {
+ blockBytesScannedCount.increment(blockBytesScanned);
+ getBlockBytesScanned.update(blockBytesScanned);
+ }
}
}

/**
* Update the Increment time histogram.
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
- public void updateIncrement(long t) {
+ public void updateIncrement(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
- incrementTimeHistogram.update(t);
+ incrementTimeHistogram.update(time);
+ if (blockBytesScanned > 0) {
+ blockBytesScannedCount.increment(blockBytesScanned);
+ incrementBlockBytesScanned.update(blockBytesScanned);
+ }
}
}

/**
* Update the Append time histogram.
- * @param t time it took
+ * @param time time it took
+ * @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
- public void updateAppend(long t) {
+ public void updateAppend(long time, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
- appendTimeHistogram.update(t);
- }
- }
-
- /**
- * Update the scan size.
- * @param scanSize size of the scan
- */
- public void updateScanSize(long scanSize) {
- if (isEnableTableLatenciesMetrics()) {
- scanSizeHistogram.update(scanSize);
+ appendTimeHistogram.update(time);
+ if (blockBytesScanned > 0) {
+ blockBytesScannedCount.increment(blockBytesScanned);
+ appendBlockBytesScanned.update(blockBytesScanned);
+ }
}
}

/**
- * Update the scan time.
- * @param t time it took
+ * Update the scan metrics.
+ * @param time response time of scan
+ * @param responseCellSize size of the scan response
+ * @param blockBytesScanned size of block bytes scanned to retrieve the response
*/
- public void updateScanTime(long t) {
+ public void updateScan(long time, long responseCellSize, long blockBytesScanned) {
if (isEnableTableLatenciesMetrics()) {
- scanTimeHistogram.update(t);
+ scanTimeHistogram.update(time);
+ scanSizeHistogram.update(responseCellSize);
+ if (blockBytesScanned > 0) {
+ blockBytesScannedCount.increment(blockBytesScanned);
+ scanBlockBytesScanned.update(blockBytesScanned);
+ }
}
}
@@ -280,9 +312,13 @@ public void updateCheckAndPut(long time) {
* Update the CheckAndMutate time histogram. 
* @param time time it took */ - public void updateCheckAndMutate(long time) { + public void updateCheckAndMutate(long time, long blockBytesScanned) { if (isEnableTableLatenciesMetrics()) { checkAndMutateTimeHistogram.update(time); + if (blockBytesScanned > 0) { + blockBytesScannedCount.increment(blockBytesScanned); + checkAndMutateBlockBytesScanned.update(blockBytesScanned); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java index 5feac1696c9d..49ce16c87f98 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionServer.java @@ -154,20 +154,20 @@ public void testSlowCount() { when(metricsTableRequests.isEnableTableLatenciesMetrics()).thenReturn(false); when(metricsTableRequests.isEnabTableQueryMeterMetrics()).thenReturn(false); for (int i = 0; i < 12; i++) { - rsm.updateAppend(region, 12); - rsm.updateAppend(region, 1002); + rsm.updateAppend(region, 12, 120); + rsm.updateAppend(region, 1002, 10020); } for (int i = 0; i < 13; i++) { rsm.updateDeleteBatch(region, 13); rsm.updateDeleteBatch(region, 1003); } for (int i = 0; i < 14; i++) { - rsm.updateGet(region, 14); - rsm.updateGet(region, 1004); + rsm.updateGet(region, 14, 140); + rsm.updateGet(region, 1004, 10040); } for (int i = 0; i < 15; i++) { - rsm.updateIncrement(region, 15); - rsm.updateIncrement(region, 1005); + rsm.updateIncrement(region, 15, 150); + rsm.updateIncrement(region, 1005, 10050); } for (int i = 0; i < 16; i++) { rsm.updatePutBatch(region, 16); @@ -181,19 +181,24 @@ public void testSlowCount() { rsm.updateDelete(region, 1003); rsm.updateCheckAndDelete(region, 17); rsm.updateCheckAndPut(region, 17); - rsm.updateCheckAndMutate(region, 17); + rsm.updateCheckAndMutate(region, 17, 170); } + HELPER.assertCounter("blockBytesScannedCount", 420090, serverSource); HELPER.assertCounter("appendNumOps", 24, serverSource); + HELPER.assertCounter("appendBlockBytesScannedNumOps", 24, serverSource); HELPER.assertCounter("deleteBatchNumOps", 26, serverSource); HELPER.assertCounter("getNumOps", 28, serverSource); + HELPER.assertCounter("getBlockBytesScannedNumOps", 28, serverSource); HELPER.assertCounter("incrementNumOps", 30, serverSource); + HELPER.assertCounter("incrementBlockBytesScannedNumOps", 30, serverSource); HELPER.assertCounter("putBatchNumOps", 32, serverSource); HELPER.assertCounter("putNumOps", 34, serverSource); HELPER.assertCounter("deleteNumOps", 34, serverSource); HELPER.assertCounter("checkAndDeleteNumOps", 17, serverSource); HELPER.assertCounter("checkAndPutNumOps", 17, serverSource); HELPER.assertCounter("checkAndMutateNumOps", 17, serverSource); + HELPER.assertCounter("checkAndMutateBlockBytesScannedNumOps", 17, serverSource); HELPER.assertCounter("slowAppendCount", 12, serverSource); HELPER.assertCounter("slowDeleteCount", 17, serverSource); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableRequests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableRequests.java index 15d4884ba324..8c47c96f1009 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableRequests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableRequests.java @@ -64,20 +64,26 @@ public void 
testMetricsTableLatencies() { Optional registry2 = MetricRegistries.global().get(info2); assertTrue(registry2.isPresent()); - requests1.updateGet(500L); + requests1.updateGet(500L, 5000L); Snapshot latencies1SnapshotGet = ((HistogramImpl) registry1.get().get("getTime").get()).snapshot(); assertEquals(500, latencies1SnapshotGet.get999thPercentile()); + Snapshot blockBytesScanned1SnapshotGet = + ((HistogramImpl) registry1.get().get("getBlockBytesScanned").get()).snapshot(); + assertEquals(5000, blockBytesScanned1SnapshotGet.get999thPercentile()); requests1.updatePut(50L); Snapshot latencies1SnapshotPut = ((HistogramImpl) registry1.get().get("putTime").get()).snapshot(); assertEquals(50, latencies1SnapshotPut.get99thPercentile()); - requests2.updateGet(300L); + requests2.updateGet(300L, 3000L); Snapshot latencies2SnapshotGet = ((HistogramImpl) registry2.get().get("getTime").get()).snapshot(); assertEquals(300, latencies2SnapshotGet.get999thPercentile()); + Snapshot blockBytesScanned2SnapshotGet = + ((HistogramImpl) registry2.get().get("getBlockBytesScanned").get()).snapshot(); + assertEquals(3000, blockBytesScanned2SnapshotGet.get999thPercentile()); requests2.updatePut(75L); Snapshot latencies2SnapshotPut = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java index abb11ee18738..8e1d126bd5cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java @@ -63,6 +63,7 @@ public static void classSetUp() { public void setUp() { wrapper = new MetricsRegionServerWrapperStub(); Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF, true); rsm = new MetricsRegionServer(wrapper, conf, null); userAgg = (MetricsUserAggregate) rsm.getMetricsUserAggregate(); } @@ -74,10 +75,10 @@ private void doOperations() { when(metricsTableRequests.isEnableTableLatenciesMetrics()).thenReturn(false); when(metricsTableRequests.isEnabTableQueryMeterMetrics()).thenReturn(false); for (int i = 0; i < 10; i++) { - rsm.updateGet(region, 10); + rsm.updateGet(region, 10, 10); } for (int i = 0; i < 11; i++) { - rsm.updateScanTime(region, 11); + rsm.updateScan(region, 11, 111, 1111); } for (int i = 0; i < 12; i++) { rsm.updatePut(region, 12); @@ -86,10 +87,10 @@ private void doOperations() { rsm.updateDelete(region, 13); } for (int i = 0; i < 14; i++) { - rsm.updateIncrement(region, 14); + rsm.updateIncrement(region, 14, 140); } for (int i = 0; i < 15; i++) { - rsm.updateAppend(region, 15); + rsm.updateAppend(region, 15, 150); } for (int i = 0; i < 16; i++) { rsm.updateReplay(16); @@ -99,13 +100,6 @@ private void doOperations() { @Test public void testPerUserOperations() { Configuration conf = HBaseConfiguration.create(); - // If metrics for users is not enabled, this test doesn't make sense. 
- if ( - !conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF, - MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF) - ) { - return; - } User userFoo = User.createUserForTesting(conf, "FOO", new String[0]); User userBar = User.createUserForTesting(conf, "BAR", new String[0]); @@ -132,6 +126,7 @@ public Void run() { HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource()); HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource()); HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource()); + HELPER.assertCounter("userfoometricblockbytesscannedcount", 16531, userAgg.getSource()); HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource()); HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource()); @@ -140,18 +135,12 @@ public Void run() { HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource()); HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource()); HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource()); + HELPER.assertCounter("userbarmetricblockbytesscannedcount", 16531, userAgg.getSource()); } @Test public void testLossyCountingOfUserMetrics() { Configuration conf = HBaseConfiguration.create(); - // If metrics for users is not enabled, this test doesn't make sense. - if ( - !conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF, - MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF) - ) { - return; - } int noOfUsers = 10000; for (int i = 1; i <= noOfUsers; i++) { User.createUserForTesting(conf, "FOO" + i, new String[0]).getUGI() @@ -163,7 +152,7 @@ public Void run() { when(region.getMetricsTableRequests()).thenReturn(metricsTableRequests); when(metricsTableRequests.isEnableTableLatenciesMetrics()).thenReturn(false); when(metricsTableRequests.isEnabTableQueryMeterMetrics()).thenReturn(false); - rsm.updateGet(region, 10); + rsm.updateGet(region, 10, 100); return null; } }); @@ -173,7 +162,11 @@ public Void run() { for (int i = 1; i <= noOfUsers / 10; i++) { assertFalse( HELPER.checkCounterExists("userfoo" + i + "metricgetnumops", userAgg.getSource())); + assertFalse(HELPER.checkCounterExists("userfoo" + i + "metricblockbytesscannedcount", + userAgg.getSource())); } HELPER.assertCounter("userfoo" + noOfUsers + "metricgetnumops", 1, userAgg.getSource()); + HELPER.assertCounter("userfoo" + noOfUsers + "metricblockbytesscannedcount", 100, + userAgg.getSource()); } } From 54e6d6a64d656e4a77d0a98c82c3072ced158749 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sun, 26 Mar 2023 15:31:59 -0400 Subject: [PATCH 3/3] suppress checkstyle warning --- .../hadoop/hbase/client/metrics/ServerSideScanMetrics.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java index ca2e0d1b6e04..cf730501be0a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java @@ -28,6 +28,7 @@ * Provides server side metrics related to scan operations. */ @InterfaceAudience.Public +@SuppressWarnings("checkstyle:VisibilityModifier") // See HBASE-27757 public class ServerSideScanMetrics { /** * Hash to hold the String -> Atomic Long mappings for each metric
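
A minimal client-side sketch of how the new counter surfaces through the scan metrics API once this series is applied. The Connection/Table setup is elided and the `table` handle is assumed to exist; Scan.setScanMetricsEnabled and ResultScanner.getScanMetrics are existing client APIs, while countOfBlockBytesScanned and the "BLOCK_BYTES_SCANNED" map key come from the ServerSideScanMetrics change in patch 1:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

    void printBlockBytesScanned(Table table) throws IOException {
      // Ask the server to track metrics for this scan.
      Scan scan = new Scan().setScanMetricsEnabled(true);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          // consume results
        }
        ScanMetrics metrics = scanner.getScanMetrics();
        // Total size of all blocks loaded (from cache or disk) to serve the scan;
        // also available via metrics.getMetricsMap().get("BLOCK_BYTES_SCANNED").
        System.out.println("block bytes scanned: " + metrics.countOfBlockBytesScanned.get());
      }
    }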