Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import org.apache.hadoop.util.WeakReferenceMap;

import static java.util.Objects.requireNonNull;

/**
* A WeakReferenceMap for threads.
* @param <V> value type of the map
Expand All @@ -36,30 +38,55 @@ public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
super(factory, referenceLost);
}

/**
* Get the value for the current thread, creating if needed.
* @return an instance.
*/
public V getForCurrentThread() {
return get(currentThreadId());
}

/**
* Remove the reference for the current thread.
* @return any reference value which existed.
*/
public V removeForCurrentThread() {
return remove(currentThreadId());
}

/**
* Get the current thread ID.
* @return thread ID.
*/
public long currentThreadId() {
return Thread.currentThread().getId();
}

/**
* Set the new value for the current thread.
* @param newVal new reference to set for the active thread.
* @return the previously set value, possibly null
*/
public V setForCurrentThread(V newVal) {
requireNonNull(newVal);
long id = currentThreadId();

// if the same object is already in the map, just return it.
WeakReference<V> ref = lookup(id);
// Reference value could be set to null. Thus, ref.get() could return
// null. Should be handled accordingly while using the returned value.
if (ref != null && ref.get() == newVal) {
return ref.get();
WeakReference<V> existingWeakRef = lookup(id);

// The looked up reference could be one of
// 1. null: nothing there
// 2. valid but get() == null : reference lost by GC.
// 3. different from the new value
// 4. the same as the old value
if (resolve(existingWeakRef) == newVal) {
// case 4: do nothing, return the new value
return newVal;
} else {
// cases 1, 2, 3: update the map and return the old value
return put(id, newVal);
}

return put(id, newVal);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration;

import static java.util.Objects.requireNonNull;

/**
* An interface defined to capture thread-level IOStatistics by using per
* thread context.
Expand Down Expand Up @@ -67,7 +69,11 @@ public interface IOStatisticsContext extends IOStatisticsSource {
* @return instance of IOStatisticsContext for the context.
*/
static IOStatisticsContext getCurrentIOStatisticsContext() {
return IOStatisticsContextIntegration.getCurrentIOStatisticsContext();
// the null check is just a safety check to highlight exactly where a null value would
// be returned if HADOOP-18456 has resurfaced.
return requireNonNull(
IOStatisticsContextIntegration.getCurrentIOStatisticsContext(),
"Null IOStatisticsContext");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ private IOStatisticsContextIntegration() {}
* @return an instance of IOStatisticsContext.
*/
private static IOStatisticsContext createNewInstance(Long key) {
return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
IOStatisticsContextImpl instance =
new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
LOG.debug("Created instance {}", instance);
return instance;
}

/**
Expand Down Expand Up @@ -131,9 +134,11 @@ public static void setThreadIOStatisticsContext(
IOStatisticsContext statisticsContext) {
if (isThreadIOStatsEnabled) {
if (statisticsContext == null) {
// new value is null, so remove it
ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread();
}
if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) {
} else {
// the setter is efficient in that it does not create a new
// reference if the context is unchanged.
ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static java.util.Objects.requireNonNull;

Expand All @@ -52,6 +56,9 @@
@InterfaceAudience.Private
public class WeakReferenceMap<K, V> {

private static final Logger LOG =
LoggerFactory.getLogger(WeakReferenceMap.class);

/**
* The reference map.
*/
Expand Down Expand Up @@ -79,6 +86,12 @@ public class WeakReferenceMap<K, V> {
*/
private final AtomicLong entriesCreatedCount = new AtomicLong();

/**
* Log to report loss of a reference during the create phase, which
* is believed to be a cause of HADOOP-18456.
*/
private final LogExactlyOnce referenceLostDuringCreation = new LogExactlyOnce(LOG);

/**
* instantiate.
* @param factory supplier of new instances
Expand Down Expand Up @@ -132,35 +145,93 @@ public WeakReference<V> lookup(K key) {
* @return an instance.
*/
public V get(K key) {
final WeakReference<V> current = lookup(key);
V val = resolve(current);
if (val != null) {
final WeakReference<V> currentWeakRef = lookup(key);
// resolve it, after which if not null, we have a strong reference
V strongVal = resolve(currentWeakRef);
if (strongVal != null) {
// all good.
return val;
return strongVal;
}

// here, either no ref, or the value is null
if (current != null) {
// here, either currentWeakRef was null, or its reference was GC'd.
if (currentWeakRef != null) {
// garbage collection removed the reference.

// explicitly remove the weak ref from the map if it has not
// been updated by this point
// this is here just for completeness.
map.remove(key, currentWeakRef);

// log/report the loss.
noteLost(key);
}

// create a new value and add it to the map
return create(key);
}

/**
* Create a new instance under a key.
* <p>
* The instance is created, added to the map and then the
* map value retrieved.
* This ensures that the reference returned is that in the map,
* even if there is more than one entry being created at the same time.
* If that race does occur, it will be logged the first time it happens
* for this specific map instance.
* <p>
* HADOOP-18456 highlighted the risk of a concurrent GC resulting a null
* value being retrieved and so returned.
* To prevent this:
* <ol>
* <li>A strong reference is retained to the newly created instance
* in a local variable.</li>
* <li>That variable is used after the resolution process, to ensure
* the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
* <li>A check is made for the resolved reference being null, and if so,
* the put() is repeated</li>
* </ol>
* @param key key
* @return the value
* @return the created value
*/
public V create(K key) {
entriesCreatedCount.incrementAndGet();
WeakReference<V> newRef = new WeakReference<>(
requireNonNull(factory.apply(key)));
map.put(key, newRef);
return map.get(key).get();
/*
Get a strong ref so even if a GC happens in this method the reference is not lost.
It is NOT enough to have a reference in a field, it MUST be used
so as to ensure the reference isn't optimized away prematurely.
"A reachable object is any object that can be accessed in any potential continuing
computation from any live thread."
*/

final V strongRef = requireNonNull(factory.apply(key),
"factory returned a null instance");
V resolvedStrongRef;
do {
WeakReference<V> newWeakRef = new WeakReference<>(strongRef);

// put it in the map
map.put(key, newWeakRef);

// get it back from the map
WeakReference<V> retrievedWeakRef = map.get(key);
// resolve that reference, handling the situation where somehow it was removed from the map
// between the put() and the get()
resolvedStrongRef = resolve(retrievedWeakRef);
Comment on lines +218 to +220
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a strong reference on L207, why do we expect to lose it?

I could see it happening in the old create(K key) implementation, but less so here.

if (resolvedStrongRef == null) {
referenceLostDuringCreation.warn("reference to %s lost during creation", key);
noteLost(key);
}
} while (resolvedStrongRef == null);

// note if there was any change in the reference.
// as this forces strongRef to be kept in scope
if (strongRef != resolvedStrongRef) {
LOG.debug("Created instance for key {}: {} overwritten by {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is the case, shouldn't we raise an exception? Are we not returning the wrong value then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it's just addressing a race condition...the reason we do the lookup is to ensure that all threads share the same instance. the exact choice of which one is not considered relevant

key, strongRef, resolvedStrongRef);
}

return resolvedStrongRef;
}

/**
Expand Down Expand Up @@ -203,7 +274,7 @@ public boolean containsKey(K key) {
* @param r reference to resolve
* @return the value or null
*/
private V resolve(WeakReference<V> r) {
protected V resolve(WeakReference<V> r) {
return r == null ? null : r.get();
}

Expand Down Expand Up @@ -233,7 +304,7 @@ public int prune() {
* @param key key of lost reference
*/
private void noteLost(final K key) {
// incrment local counter
// increment local counter
referenceLostCount.incrementAndGet();

// and call any notification function supplied in the constructor
Expand Down
Loading