Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,16 @@ public interface GSet<K, E extends K> extends Iterable<E> {
* @return the collection of values.
*/
Collection<E> values();

/**
* Create a new concurrency controller for this GSet.
* By default, a {@link SynchronizedGSetController} is returned.
* Subclasses may override this method to return a different type of controller.
*
* @return a new concurrency controller.
*/
default GSetConcurrencyController<K> newConcurrencyController() {
return SynchronizedGSetController.of();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;

/**
* A concurrency controller for {@link GSet}.
* This interface provides thread-safety mechanisms for GSet operations,
* including lock management and size correction for concurrent modifications.
*
* @param <K> The type of the key.
*/
public interface GSetConcurrencyController<K> {

/**
* Get the lock object for the given key.
*
* @param key the key.
* @return a lock object to synchronize on, or null if no synchronization is needed.
*/
Object getLock(K key);

/**
* Some implementations may allow modifying GSet concurrently, but leave the GSet size inaccurate.
* GSetConcurrencyController provides an independent size counter(initial value is the size of
* the GSet when constructed), which needs caller to keep track of the size change and call
* this method to correct the size.
* The size change will not be applied to the underlying GSet util {@link #correctSize()}
* is called.
*
* @param deltaSize deltaSize, a negative value means size decrease.
*/
void addSize(int deltaSize);

/**
* Apply the size correction to the underlying GSet.
*/
void correctSize();

/**
* In some scenarios, we already know the final size of the GSet. We can use this method
* to correct the size directly without calling {@link #addSize(int)} repeatedly.
*
* <p>NOTE: Caller is responsible for ensuring the correctness of the given size.</p>
*
* @param size the corrected size.
*/
void correctSize(int size);

/**
* A convenience method to execute a runnable under the lock of the given key.
*
* @param key the key.
* @param runnable the runnable to be executed.
*/
default void doUnderLock(K key, Runnable runnable) {
Object lock = getLock(key);
if (lock != null) {
synchronized (lock) {
runnable.run();
}
} else {
runnable.run();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,10 @@ public void remove() {
}
};
}

@Override
public GSetConcurrencyController<K> newConcurrencyController() {
return SynchronizedGSetController.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,4 +421,10 @@ public void clear() {
Arrays.fill(entries, null);
size = 0;
}

@Override
public GSetConcurrencyController<K> newConcurrencyController() {
return new LightWeightGSetConcurrencyController<>(this);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;

import java.util.concurrent.atomic.LongAdder;

/**
* A concurrency controller implementation for {@link LightWeightGSet}.
*
* <p>This class provides fine-grained locking mechanism for concurrent access to
* {@link LightWeightGSet} by using a fixed array of locks. Each key is mapped to
* a specific lock based on its hash value, allowing multiple threads to operate
* on different parts of the set simultaneously while maintaining thread safety.
*
* <p>The controller uses a lock striping approach with a fixed number of locks
* (4096 by default) to balance between memory usage and concurrency level. Keys are
* distributed across these locks using modulo operation on the key's index.
*
* <p><strong>Thread Safety Scope:</strong>
* <ul>
* <li><strong>Safe:</strong> Single-key operations like {@code get()}, {@code put()},
* {@code remove()}, {@code contains()} when properly synchronized using the lock
* returned by {@link #getLock(Object)}</li>
* <li><strong>NOT Safe:</strong> Bulk operations like {@code values()}, {@code iterator()},
* or any operations that traverse the entire set. These operations
* require additional synchronization mechanisms beyond this controller.
* The {@code size()} will also be inconsistent if multiple threads are modifying the
* set concurrently, call {@code correctSize(int)} to correct it. </li>
* </ul>
*
* @param <K> Key type for looking up the elements
* @param <E> Element type, which must be
* (1) a subclass of K, and
* (2) implementing {@link LightWeightGSet.LinkedElement} interface.
*
* @see LightWeightGSet
* @see GSetConcurrencyController
*/
public class LightWeightGSetConcurrencyController<K, E extends K>
implements GSetConcurrencyController<K> {

private static final int CONCURRENCY = 16 * 16 * 16;

/**
* Array of lock objects used for synchronization.
* Each lock protects a subset of keys based on their hash values.
*/
private final Object[] locks;

/**
* Reference to the underlying LightWeightGSet that this controller manages.
*/
private final LightWeightGSet<K, E> lightWeightGSet;

private final LongAdder size = new LongAdder();

/**
* Constructs a new concurrency controller for the given LightWeightGSet.
*
* @param lightWeightGSet the LightWeightGSet instance to control concurrent access for
*/
public LightWeightGSetConcurrencyController(LightWeightGSet<K, E> lightWeightGSet) {
this.locks = new Object[CONCURRENCY];
this.lightWeightGSet = lightWeightGSet;
this.size.add(lightWeightGSet.size());
initLocks();
}

private void initLocks() {
for (int i = 0; i < this.locks.length; i++) {
locks[i] = new Object();
}
}

/**
* Corrects the size of the underlying LightWeightGSet.
*
* <p>This method is used when concurrent modifications may have left
* the size of the LightWeightGSet in an inconsistent state. It directly updates the
* size field of the underlying set to the correct value.
*
* @param correctSize the correct size to set
*/
@Override
public void correctSize(int correctSize) {
lightWeightGSet.size = correctSize;
}

@Override
public void addSize(int deltaSize) {
size.add(deltaSize);
}

@Override
public void correctSize() {
correctSize(size.intValue());
}

/**
* Returns the lock object associated with the given key.
*
* <p>The lock is determined by:
* <ol>
* <li>Computing the key's index using the underlying set's getIndex method</li>
* <li>Mapping the index to a lock using modulo operation</li>
* </ol>
*
* <p>This ensures that the same key always maps to the same lock, while
* distributing different keys across multiple locks for better concurrency.
*
* @param key the key for which to get the associated lock
* @return the lock object that should be used for synchronizing operations on this key
* @throws NullPointerException if key is null (depending on the underlying set's behavior)
*/
public Object getLock(K key) {
int index = lightWeightGSet.getIndex(key);
return locks[index % locks.length];
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,9 @@ protected synchronized void expandIfNecessary() {
resize(capacity * 2);
}
}

@Override
public GSetConcurrencyController<K> newConcurrencyController() {
return SynchronizedGSetController.of();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;

/**
* A concurrency controller that does not use any lock.
*/
public enum LockFreeGSetController implements GSetConcurrencyController<Object> {
INSTANCE;

@Override
public Object getLock(Object key) {
return null;
}

@Override
public void correctSize(int size) {
// do nothing
}

@Override
public void addSize(int deltaSize) {
// do nothing
}

@Override
public void correctSize() {
// do nothing
}

@SuppressWarnings("unchecked")
public static <K> GSetConcurrencyController<K> getInstance() {
return (GSetConcurrencyController<K>) INSTANCE;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;

/**
* A concurrency controller that uses a single lock for all operations.
*
* @param <K> The type of the key.
*/
public class SynchronizedGSetController<K> implements GSetConcurrencyController<K> {

private final Object lock = new Object();

/**
* Create a new SynchronizedGSetController.
*/
private SynchronizedGSetController() {
}

@Override
public Object getLock(K key) {
return lock;
}

@Override
public void correctSize(int size) {
// do nothing
}

@Override
public void addSize(int deltaSize) {
// do nothing
}

@Override
public void correctSize() {
// do nothing
}

public static <K> SynchronizedGSetController<K> of() {
return new SynchronizedGSetController<>();
}

}
Loading