From 32ed4c2922360f9c5eb3a6375c9b295b81470381 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Tue, 1 Mar 2022 21:22:22 -0500 Subject: [PATCH 1/9] HBASE-26784 Use HIGH_QOS for ResultScanner.close requests --- .../hbase/client/AsyncClientScanner.java | 1 + ...syncScanSingleRegionRpcRetryingCaller.java | 3 +- .../client/TestAsyncTableRpcPriority.java | 140 +++++++++++++----- .../client/AbstractTestAsyncTableScan.java | 38 +++-- .../hbase/client/TestAsyncTableScan.java | 68 ++++++++- .../hbase/client/TestAsyncTableScanAll.java | 7 +- .../hbase/client/TestAsyncTableScanner.java | 11 +- .../hbase/client/TestRawAsyncTableScan.java | 11 +- 8 files changed, 224 insertions(+), 55 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 5fd00a5f6ed2..48f004c0a29c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -188,6 +188,7 @@ private void startScan(OpenScannerResponse resp) { private CompletableFuture openScanner(int replicaId) { return conn.callerFactory. single().table(tableName) .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) + .priority(scan.getPriority()) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 1fa3c81e5d1d..7f19180a0ab2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; @@ -347,7 +348,7 @@ private long remainingTimeNs() { private void closeScanner() { incRPCCallsMetrics(scanMetrics, regionServerRemote); - resetController(controller, rpcTimeoutNs, priority); + resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 8c22946649dc..68f5a47bef4b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -34,6 +38,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -91,6 +98,8 @@ public class TestAsyncTableRpcPriority { private ClientService.Interface stub; + private ExecutorService threadPool; + private AsyncConnection conn; @Rule @@ -98,34 +107,9 @@ public class TestAsyncTableRpcPriority { @Before public void setUp() throws IOException { + this.threadPool = Executors.newSingleThreadExecutor(); stub = mock(ClientService.Interface.class); - AtomicInteger scanNextCalled = new AtomicInteger(0); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ScanRequest req = invocation.getArgument(1); - RpcCallback done = invocation.getArgument(2); - if (!req.hasScannerId()) { - done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) - .setMoreResultsInRegion(true).setMoreResults(true).build()); - } else { - if (req.hasCloseScanner() && req.getCloseScanner()) { - done.run(ScanResponse.getDefaultInstance()); - } else { - Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) - .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) - .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) - .setValue(Bytes.toBytes("v")).build(); - Result result = Result.create(Arrays.asList(cell)); - done.run( - ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true) - .setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); - } - } - return null; - } - }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); doAnswer(new Answer() { @Override @@ -218,6 +202,16 @@ public boolean matches(HBaseRpcController controller) { }); } + private ScanRequest assertScannerCloseRequest() { + return argThat(new ArgumentMatcher() { + + @Override + public boolean matches(ScanRequest request) { + return request.hasCloseScanner() && request.getCloseScanner(); + } + }); + } + @Test public void testGet() { conn.getTable(TableName.valueOf(name.getMethodName())) @@ -478,53 +472,123 @@ public void testCheckAndMutateMetaTable() throws IOException { any(ClientProtos.MultiRequest.class), any()); } + private void mockScan(int scanPriority) { + int scannerId = 1; + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).build()); + } else { + assertFalse("close scanner should not come in with scan priority " + scanPriority, + req.hasCloseScanner() && req.getCloseScanner()); + + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); + } + }); + return null; + } + }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any()); + + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + assertTrue("close request should have scannerId", req.hasScannerId()); + assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); + assertTrue("close request should have closerScanner set", req.hasCloseScanner() && req.getCloseScanner()); + + done.run(ScanResponse.getDefaultInstance()); + }); + return null; + } + }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + } + @Test public void testScan() throws IOException, InterruptedException { + mockScan(19); try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) { assertNotNull(scanner.next()); Thread.sleep(1000); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any()); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then several renew lease + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test public void testScanNormalTable() throws IOException, InterruptedException { + mockScan(NORMAL_QOS); try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { assertNotNull(scanner.next()); Thread.sleep(1000); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any()); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then several renew lease + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test public void testScanSystemTable() throws IOException, InterruptedException { + mockScan(SYSTEMTABLE_QOS); try (ResultScanner scanner = conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { assertNotNull(scanner.next()); Thread.sleep(1000); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then several renew lease + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test public void testScanMetaTable() throws IOException, InterruptedException { + mockScan(SYSTEMTABLE_QOS); try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME) .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { assertNotNull(scanner.next()); Thread.sleep(1000); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then several renew lease + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 5831bfcc60ba..21c8be185846 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -130,7 +130,7 @@ protected static List getTableAndScanCreatorParams() { protected abstract Scan createScan(); - protected abstract List doScan(Scan scan) throws Exception; + protected abstract List doScan(Scan scan, int closeAfter) throws Exception; protected final List convertFromBatchResult(List results) { assertTrue(results.size() % 2 == 0); @@ -146,7 +146,7 @@ protected final List convertFromBatchResult(List results) { @Test public void testScanAll() throws Exception { - List results = doScan(createScan()); + List results = doScan(createScan(), -1); // make sure all scanners are closed at RS side TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) .forEach( @@ -170,7 +170,7 @@ private void assertResultEquals(Result result, int i) { @Test public void testReversedScanAll() throws Exception { - List results = doScan(createScan().setReversed(true)); + List results = doScan(createScan().setReversed(true), -1); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); } @@ -179,7 +179,7 @@ public void testReversedScanAll() throws Exception { public void testScanNoStopKey() throws Exception { int start = 345; List results = - doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)))); + doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); } @@ -188,7 +188,7 @@ public void testScanNoStopKey() throws Exception { public void testReverseScanNoStopKey() throws Exception { int start = 765; List results = doScan( - createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true)); + createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1); assertEquals(start + 1, results.size()); IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); } @@ -196,7 +196,7 @@ public void testReverseScanNoStopKey() throws Exception { @Test public void testScanWrongColumnFamily() throws Exception { try { - doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily"))); + doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1); } catch (Exception e) { assertTrue(e instanceof NoSuchColumnFamilyException || e.getCause() instanceof NoSuchColumnFamilyException); @@ -204,20 +204,28 @@ public void testScanWrongColumnFamily() throws Exception { } private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, - int limit) throws Exception { + int limit) throws Exception { + testScan(start, startInclusive, stop, stopInclusive, limit, -1); + } + + private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, + int limit, int closeAfter) throws Exception { Scan scan = createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); if (limit > 0) { scan.setLimit(limit); } - List results = doScan(scan); + List results = doScan(scan, closeAfter); int actualStart = startInclusive ? start : start + 1; int actualStop = stopInclusive ? stop + 1 : stop; int count = actualStop - actualStart; if (limit > 0) { count = Math.min(count, limit); } + if (closeAfter > 0) { + count = Math.min(count, closeAfter); + } assertEquals(count, results.size()); IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); } @@ -230,12 +238,15 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole if (limit > 0) { scan.setLimit(limit); } - List results = doScan(scan); + List results = doScan(scan, -1); int actualStart = startInclusive ? start : start - 1; int actualStop = stopInclusive ? stop - 1 : stop; int count = actualStart - actualStop; if (limit > 0) { count = Math.min(count, limit); + } + if (scan.getBatch() > 0) { + } assertEquals(count, results.size()); IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); @@ -310,4 +321,13 @@ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { testReversedScan(765, false, 543, true, 200); testReversedScan(876, false, 654, false, 200); } + + @Test + public void testScanEndingEarly() throws Exception { + testScan(1, true, 998, false, 0, 900); // from first region to last region + testScan(123, true, 234, true, 0, 100); + testScan(234, true, 456, false, 0, 100); + testScan(345, false, 567, true, 0, 100); + testScan(456, false, 678, false, 0, 100); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 42d2c38376d6..04aaba1deacb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -55,16 +57,74 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); - table.scan(scan, consumer); - List results = consumer.getAll(); + List results; + if (closeAfter > 0) { + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } + LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter); + table.scan(scan, consumer); + results = consumer.getAll(); + } else { + SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + table.scan(scan, consumer); + results = consumer.getAll(); + } if (scan.getBatch() > 0) { results = convertFromBatchResult(results); } return results; } + private static class LimitedScanResultConsumer implements ScanResultConsumer { + + private final int limit; + + public LimitedScanResultConsumer(int limit) { + this.limit = limit; + } + + private final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + @Override + public synchronized boolean onNext(Result result) { + results.add(result); + return results.size() < limit; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + finished = true; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized List getAll() throws Exception { + while (!finished) { + wait(); + } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index d9a53952ab8c..96c2d40138ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -60,11 +60,16 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { List results = getTable.get().scanAll(scan).get(); if (scan.getBatch() > 0) { results = convertFromBatchResult(results); } + // we can't really close the scan early for scanAll, but to keep the assertions + // simple in AbstractTestAsyncTableScan we'll just sublist here instead. + if (closeAfter > 0 && closeAfter < results.size()) { + results = results.subList(0, closeAfter); + } return results; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index f832cfd759a3..2e990f763da0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -62,12 +62,21 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results = new ArrayList<>(); + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (closeAfter > 0 && scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } try (ResultScanner scanner = table.getScanner(scan)) { for (Result result; (result = scanner.next()) != null;) { results.add(result); + if (closeAfter > 0 && results.size() >= closeAfter) { + break; + } } } if (scan.getBatch() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 78a54edd24b3..26c201e19865 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -55,12 +55,21 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer(); ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (closeAfter > 0 && scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } for (Result result; (result = scanConsumer.take()) != null;) { results.add(result); + if (closeAfter > 0 && results.size() >= closeAfter) { + break; + } } if (scan.getBatch() > 0) { results = convertFromBatchResult(results); From 756603cd2e2282022e55f4c1f31c0e897fa1e3d1 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 4 Mar 2022 09:59:26 -0500 Subject: [PATCH 2/9] Add implementation and test for blocking client --- .../hadoop/hbase/client/ScannerCallable.java | 12 +- .../hbase/client/TestTableRpcPriority.java | 202 ++++++++++++++++++ 2 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 5d7cf705bd6e..e6c758379b0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; @@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.ScannerResetException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -309,8 +311,16 @@ private void close() { incRPCCallsMetrics(scanMetrics, isRegionServerRemote); ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); + HBaseRpcController controller = rpcControllerFactory.newController(); + // pull in the original priority, but then try to set to HIGH. + // it will take whatever is highest. + controller.setPriority(controller.getPriority()); + controller.setPriority(HConstants.HIGH_QOS); + if (controller.hasCallTimeout()) { + controller.setCallTimeout(controller.getCallTimeout()); + } try { - getStub().scan(getRpcController(), request); + getStub().scan(controller, request); } catch (Exception e) { throw ProtobufUtil.handleRemoteException(e); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java new file mode 100644 index 000000000000..b1a4b320c6a1 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java @@ -0,0 +1,202 @@ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; +import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; +import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; +import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test that correct rpc priority is sent to server from blocking Table calls. Currently + * only implements checks for scans, but more could be added here. + */ +@Category({ ClientTests.class, MediumTests.class }) +public class TestTableRpcPriority { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestTableRpcPriority.class); + + @Rule + public TestName name = new TestName(); + + private ClientProtos.ClientService.BlockingInterface stub; + private Connection conn; + + @Before + public void setUp() throws IOException, ServiceException { + stub = mock(ClientProtos.ClientService.BlockingInterface.class); + + Configuration conf = HBaseConfiguration.create(); + ExecutorService executorService = Executors.newCachedThreadPool(); + conn = new ConnectionImplementation(conf, executorService, + UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) { + + @Override + public ClientProtos.ClientService.BlockingInterface getClient(ServerName serverName) + throws IOException { + return stub; + } + + @Override + public RegionLocations relocateRegion(final TableName tableName, final byte[] row, + int replicaId) throws IOException { + return locateRegion(tableName, row, true, false, replicaId); + } + + @Override + public + RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, + boolean retry, int replicaId) throws IOException { + RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + ServerName serverName = ServerName.valueOf("rs", 16010, 12345); + HRegionLocation loc = new HRegionLocation(info, serverName); + return new RegionLocations(loc); + } + }; + } + + @Test + public void testScan() throws Exception { + mockScan(19); + testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19)); + } + + @Test + public void testScanNormalTable() throws Exception { + mockScan(NORMAL_QOS); + testForTable(TableName.valueOf(name.getMethodName()), Optional.of(NORMAL_QOS)); + } + + @Test + public void testScanSystemTable() throws Exception { + mockScan(SYSTEMTABLE_QOS); + testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), + Optional.empty()); + } + + @Test + public void testScanMetaTable() throws Exception { + mockScan(SYSTEMTABLE_QOS); + testForTable(TableName.META_TABLE_NAME, Optional.empty()); + } + + private void testForTable(TableName tableName, Optional priority) throws Exception { + Scan scan = new Scan().setCaching(1); + priority.ifPresent(scan::setPriority); + + try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { + assertNotNull(scanner.next()); + assertNotNull(scanner.next()); + } + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then several renew lease + verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class)); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest()); + } + + private void mockScan(int scanPriority) throws ServiceException { + int scannerId = 1; + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer() { + + @Override + public ClientProtos.ScanResponse answer(InvocationOnMock invocation) + throws Throwable { + ClientProtos.ScanRequest req = invocation.getArgument(1); + assertFalse("close scanner should not come in with scan priority " + scanPriority, + req.hasCloseScanner() && req.getCloseScanner()); + ClientProtos.ScanResponse.Builder builder = ClientProtos.ScanResponse.newBuilder(); + + if (!req.hasScannerId()) { + builder.setScannerId(scannerId); + } else { + builder.setScannerId(req.getScannerId()); + } + + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) + .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())).setFamily(Bytes.toBytes("cf")) + .setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true) + .addResults(ProtobufUtil.toResult(result)).build(); + } + }).when(stub).scan(assertPriority(scanPriority), any(ClientProtos.ScanRequest.class)); + + doAnswer(new Answer() { + + @Override + public ClientProtos.ScanResponse answer(InvocationOnMock invocation) + throws Throwable { + ClientProtos.ScanRequest req = invocation.getArgument(1); + assertTrue("close request should have scannerId", req.hasScannerId()); + assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); + assertTrue("close request should have closerScanner set", + req.hasCloseScanner() && req.getCloseScanner()); + + return ClientProtos.ScanResponse.getDefaultInstance(); + } + }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest()); + } + + private HBaseRpcController assertPriority(int priority) { + return argThat(new ArgumentMatcher() { + + @Override + public boolean matches(HBaseRpcController controller) { + return controller.getPriority() == priority; + } + }); + } + + private ClientProtos.ScanRequest assertScannerCloseRequest() { + return argThat(new ArgumentMatcher() { + + @Override + public boolean matches(ClientProtos.ScanRequest request) { + return request.hasCloseScanner() && request.getCloseScanner(); + } + }); + } +} From 4ada781bcff68141384de5aadeaefdbbda6ea32d Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 4 Mar 2022 10:58:08 -0500 Subject: [PATCH 3/9] add license --- .../hbase/client/TestTableRpcPriority.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java index b1a4b320c6a1..afdb03ee4241 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; From 22b41a4b383dffe8f1bcdaeb294e1babd4c68d29 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 4 Mar 2022 11:06:34 -0500 Subject: [PATCH 4/9] checkstyle --- .../client/TestAsyncTableRpcPriority.java | 67 ++++++++++--------- .../hbase/client/TestTableRpcPriority.java | 4 +- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 68f5a47bef4b..56fda306aa66 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -29,19 +29,17 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell.Type; @@ -54,21 +52,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -83,6 +66,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations. @@ -477,23 +473,29 @@ private void mockScan(int scanPriority) { AtomicInteger scanNextCalled = new AtomicInteger(0); doAnswer(new Answer() { + @SuppressWarnings("FutureReturnValueIgnored") @Override public Void answer(InvocationOnMock invocation) throws Throwable { - threadPool.submit(() ->{ + threadPool.submit(() -> { ScanRequest req = invocation.getArgument(1); RpcCallback done = invocation.getArgument(2); if (!req.hasScannerId()) { done.run( - ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).build()); + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) + .setMoreResultsInRegion(true).setMoreResults(true).build()); } else { assertFalse("close scanner should not come in with scan priority " + scanPriority, - req.hasCloseScanner() && req.getCloseScanner()); + req.hasCloseScanner() && req.getCloseScanner()); - Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) - .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); - Result result = Result.create(Arrays.asList(cell)); - done.run( - ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) + .setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) + .setMoreResultsInRegion(true).setMoreResults(true) + .addResults(ProtobufUtil.toResult(result)).build()); } }); return null; @@ -502,14 +504,17 @@ public Void answer(InvocationOnMock invocation) throws Throwable { doAnswer(new Answer() { + @SuppressWarnings("FutureReturnValueIgnored") @Override public Void answer(InvocationOnMock invocation) throws Throwable { - threadPool.submit(() ->{ + threadPool.submit(() -> { ScanRequest req = invocation.getArgument(1); RpcCallback done = invocation.getArgument(2); assertTrue("close request should have scannerId", req.hasScannerId()); - assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); - assertTrue("close request should have closerScanner set", req.hasCloseScanner() && req.getCloseScanner()); + assertEquals("close request's scannerId should match", scannerId, + req.getScannerId()); + assertTrue("close request should have closerScanner set", + req.hasCloseScanner() && req.getCloseScanner()); done.run(ScanResponse.getDefaultInstance()); }); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java index afdb03ee4241..4fcf65e6016d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -102,8 +101,7 @@ public RegionLocations relocateRegion(final TableName tableName, final byte[] ro } @Override - public - RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, + public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); ServerName serverName = ServerName.valueOf("rs", 16010, 12345); From 2fec464526fe79d1e9c15a763f6cef53fb212d58 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 4 Mar 2022 11:49:10 -0500 Subject: [PATCH 5/9] clean up tests --- .../client/TestAsyncTableRpcPriority.java | 89 +++++++------------ .../client/AbstractTestAsyncTableScan.java | 3 - 2 files changed, 34 insertions(+), 58 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 56fda306aa66..2d174673d071 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -33,10 +33,10 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -468,8 +468,9 @@ public void testCheckAndMutateMetaTable() throws IOException { any(ClientProtos.MultiRequest.class), any()); } - private void mockScan(int scanPriority) { + private CompletableFuture mockScanReturnRenewFuture(int scanPriority) { int scannerId = 1; + CompletableFuture future = new CompletableFuture<>(); AtomicInteger scanNextCalled = new AtomicInteger(0); doAnswer(new Answer() { @@ -484,6 +485,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable { ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800) .setMoreResultsInRegion(true).setMoreResults(true).build()); } else { + if (req.hasRenew() && req.getRenew()) { + future.complete(null); + } + assertFalse("close scanner should not come in with scan priority " + scanPriority, req.hasCloseScanner() && req.getCloseScanner()); @@ -521,78 +526,52 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + return future; } @Test - public void testScan() throws IOException, InterruptedException { - mockScan(19); - try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) { - assertNotNull(scanner.next()); - Thread.sleep(1000); - } - // ensures the close thread has time to finish before asserting - threadPool.shutdown(); - threadPool.awaitTermination(5, TimeUnit.SECONDS); - - // just verify that the calls happened. verification of priority occurred in the mocking - // open, next, then several renew lease - verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); - verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); + public void testScan() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(19); + testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19)); } @Test - public void testScanNormalTable() throws IOException, InterruptedException { - mockScan(NORMAL_QOS); - try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { - assertNotNull(scanner.next()); - Thread.sleep(1000); - } - // ensures the close thread has time to finish before asserting - threadPool.shutdown(); - threadPool.awaitTermination(5, TimeUnit.SECONDS); - - // just verify that the calls happened. verification of priority occurred in the mocking - // open, next, then several renew lease - verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); - verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); + public void testScanNormalTable() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(NORMAL_QOS); + testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS)); } @Test - public void testScanSystemTable() throws IOException, InterruptedException { - mockScan(SYSTEMTABLE_QOS); - try (ResultScanner scanner = - conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { - assertNotNull(scanner.next()); - Thread.sleep(1000); - } - // ensures the close thread has time to finish before asserting - threadPool.shutdown(); - threadPool.awaitTermination(5, TimeUnit.SECONDS); - - // just verify that the calls happened. verification of priority occurred in the mocking - // open, next, then several renew lease - verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); - verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); + public void testScanSystemTable() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); + testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()), + renewFuture, Optional.empty()); } @Test - public void testScanMetaTable() throws IOException, InterruptedException { - mockScan(SYSTEMTABLE_QOS); - try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + public void testScanMetaTable() throws Exception { + CompletableFuture renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS); + testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty()); + } + + private void testForTable(TableName tableName, CompletableFuture renewFuture, Optional priority) throws Exception { + Scan scan = new Scan().setCaching(1).setMaxResultSize(1); + priority.ifPresent(scan::setPriority); + + try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) { assertNotNull(scanner.next()); - Thread.sleep(1000); + // wait for at least one renew to come in before closing + renewFuture.join(); } + // ensures the close thread has time to finish before asserting threadPool.shutdown(); threadPool.awaitTermination(5, TimeUnit.SECONDS); // just verify that the calls happened. verification of priority occurred in the mocking - // open, next, then several renew lease - verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + // open, next, then one or more lease renewals, then close + verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any()); + // additionally, explicitly check for a close request verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 21c8be185846..f971a5384d7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -244,9 +244,6 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole int count = actualStart - actualStop; if (limit > 0) { count = Math.min(count, limit); - } - if (scan.getBatch() > 0) { - } assertEquals(count, results.size()); IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); From 2c50e2fb756f6f5cbd267b9ae3d99917b47644e4 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 4 Mar 2022 17:29:58 -0500 Subject: [PATCH 6/9] checkstyle --- .../hadoop/hbase/client/TestAsyncTableRpcPriority.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 2d174673d071..568fbfea8e2f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -29,7 +29,11 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.Arrays; @@ -554,7 +558,8 @@ public void testScanMetaTable() throws Exception { testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty()); } - private void testForTable(TableName tableName, CompletableFuture renewFuture, Optional priority) throws Exception { + private void testForTable(TableName tableName, CompletableFuture renewFuture, + Optional priority) throws Exception { Scan scan = new Scan().setCaching(1).setMaxResultSize(1); priority.ifPresent(scan::setPriority); From 374d301e886ebb14735ee81c7157d8c52b50578d Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Fri, 4 Mar 2022 19:51:30 -0500 Subject: [PATCH 7/9] fix imports --- .../client/TestAsyncTableRpcPriority.java | 27 ++++++++++--------- .../hbase/client/TestTableRpcPriority.java | 6 ++--- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 568fbfea8e2f..1e6a4b345e76 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -56,6 +56,19 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -70,19 +83,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; + /** * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java index 4fcf65e6016d..31af0373cb9c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java @@ -47,12 +47,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -62,6 +59,9 @@ import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; /** * Test that correct rpc priority is sent to server from blocking Table calls. Currently From 12a69e22e4abb7c90f0f40948def16fe84acf884 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 5 Mar 2022 08:31:27 -0500 Subject: [PATCH 8/9] single import --- .../apache/hadoop/hbase/client/TestTableRpcPriority.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java index 31af0373cb9c..7aef284ed7c6 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java @@ -26,7 +26,12 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.Arrays; From 141f732b3ed3c5125251bf49f91c4e8e13382da7 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Sat, 5 Mar 2022 09:39:15 -0500 Subject: [PATCH 9/9] imports again --- .../java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 04aaba1deacb..c1797f3833c3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -24,13 +24,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class })