Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.resource;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should use the org.apache.hadoop.hdds.utils package.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duongkame , any comments on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved.


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.ref.ReferenceQueue;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
* observe resource object life-cycle and assert proper resource closure before they are GCed.
*
* <p>
* Example usage:
*
* <pre> {@code
* class MyResource implements AutoClosable {
* static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource");
*
* private final LeakTracker leakTracker = LEAK_DETECTOR.track(this, () -> {
* // report leaks, don't refer to the original object (MyResource) here.
* System.out.println("MyResource is not closed before being discarded.");
* });
*
* @Override
* public void close() {
* // proper resources cleanup...
* // inform tracker that this object is closed properly.
* leakTracker.close();
* }
* }
*
* }</pre>
*/
public class LeakDetector {
public static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);
private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final String name;

public LeakDetector(String name) {
this.name = name;
start();
}

private void start() {
Thread t = new Thread(this::run);
t.setName(LeakDetector.class.getSimpleName() + "-" + name);
t.setDaemon(true);
LOG.info("Starting leak detector thread {}.", name);
t.start();
}

private void run() {
while (true) {
try {
LeakTracker tracker = (LeakTracker) queue.remove();
// Original resource already been GCed, if tracker is not closed yet,
// report a leak.
if (allLeaks.remove(tracker)) {
tracker.reportLeak();
}
} catch (InterruptedException e) {
LOG.warn("Thread interrupted, exiting.", e);
break;
}
}

LOG.warn("Exiting leak detector {}.", name);
}

public LeakTracker track(Object leakable, Runnable reportLeak) {
// A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
// if we have proofs that leak tracking impacts performance, or a single LeakDetector
// thread can't keep up with the pace of object allocation.
// For now, it looks effective enough and let keep it simple.
LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
allLeaks.add(tracker);
return tracker;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.resource;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Set;

/**
* A token to track resource closure.
*
* @see LeakDetector
*/
public class LeakTracker extends WeakReference<Object> {
private final Set<LeakTracker> allLeaks;
private final Runnable leakReporter;
LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
Set<LeakTracker> allLeaks, Runnable leakReporter) {
super(referent, referenceQueue);
this.allLeaks = allLeaks;
this.leakReporter = leakReporter;
}

/**
* Called by the tracked resource when closing.
*/
public void close() {
allLeaks.remove(this);
}

void reportLeak() {
leakReporter.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Contains utilities for resource management.
*/
package org.apache.hadoop.hdds.resource;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should be removed.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.resource;

import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Test LeakDetector.
*/
public class TestLeakDetector {
private static final LeakDetector LEAK_DETECTOR = new LeakDetector("test");
private AtomicInteger leaks = new AtomicInteger(0);

@Test
public void testLeakDetector() throws Exception {
// create and close resource => no leaks.
createResource(true);
System.gc();
Thread.sleep(100);
assertEquals(0, leaks.get());

// create and not close => leaks.
createResource(false);
System.gc();
Thread.sleep(100);
assertEquals(1, leaks.get());
}

private void createResource(boolean close) throws Exception {
MyResource resource = new MyResource(leaks);
if (close) {
resource.close();
}
}

private static final class MyResource implements AutoCloseable {
private final LeakTracker leakTracker;

private MyResource(final AtomicInteger leaks) {
leakTracker = LEAK_DETECTOR.track(this, () -> leaks.incrementAndGet());
}

@Override
public void close() throws Exception {
leakTracker.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.NoSuchElementException;
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -307,28 +306,4 @@ public void testNormalPrefixedIterator() throws IOException {

iter.close();
}

@Test
public void testGetStackTrace() {
ManagedRocksIterator iterator = mock(ManagedRocksIterator.class);
RocksIterator mock = mock(RocksIterator.class);
when(iterator.get()).thenReturn(mock);
when(mock.isOwningHandle()).thenReturn(true);
ManagedRocksObjectUtils.assertClosed(iterator);
verify(iterator, times(1)).getStackTrace();

iterator = new ManagedRocksIterator(rocksDBIteratorMock);

// construct the expected trace.
StackTraceElement[] traceElements = Thread.currentThread().getStackTrace();
StringBuilder sb = new StringBuilder();
// first 2 lines will differ.
for (int i = 2; i < traceElements.length; i++) {
sb.append(traceElements[i]);
sb.append("\n");
}
String expectedTrace = sb.toString();
String fromObjectInit = iterator.getStackTrace();
assertThat(fromObjectInit).contains(expectedTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.hadoop.hdds.resource.LeakTracker;
import org.rocksdb.BloomFilter;

import javax.annotation.Nullable;

import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.assertClosed;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.formatStackTrace;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.getStackTrace;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;

/**
* Managed BloomFilter.
*/
public class ManagedBloomFilter extends BloomFilter {

@Nullable
private final StackTraceElement[] elements = getStackTrace();
private final LeakTracker leakTracker = track(this);

@Override
protected void finalize() throws Throwable {
assertClosed(this, formatStackTrace(elements));
super.finalize();
public void close() {
super.close();
leakTracker.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,24 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.hadoop.hdds.resource.LeakTracker;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.TableFormatConfig;

import javax.annotation.Nullable;

import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.assertClosed;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.formatStackTrace;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.getStackTrace;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;

/**
* Managed ColumnFamilyOptions.
*/
public class ManagedColumnFamilyOptions extends ColumnFamilyOptions {

@Nullable
private final StackTraceElement[] elements = getStackTrace();

/**
* Indicate if this ColumnFamilyOptions is intentionally used across RockDB
* instances.
*/
private boolean reused = false;
private final LeakTracker leakTracker = track(this);

public ManagedColumnFamilyOptions() {
super();
}

public ManagedColumnFamilyOptions(ColumnFamilyOptions columnFamilyOptions) {
Expand Down Expand Up @@ -85,9 +78,9 @@ public boolean isReused() {
}

@Override
protected void finalize() throws Throwable {
assertClosed(this, formatStackTrace(elements));
super.finalize();
public void close() {
super.close();
leakTracker.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.hadoop.hdds.resource.LeakTracker;
import org.rocksdb.CompactRangeOptions;

import javax.annotation.Nullable;

import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.assertClosed;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.formatStackTrace;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.getStackTrace;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;

/**
* Managed CompactRangeOptions.
*/
public class ManagedCompactRangeOptions extends CompactRangeOptions {

@Nullable
private final StackTraceElement[] elements = getStackTrace();
private final LeakTracker leakTracker = track(this);

@Override
protected void finalize() throws Throwable {
assertClosed(this, formatStackTrace(elements));
super.finalize();
public void close() {
super.close();
leakTracker.close();
}
}
Loading