Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,19 @@ private void close() {
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
HBaseRpcController controller = rpcControllerFactory.newController();
// pull in the original priority, but then try to set to HIGH.
// it will take whatever is highest.
controller.setPriority(controller.getPriority());
controller.setPriority(HConstants.HIGH_QOS);
if (controller.hasCallTimeout()) {
controller.setCallTimeout(controller.getCallTimeout());

// Set fields from the original controller onto the close-specific controller
// We set the timeout and the priority -- we will overwrite the priority to HIGH
// below, but the controller will take whichever is highest.
if (getRpcController() instanceof HBaseRpcController) {
HBaseRpcController original = (HBaseRpcController) getRpcController();
controller.setPriority(original.getPriority());
if (original.hasCallTimeout()) {
controller.setCallTimeout(original.getCallTimeout());
}
}
controller.setPriority(HConstants.HIGH_QOS);

try {
getStub().scan(controller, request);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,11 @@ public synchronized void notifyOnCancel(RpcCallback<Object> callback, Cancellati
action.run(false);
}
}

@Override public String toString() {
return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done
+ ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception="
+ exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
+ cellScanner + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
Expand Down Expand Up @@ -74,6 +72,7 @@
*/
@Category({ ClientTests.class, MediumTests.class })
public class TestTableRpcPriority {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTableRpcPriority.class);
Expand All @@ -89,6 +88,7 @@ public void setUp() throws IOException, ServiceException {
stub = mock(ClientProtos.ClientService.BlockingInterface.class);

Configuration conf = HBaseConfiguration.create();

ExecutorService executorService = Executors.newCachedThreadPool();
conn = new ConnectionImplementation(conf, executorService,
UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) {
Expand Down Expand Up @@ -122,6 +122,16 @@ public void testScan() throws Exception {
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
}

/**
* This test verifies that our closeScanner request honors the original
* priority of the scan if it's greater than our expected HIGH_QOS for close calls.
*/
@Test
public void testScanSuperHighPriority() throws Exception {
mockScan(1000);
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000));
}

@Test
public void testScanNormalTable() throws Exception {
mockScan(NORMAL_QOS);
Expand Down Expand Up @@ -153,11 +163,22 @@ private void testForTable(TableName tableName, Optional<Integer> priority) throw
// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then several renew lease
verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
verify(stub, times(1)).scan(any(), assertScannerCloseRequest());
verify(stub, times(1)).scan(
assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), assertScannerCloseRequest());
}

private void mockScan(int scanPriority) throws ServiceException {
int scannerId = 1;

doAnswer(new Answer<ClientProtos.ScanResponse>() {
@Override public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
throws Throwable {
throw new IllegalArgumentException(
"Call not covered by explicit mock for arguments controller="
+ invocation.getArgument(0) + ", request=" + invocation.getArgument(1));
}
}).when(stub).scan(any(), any());

AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<ClientProtos.ScanResponse>() {

Expand All @@ -182,7 +203,7 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
.addResults(ProtobufUtil.toResult(result)).build();
}
}).when(stub).scan(assertPriority(scanPriority), any(ClientProtos.ScanRequest.class));
}).when(stub).scan(assertControllerArgs(scanPriority), any());

doAnswer(new Answer<ClientProtos.ScanResponse>() {

Expand All @@ -197,15 +218,19 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation)

return ClientProtos.ScanResponse.getDefaultInstance();
}
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest());
}).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)),
assertScannerCloseRequest());
}

private HBaseRpcController assertPriority(int priority) {
private HBaseRpcController assertControllerArgs(int priority) {
return argThat(new ArgumentMatcher<HBaseRpcController>() {

@Override
public boolean matches(HBaseRpcController controller) {
return controller.getPriority() == priority;
// check specified priority, but also check that it has a timeout
// this ensures that our conversion from the base controller to the close-specific
// controller honored the original arguments.
return controller.getPriority() == priority && controller.hasCallTimeout();
}
});
}
Expand Down