@@ -315,6 +315,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = 300;

public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY =
"dfs.namenode.excess.redundancy.timeout-sec";
public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT = 3600;
public static final String DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_KEY
= "dfs.namenode.excess.redundancy.timeout.check.limit";
public static final long DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT = 1000;

public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
"dfs.namenode.maintenance.replication.min";
public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
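As a usage reference, here is a minimal sketch (assuming the `_KEY`/`_DEFAULT` constant names above and a standard `HdfsConfiguration`) of overriding the new settings in code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY;

// Sketch: tune the new excess-redundancy settings before starting a cluster.
Configuration conf = new HdfsConfiguration();
// Treat excess replicas as timed out after 30 minutes instead of the 3600s default.
conf.setLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, 1800);
// Examine at most 500 excess blocks per redundancy-monitor pass.
conf.setLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_KEY, 500);
```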
@@ -86,6 +86,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -116,6 +117,7 @@

import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;

import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
@@ -482,6 +484,16 @@ public int getPendingSPSPaths() {
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;

/**
* Timeout in milliseconds for excess redundancy blocks.
*/
private long excessRedundancyTimeout;

/**
* Limits the number of blocks checked for excess redundancy timeouts in each scan.
*/
private long excessRedundancyTimeoutCheckLimit;

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -589,6 +601,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT));
setExcessRedundancyTimeoutCheckLimit(conf.getLong(
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_KEY,
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT));

printInitialConfigs();
}

@@ -3040,6 +3058,98 @@ void rescanPostponedMisreplicatedBlocks() {
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
}
}

/**
* Sets the timeout (in seconds) for excess redundancy blocks. If the provided
* timeout is less than or equal to 0, the default value is used. The timeout
* is stored internally in milliseconds.
* @param timeOut The time (in seconds) to set as the excess redundancy block timeout.
*/
public void setExcessRedundancyTimeout(long timeOut) {
if (timeOut <= 0) {
this.excessRedundancyTimeout = DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT * 1000L;
} else {
this.excessRedundancyTimeout = timeOut * 1000L;
}
}

/**
* Sets the limit on the number of blocks checked for excess redundancy
* timeouts in each scan. If the provided limit is less than or equal to 0,
* the default limit is used.
*
* @param limit The maximum number of blocks checked for excess redundancy timeouts.
*/
public void setExcessRedundancyTimeoutCheckLimit(long limit) {
if (limit <= 0) {
this.excessRedundancyTimeoutCheckLimit =
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT;
} else {
this.excessRedundancyTimeoutCheckLimit = limit;
}
}
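
A quick sketch of the resulting setter semantics (with the limit check reading `limit`, as fixed above; `bm` is an assumed existing `BlockManager` instance):

```java
bm.setExcessRedundancyTimeout(60);           // stored internally as 60 * 1000 ms
bm.setExcessRedundancyTimeout(0);            // non-positive: falls back to 3600 * 1000 ms
bm.setExcessRedundancyTimeoutCheckLimit(-5); // non-positive: falls back to 1000
```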

/**
* Process timed-out blocks in the excess redundancy map.
*/
void processTimedOutExcessBlocks() {
if (excessRedundancyMap.size() == 0) {
return;
}
namesystem.writeLock();
long now = Time.monotonicNow();
int processed = 0;
try {
Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter =
excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator();
while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) {
Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next();
String datanodeUuid = entry.getKey();
LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue();
List<ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks);
// Sort blocks by timestamp in ascending order (oldest first), per
// ExcessBlockInfo.compareTo.
Collections.sort(sortedBlocks);

for (ExcessBlockInfo excessBlockInfo : sortedBlocks) {
if (processed >= excessRedundancyTimeoutCheckLimit) {
break;
}
BlockInfo blockInfo = excessBlockInfo.getBlockInfo();
BlockInfo bi = blocksMap.getStoredBlock(blockInfo);
if (bi == null || bi.isDeleted()) {
continue;
}

// Blocks are sorted by timestamp (oldest first), so once we reach a block
// that has not timed out yet, no later block on this datanode has either.
if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) {
break;
}

Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos();
while (iterator.hasNext()) {
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor();
if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) {
if (datanodeStorageInfo.getState().equals(State.NORMAL)) {
final Block block = getBlockOnStorage(blockInfo,
datanodeStorageInfo);
if (!containsInvalidateBlock(datanodeDescriptor, block)) {
addToInvalidates(block, datanodeDescriptor);
LOG.debug("Excess block timeout ({}, {}) is added to invalidated.",
block, datanodeDescriptor);
}
excessBlockInfo.setTimeStamp();
processed++;
break;
}
}
}
}
}
} finally {
namesystem.writeUnlock("processTimedOutExcessBlocks");
LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
}
}
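
A hedged, test-style sketch of how this path could be exercised (standard HDFS test imports assumed; lowering replication to create an excess replica and the 1-second timeout are illustrative choices, not part of this patch; `waitForDNDeletions` is the helper added below):

```java
// Sketch: create an excess replica, shrink the timeout, and let the
// RedundancyMonitor queue the timed-out replica for invalidation.
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
try {
  DistributedFileSystem fs = cluster.getFileSystem();
  Path p = new Path("/excess");
  DFSTestUtil.createFile(fs, p, 1024, (short) 3, 0L);
  fs.setReplication(p, (short) 2);  // one replica on some datanode becomes excess
  BlockManager bm = cluster.getNamesystem().getBlockManager();
  bm.setExcessRedundancyTimeout(1); // 1 second, to reach the timeout path quickly
  // After the next redundancy cycle, the timed-out excess replica should be
  // queued in invalidateBlocks and eventually deleted on its datanode.
  cluster.waitForDNDeletions();
} finally {
  cluster.shutdown();
}
```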

Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
@@ -5231,6 +5341,7 @@ public void run() {
computeDatanodeWork();
processPendingReconstructions();
rescanPostponedMisreplicatedBlocks();
processTimedOutExcessBlocks();
lastRedundancyCycleTS.set(Time.monotonicNow());
}
TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
@@ -27,6 +27,8 @@

import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.util.Time.monotonicNow;

/**
* Maps a datanode to the set of excess redundancy details.
*
@@ -35,7 +37,7 @@
class ExcessRedundancyMap {
public static final Logger blockLog = NameNode.blockStateChangeLog;

private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>();
private final Map<String, LightWeightHashSet<ExcessBlockInfo>> map = new HashMap<>();
private final AtomicLong size = new AtomicLong(0L);

/**
@@ -50,7 +52,7 @@ long size() {
*/
@VisibleForTesting
synchronized int getSize4Testing(String dnUuid) {
final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
final LightWeightHashSet<ExcessBlockInfo> set = map.get(dnUuid);
return set == null? 0: set.size();
}

@@ -64,8 +66,8 @@ synchronized void clear() {
* datanode and the given block?
*/
synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
return set != null && set.contains(blk);
final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
return set != null && set.contains(new ExcessBlockInfo(blk));
}

/**
@@ -75,12 +77,12 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
* @return true if the block is added.
*/
synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
if (set == null) {
set = new LightWeightHashSet<>();
map.put(dn.getDatanodeUuid(), set);
}
final boolean added = set.add(blk);
final boolean added = set.add(new ExcessBlockInfo(blk));
if (added) {
size.incrementAndGet();
blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk);
@@ -95,12 +97,12 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
* @return true if the block is removed.
*/
synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid());
if (set == null) {
return false;
}

final boolean removed = set.remove(blk);
final boolean removed = set.remove(new ExcessBlockInfo(blk));
if (removed) {
size.decrementAndGet();
blockLog.debug("BLOCK* ExcessRedundancyMap.remove({}, {})", dn, blk);
@@ -111,4 +113,56 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
}
return removed;
}

synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() {
return map;
}

/**
* Information about a block with excess redundancy. It records the timestamp
* at which the block was added to the excess redundancy map.
*/
static class ExcessBlockInfo implements Comparable<ExcessBlockInfo> {
private long timeStamp;
private BlockInfo blockInfo;

ExcessBlockInfo(BlockInfo blockInfo) {
this.timeStamp = monotonicNow();
this.blockInfo = blockInfo;
}

public BlockInfo getBlockInfo() {
return blockInfo;
}

long getTimeStamp() {
return timeStamp;
}

void setTimeStamp() {
timeStamp = monotonicNow();
}

@Override
public int hashCode() {
return blockInfo.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof ExcessBlockInfo)) {
return false;
}
ExcessBlockInfo other = (ExcessBlockInfo) obj;
return (this.blockInfo.equals(other.blockInfo));
}

@Override
public int compareTo(ExcessBlockInfo o) {
return Long.compare(this.timeStamp, o.timeStamp);
}
}
}
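
One note on the `ExcessBlockInfo` contract: equality delegates to the wrapped `BlockInfo` and ignores the timestamp, so a freshly wrapped block still matches the stored entry in `contains`/`remove`, while ordering uses only the timestamp. This makes `compareTo` inconsistent with `equals`, which is fine for `Collections.sort` but would misbehave in a `TreeSet`/`TreeMap`. A small sketch (`blk` is an assumed `BlockInfo`):

```java
ExcessBlockInfo a = new ExcessBlockInfo(blk);
Thread.sleep(5);                   // ensure distinct monotonic timestamps
ExcessBlockInfo b = new ExcessBlockInfo(blk);
assert a.equals(b);                // same wrapped block => equal
assert a.compareTo(b) < 0;         // but the older entry still sorts first
```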
@@ -5409,6 +5409,24 @@
</description>
</property>

<property>
<name>dfs.namenode.excess.redundancy.timeout-sec</name>
<value>3600</value>
<description>
Timeout in seconds for excess redundancy blocks. If this value is 0 or less,
it will default to 3600 seconds.
</description>
</property>

<property>
<name>dfs.namenode.excess.redundancy.timeout.check.limit</name>
<value>1000</value>
<description>
Limits the number of blocks checked for excess redundancy timeouts in each
scan. If this value is 0 or less, it will default to 1000.
</description>
</property>

<property>
<name>dfs.namenode.stale.datanode.minimum.interval</name>
<value>3</value>
@@ -2092,6 +2092,25 @@ public FsDatasetTestUtils getFsDatasetTestUtils(DataNode dn) {
.newInstance(dn);
}

/**
* Wait for the datanodes in the cluster to process any block
* deletions that have already been asynchronously queued.
*/
public void waitForDNDeletions()
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
for (DataNode dn : getDataNodes()) {
if (getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0) {
return false;
}
}
return true;
}
}, 1000, 10000);
}
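
Typical pairing in a test, sketched under the assumption that `fs` and `cluster` already exist and that `MiniDFSCluster#triggerHeartbeats` is used to deliver the pending invalidation commands:

```java
// Sketch: delete a file, push the resulting invalidations to the datanodes,
// then block until their async deletion queues drain.
fs.delete(new Path("/excess"), true);
cluster.triggerHeartbeats();
cluster.waitForDNDeletions();
```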

/**
* Gets the rpc port used by the NameNode, because the caller
* supplied port is not necessarily the actual port used.