Skip to content

Commit a90a30e

Browse files
authored
Merge pull request #112 from HubSpot/28396-hubspot-2.6
HubSpot Backport: HBASE-28396: Quota throttling can cause a leak of scanners (apache#6055)
2 parents 5791589 + f85f6f0 commit a90a30e

File tree

3 files changed

+335
-27
lines changed

3 files changed

+335
-27
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,18 @@ public void run() {
472472
}
473473
}
474474

475+
static class RegionScannerContext {
476+
final String scannerName;
477+
final RegionScannerHolder holder;
478+
final OperationQuota quota;
479+
480+
RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
481+
this.scannerName = scannerName;
482+
this.holder = holder;
483+
this.quota = quota;
484+
}
485+
}
486+
475487
/**
476488
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
477489
*/
@@ -1349,12 +1361,12 @@ public int getScannersCount() {
13491361

13501362
/** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
13511363
RegionScanner getScanner(long scannerId) {
1352-
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1364+
RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
13531365
return rsh == null ? null : rsh.s;
13541366
}
13551367

13561368
/** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
1357-
private RegionScannerHolder getRegionScannerHolder(long scannerId) {
1369+
private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
13581370
return scanners.get(toScannerName(scannerId));
13591371
}
13601372

@@ -1395,7 +1407,7 @@ public String getScanDetailsWithRequest(ScanRequest request) {
13951407
* Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
13961408
*/
13971409
long getScannerVirtualTime(long scannerId) {
1398-
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1410+
RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
13991411
return rsh == null ? 0L : rsh.getNextCallSeq();
14001412
}
14011413

@@ -3174,9 +3186,8 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
31743186
* @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
31753187
* value.
31763188
*/
3177-
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3189+
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
31783190
ScanResponse.Builder builder) throws IOException {
3179-
HRegion region = getRegion(request.getRegion());
31803191
ClientProtos.Scan protoScan = request.getScan();
31813192
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
31823193
Scan scan = ProtobufUtil.toScan(protoScan);
@@ -3590,22 +3601,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35903601
}
35913602
requestCount.increment();
35923603
rpcScanRequestCount.increment();
3593-
RegionScannerHolder rsh;
3604+
RegionScannerContext rsx;
35943605
ScanResponse.Builder builder = ScanResponse.newBuilder();
3595-
String scannerName;
35963606
try {
3597-
if (request.hasScannerId()) {
3598-
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3599-
// for more details.
3600-
long scannerId = request.getScannerId();
3601-
builder.setScannerId(scannerId);
3602-
scannerName = toScannerName(scannerId);
3603-
rsh = getRegionScanner(request);
3604-
} else {
3605-
Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3606-
scannerName = scannerNameAndRSH.getFirst();
3607-
rsh = scannerNameAndRSH.getSecond();
3608-
}
3607+
rsx = checkQuotaAndGetRegionScannerContext(request, builder);
36093608
} catch (IOException e) {
36103609
if (e == SCANNER_ALREADY_CLOSED) {
36113610
// Now we will close scanner automatically if there are no more results for this region but
@@ -3614,6 +3613,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36143613
}
36153614
throw new ServiceException(e);
36163615
}
3616+
String scannerName = rsx.scannerName;
3617+
RegionScannerHolder rsh = rsx.holder;
3618+
OperationQuota quota = rsx.quota;
36173619
if (rsh.fullRegionScan) {
36183620
rpcFullScanRequestCount.increment();
36193621
}
@@ -3636,14 +3638,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36363638
}
36373639
return builder.build();
36383640
}
3639-
OperationQuota quota;
3640-
try {
3641-
quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
3642-
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
3643-
} catch (IOException e) {
3644-
addScannerLeaseBack(lease);
3645-
throw new ServiceException(e);
3646-
}
36473641
try {
36483642
checkScanNextCallSeq(request, rsh);
36493643
} catch (OutOfOrderScannerNextException e) {
@@ -4173,4 +4167,26 @@ private void setReloadableGuardrails(Configuration conf) {
41734167
maxScannerResultSize = conf.getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
41744168
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
41754169
}
4170+
4171+
RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
4172+
ScanResponse.Builder builder) throws IOException {
4173+
if (request.hasScannerId()) {
4174+
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
4175+
// for more details.
4176+
long scannerId = request.getScannerId();
4177+
builder.setScannerId(scannerId);
4178+
String scannerName = toScannerName(scannerId);
4179+
RegionScannerHolder rsh = getRegionScanner(request);
4180+
OperationQuota quota =
4181+
getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
4182+
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
4183+
return new RegionScannerContext(scannerName, rsh, quota);
4184+
}
4185+
4186+
HRegion region = getRegion(request.getRegion());
4187+
OperationQuota quota =
4188+
getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
4189+
Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
4190+
return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
4191+
}
41764192
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import java.util.List;
21+
import org.apache.hadoop.hbase.client.Mutation;
22+
import org.apache.hadoop.hbase.client.Result;
23+
24+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
25+
26+
public class TestNoopOperationQuota implements OperationQuota {
27+
public static final TestNoopOperationQuota INSTANCE = new TestNoopOperationQuota();
28+
29+
@Override
30+
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
31+
}
32+
33+
@Override
34+
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
35+
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
36+
37+
}
38+
39+
@Override
40+
public void close() {
41+
42+
}
43+
44+
@Override
45+
public void addGetResult(Result result) {
46+
47+
}
48+
49+
@Override
50+
public void addScanResult(List<Result> results) {
51+
52+
}
53+
54+
@Override
55+
public void addMutation(Mutation mutation) {
56+
57+
}
58+
59+
@Override
60+
public long getReadAvailable() {
61+
return 0L;
62+
}
63+
64+
@Override
65+
public long getReadConsumed() {
66+
return 0;
67+
}
68+
}

0 commit comments

Comments
 (0)