Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNLockManager;
import org.apache.hadoop.hdfs.server.namenode.fgl.GlobalFSNamesystemLock;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.http.HttpConfig;

Expand Down Expand Up @@ -1051,6 +1053,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY = "dfs.namenode.inode.attributes.provider.class";
public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_BYPASS_USERS_KEY = "dfs.namenode.inode.attributes.provider.bypass.users";
public static final String DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_BYPASS_USERS_DEFAULT = "";
public static final String DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY = "dfs.namenode.lock.model.provider.class";
public static final Class<? extends FSNLockManager> DFS_NAMENODE_LOCK_MODEL_PROVIDER_DEFAULT = GlobalFSNamesystemLock.class;

public static final String DFS_DATANODE_BP_READY_TIMEOUT_KEY = "dfs.datanode.bp-ready.timeout";
public static final long DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_PERMISSIONS_SUPERUSER_ONLY_KEY;
Expand Down Expand Up @@ -96,6 +98,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;

import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -114,6 +117,8 @@
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;

import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNLockManager;
import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotDeletionGc;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
Expand Down Expand Up @@ -621,7 +626,7 @@ private boolean isFromProxyUser(CallerContext ctx) {
private final int numCommittedAllowed;

/** Lock to protect FSNamesystem. */
private final FSNamesystemLock fsLock;
private final FSNLockManager fsLock;

/**
* Checkpoint lock to protect FSNamesystem modification on standby NNs.
Expand Down Expand Up @@ -871,7 +876,10 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
this.contextFieldSeparator =
conf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY,
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
Class<? extends FSNLockManager> lockKlass = conf.getClass(
DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY, DFS_NAMENODE_LOCK_MODEL_PROVIDER_DEFAULT,
FSNLockManager.class);
fsLock = createLock(lockKlass, conf, detailedLockHoldTimeMetrics);
cpLock = new ReentrantLock();

this.fsImage = fsImage;
Expand Down Expand Up @@ -1077,6 +1085,18 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
}
}

private <T> T createLock(Class<T> theClass, Configuration conf,
MutableRatesWithAggregation detailedLockHoldTimeMetrics) {
try {
Constructor<T> meth = theClass.getDeclaredConstructor(
Configuration.class, MutableRatesWithAggregation.class);
meth.setAccessible(true);
return meth.newInstance(conf, detailedLockHoldTimeMetrics);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void checkForAsyncLogEnabledByOldConfigs(Configuration conf) {
// dfs.namenode.audit.log.async is no longer in use. Use log4j properties instead.
if (conf.getBoolean("dfs.namenode.audit.log.async", false)) {
Expand Down Expand Up @@ -1783,70 +1803,74 @@ public static List<URI> getSharedEditsDirs(Configuration conf) {
}

@Override
public void readLock() {
this.fsLock.readLock();
}

@Override
public void readLockInterruptibly() throws InterruptedException {
this.fsLock.readLockInterruptibly();
public void readLock(FSNamesystemLockMode lockMode) {
this.fsLock.readLock(lockMode);
}

@Override
public void readUnlock() {
this.fsLock.readUnlock();
public void readLockInterruptibly(FSNamesystemLockMode lockMode) throws InterruptedException {
this.fsLock.readLockInterruptibly(lockMode);
}

@Override
public void readUnlock(String opName) {
this.fsLock.readUnlock(opName);
public void readUnlock(FSNamesystemLockMode lockMode, String opName) {
this.fsLock.readUnlock(lockMode, opName);
}

public void readUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
this.fsLock.readUnlock(opName, lockReportInfoSupplier);
readUnlock(FSNamesystemLockMode.GLOBAL, opName, lockReportInfoSupplier);
}

@Override
public void writeLock() {
this.fsLock.writeLock();
public void readUnlock(FSNamesystemLockMode lockMode, String opName,
Supplier<String> lockReportInfoSupplier) {
this.fsLock.readUnlock(lockMode, opName, lockReportInfoSupplier);
}

@Override
public void writeLockInterruptibly() throws InterruptedException {
this.fsLock.writeLockInterruptibly();
public void writeLock(FSNamesystemLockMode lockMode) {
this.fsLock.writeLock(lockMode);
}

@Override
public void writeUnlock() {
this.fsLock.writeUnlock();
public void writeLockInterruptibly(FSNamesystemLockMode lockMode) throws InterruptedException {
this.fsLock.writeLockInterruptibly(lockMode);
}

@Override
public void writeUnlock(String opName) {
this.fsLock.writeUnlock(opName);
public void writeUnlock(FSNamesystemLockMode lockMode, String opName) {
this.fsLock.writeUnlock(lockMode, opName);
}

public void writeUnlock(String opName, boolean suppressWriteLockReport) {
this.fsLock.writeUnlock(opName, suppressWriteLockReport);
writeUnlock(FSNamesystemLockMode.GLOBAL, opName, suppressWriteLockReport);
}

public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
boolean suppressWriteLockReport) {
this.fsLock.writeUnlock(lockMode, opName, suppressWriteLockReport);
}

public void writeUnlock(String opName, Supplier<String> lockReportInfoSupplier) {
writeUnlock(FSNamesystemLockMode.GLOBAL, opName, lockReportInfoSupplier);
}

public void writeUnlock(String opName,
public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
Supplier<String> lockReportInfoSupplier) {
this.fsLock.writeUnlock(opName, lockReportInfoSupplier);
this.fsLock.writeUnlock(lockMode, opName, lockReportInfoSupplier);
}

@Override
public boolean hasWriteLock() {
return this.fsLock.isWriteLockedByCurrentThread();
public boolean hasWriteLock(FSNamesystemLockMode lockMode) {
return this.fsLock.hasWriteLock(lockMode);
}
@Override
public boolean hasReadLock() {
return this.fsLock.getReadHoldCount() > 0 || hasWriteLock();
public boolean hasReadLock(FSNamesystemLockMode lockMode) {
return this.fsLock.hasReadLock(lockMode);
}

public int getReadHoldCount() {
return this.fsLock.getReadHoldCount();
return this.fsLock.getReadHoldCount(FSNamesystemLockMode.GLOBAL);
}

/** Lock the checkpoint lock */
Expand Down Expand Up @@ -4927,21 +4951,21 @@ public float getReconstructionQueuesInitProgress() {
@Metric({"LockQueueLength", "Number of threads waiting to " +
"acquire FSNameSystemLock"})
public int getFsLockQueueLength() {
return fsLock.getQueueLength();
return fsLock.getQueueLength(FSNamesystemLockMode.FS);
}

@Metric(value = {"ReadLockLongHoldCount", "The number of time " +
"the read lock has been held for longer than the threshold"},
type = Metric.Type.COUNTER)
public long getNumOfReadLockLongHold() {
return fsLock.getNumOfReadLockLongHold();
return fsLock.getNumOfReadLockLongHold(FSNamesystemLockMode.FS);
}

@Metric(value = {"WriteLockLongHoldCount", "The number of time " +
"the write lock has been held for longer than the threshold"},
type = Metric.Type.COUNTER)
public long getNumOfWriteLockLongHold() {
return fsLock.getNumOfWriteLockLongHold();
return fsLock.getNumOfWriteLockLongHold(FSNamesystemLockMode.FS);
}

int getNumberOfDatanodes(DatanodeReportType type) {
Expand Down Expand Up @@ -7094,12 +7118,12 @@ public void setEditLogTailerForTests(EditLogTailer tailer) {

@VisibleForTesting
void setFsLockForTests(ReentrantReadWriteLock lock) {
this.fsLock.coarseLock = lock;
this.fsLock.setLockForTests(lock);
}

@VisibleForTesting
public ReentrantReadWriteLock getFsLockForTests() {
return fsLock.coarseLock;
return fsLock.getLockForTests();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
* {@link MutableRatesWithAggregation}. However since threads are re-used
* between operations this should not generally be an issue.
*/
class FSNamesystemLock {
public class FSNamesystemLock {
@VisibleForTesting
protected ReentrantReadWriteLock coarseLock;

Expand Down Expand Up @@ -129,7 +129,7 @@ public Long initialValue() {

private static final String OVERALL_METRIC_NAME = "Overall";

FSNamesystemLock(Configuration conf,
public FSNamesystemLock(Configuration conf,
MutableRatesWithAggregation detailedHoldTimeMetrics) {
this(conf, detailedHoldTimeMetrics, new Timer());
}
Expand Down Expand Up @@ -489,6 +489,14 @@ public long getWriteLockReportingThresholdMs() {
return writeLockReportingThresholdMs;
}

public void setLockForTests(ReentrantReadWriteLock lock) {
this.coarseLock = lock;
}

public ReentrantReadWriteLock getLockForTests() {
return this.coarseLock;
}

/**
* Read lock Held Info.
*/
Expand Down
Loading