Skip to content

Commit 57a3781

Browse files
committed
HBASE-27170 ByteBuffAllocator leak when decompressing blocks near minSizeForReservoirUse (#4592)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent 20f2479 commit 57a3781

7 files changed

Lines changed: 664 additions & 29 deletions

File tree

hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,20 @@ public ByteBuff allocate(int size) {
323323
// just allocate the ByteBuffer from on-heap.
324324
bbs.add(allocateOnHeap(remain));
325325
}
326-
ByteBuff bb = ByteBuff.wrap(bbs, () -> {
327-
for (int i = 0; i < lenFromReservoir; i++) {
328-
this.putbackBuffer(bbs.get(i));
329-
}
330-
});
326+
327+
ByteBuff bb;
328+
// we only need a recycler if we successfully pulled from the pool
329+
// this matters for determining whether to add leak detection in RefCnt
330+
if (lenFromReservoir == 0) {
331+
bb = ByteBuff.wrap(bbs);
332+
} else {
333+
bb = ByteBuff.wrap(bbs, () -> {
334+
for (int i = 0; i < lenFromReservoir; i++) {
335+
this.putbackBuffer(bbs.get(i));
336+
}
337+
});
338+
}
339+
331340
bb.limit(size);
332341
return bb;
333342
}

hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.nio;
1919

20+
import com.google.errorprone.annotations.RestrictedApi;
2021
import java.io.IOException;
2122
import java.nio.ByteBuffer;
2223
import java.nio.channels.FileChannel;
@@ -547,6 +548,28 @@ public static ByteBuff wrap(ByteBuffer buffer) {
547548
return wrap(buffer, RefCnt.create());
548549
}
549550

551+
/**
552+
* Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
553+
* potential buffer leaks. We pass the buffer itself as a default hint, but one can use
554+
* {@link #touch(Object)} to pass their own hint as well.
555+
*/
556+
@Override
557+
public ByteBuff touch() {
558+
return touch(this);
559+
}
560+
561+
@Override
562+
public ByteBuff touch(Object hint) {
563+
refCnt.touch(hint);
564+
return this;
565+
}
566+
567+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
568+
allowedOnPath = ".*/src/test/.*")
569+
public RefCnt getRefCnt() {
570+
return refCnt;
571+
}
572+
550573
/**
551574
* Make this private because we don't want to expose the refCnt related wrap method to upstream.
552575
*/

hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
*/
1818
package org.apache.hadoop.hbase.nio;
1919

20+
import com.google.errorprone.annotations.RestrictedApi;
2021
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
2122
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
2223
import org.apache.yetus.audience.InterfaceAudience;
2324

2425
import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
2526
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
27+
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
28+
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
29+
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;
2630

2731
/**
2832
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
@@ -31,7 +35,10 @@
3135
@InterfaceAudience.Private
3236
public class RefCnt extends AbstractReferenceCounted {
3337

34-
private Recycler recycler = ByteBuffAllocator.NONE;
38+
private static final ResourceLeakDetector<RefCnt> detector =
39+
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
40+
private final Recycler recycler;
41+
private final ResourceLeakTracker<RefCnt> leak;
3542

3643
/**
3744
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
@@ -49,15 +56,66 @@ public static RefCnt create(Recycler recycler) {
4956

5057
public RefCnt(Recycler recycler) {
5158
this.recycler = recycler;
59+
this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
60+
}
61+
62+
@Override
63+
public ReferenceCounted retain() {
64+
maybeRecord();
65+
return super.retain();
66+
}
67+
68+
@Override
69+
public ReferenceCounted retain(int increment) {
70+
maybeRecord();
71+
return super.retain(increment);
72+
}
73+
74+
@Override
75+
public boolean release() {
76+
maybeRecord();
77+
return super.release();
78+
}
79+
80+
@Override
81+
public boolean release(int decrement) {
82+
maybeRecord();
83+
return super.release(decrement);
5284
}
5385

5486
@Override
5587
protected final void deallocate() {
5688
this.recycler.free();
89+
if (leak != null) {
90+
this.leak.close(this);
91+
}
92+
}
93+
94+
@Override
95+
public RefCnt touch() {
96+
maybeRecord();
97+
return this;
5798
}
5899

59100
@Override
60101
public final ReferenceCounted touch(Object hint) {
61-
throw new UnsupportedOperationException();
102+
maybeRecord(hint);
103+
return this;
104+
}
105+
106+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
107+
allowedOnPath = ".*/src/test/.*")
108+
public Recycler getRecycler() {
109+
return recycler;
110+
}
111+
112+
private void maybeRecord() {
113+
maybeRecord(null);
114+
}
115+
116+
private void maybeRecord(Object hint) {
117+
if (leak != null) {
118+
leak.record(hint);
119+
}
62120
}
63121
}

hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
2222
import static org.junit.Assert.assertEquals;
2323
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertSame;
2425
import static org.junit.Assert.assertTrue;
2526
import static org.junit.Assert.fail;
2627

@@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
4546
public static final HBaseClassTestRule CLASS_RULE =
4647
HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
4748

49+
@Test
50+
public void testRecycleOnlyPooledBuffers() {
51+
int maxBuffersInPool = 10;
52+
int bufSize = 1024;
53+
int minSize = bufSize / 8;
54+
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
55+
56+
ByteBuff buff = alloc.allocate(minSize - 1);
57+
assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
58+
59+
alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
60+
buff = alloc.allocate(minSize * 2);
61+
assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
62+
}
63+
4864
@Test
4965
public void testAllocateByteBuffToReadInto() {
5066
int maxBuffersInPool = 10;
@@ -329,8 +345,6 @@ public void testByteBuffUnsupportedMethods() {
329345
ByteBuff buf = alloc.allocate(bufSize);
330346
assertException(() -> buf.retain(2));
331347
assertException(() -> buf.release(2));
332-
assertException(() -> buf.touch());
333-
assertException(() -> buf.touch(new Object()));
334348
}
335349

336350
private void assertException(Runnable r) {

0 commit comments

Comments
 (0)