@@ -318,6 +318,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT = true;

public static final String DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED =
"dfs.namenode.reconstruct.ecblock-groups.limit.enable";
public static final boolean
DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED_DEFAULT = false;

public static final String DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT =
"dfs.namenode.reconstruct.ecblock-groups.limit";
public static final long DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_DEFAULT = 1000;

@Deprecated
public static final String DFS_WEBHDFS_USER_PATTERN_KEY =
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY;
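
For reference, the two new keys above can be set in hdfs-site.xml or programmatically. A minimal sketch, not part of this patch; the value 500 and the usual Configuration/HdfsConfiguration imports are assumed for illustration:

// Illustrative only, not part of this patch: enabling the EC block-group
// reconstruction limit via the keys added above.
Configuration conf = new HdfsConfiguration();
conf.setBoolean(
    DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED, true);
// Allow at most 500 EC block groups to be queued for reconstruction at once;
// the default added by this patch is 1000.
conf.setLong(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, 500);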
@@ -111,6 +111,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;

@@ -469,6 +470,12 @@ public long getTotalECBlockGroups() {
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;

/** Whether to limit the number of EC block groups queued for reconstruction. */
private volatile boolean reconstructECBlockGroupsLimitEnabled;
/** Maximum number of EC block groups that may be queued for reconstruction. */
private volatile long reconstructECBlockGroupsLimit;
/** EC block groups currently scheduled for reconstruction. */
private final LightWeightHashSet<BlockInfo> reconstructECBlockGroups =
new LightWeightHashSet<>();

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -625,13 +632,22 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

this.reconstructECBlockGroupsLimitEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED_DEFAULT);
this.reconstructECBlockGroupsLimit = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_DEFAULT);

LOG.info("defaultReplication = {}", defaultReplication);
LOG.info("maxReplication = {}", maxReplication);
LOG.info("minReplication = {}", minReplication);
LOG.info("maxReplicationStreams = {}", maxReplicationStreams);
LOG.info("redundancyRecheckInterval = {}ms", redundancyRecheckIntervalMs);
LOG.info("encryptDataTransfer = {}", encryptDataTransfer);
LOG.info("maxNumBlocksToLog = {}", maxNumBlocksToLog);
LOG.info("reconstructECBlockGroupsLimit = {}", reconstructECBlockGroupsLimit);
LOG.info("reconstructECBlockGroupsLimitEnabled = {}", reconstructECBlockGroupsLimitEnabled);
}

private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -1295,6 +1311,8 @@ public LocatedBlock convertLastBlockToUnderConstruction(
new DatanodeStorageInfo[locations.size()];
locations.toArray(removedBlockTargets);
DatanodeStorageInfo.decrementBlocksScheduled(removedBlockTargets);
// Remove the block from the reconstructECBlockGroups set.
removeReconstructECBlockGroups(lastBlock);
}

// remove this block from the list of pending blocks to be deleted.
@@ -2089,12 +2107,21 @@ int computeReconstructionWorkForBlocks(
final DatanodeStorageInfo[] targets = rw.getTargets();
if (targets == null || targets.length == 0) {
rw.resetTargets();
// removeReconstructECBlockGroups already logs the removal at debug level.
removeReconstructECBlockGroups(rw.getBlock());
continue;
}

synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) {
scheduledWork++;
} else {
// removeReconstructECBlockGroups already logs the removal at debug level.
removeReconstructECBlockGroups(rw.getBlock());
}
}
}
@@ -2200,6 +2227,26 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
additionalReplRequired = additionalReplRequired -
numReplicas.decommissioning() -
numReplicas.liveEnteringMaintenanceReplicas();
if (reconstructECBlockGroupsLimitEnabled &&
numReplicas.liveReplicas() < requiredRedundancy) {
LOG.debug("Considering block {} for EC reconstruction; currently {} " +
"EC block groups are queued for reconstruction and the limit is {}",
block, getReconstructECBlockGroupCount(), reconstructECBlockGroupsLimit);
if (checkReconstructECBlockGroups(block) ||
getReconstructECBlockGroupCount() >= this.reconstructECBlockGroupsLimit) {
LOG.warn("Cannot create an ErasureCodingWork for block {}: currently " +
"there are {} EC block groups queued for reconstruction and the " +
"limit is {}", block, getReconstructECBlockGroupCount(),
reconstructECBlockGroupsLimit);
return null;
}
addReconstructECBlockGroups(block);
LOG.debug("Block {} added to reconstructECBlockGroups; now {} EC block " +
"groups are queued for reconstruction and the limit is {}",
block, getReconstructECBlockGroupCount(), reconstructECBlockGroupsLimit);
}
}
final DatanodeDescriptor[] newSrcNodes =
new DatanodeDescriptor[srcNodes.length];
@@ -4220,6 +4267,7 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block,
if (storedBlock != null &&
block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
removeReconstructECBlockGroups(storedBlock);
NameNode.getNameNodeMetrics().incSuccessfulReReplications();
}
}
@@ -4649,6 +4697,7 @@ public void removeBlock(BlockInfo block) {
}
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
postponedMisreplicatedBlocks.remove(block);
removeReconstructECBlockGroups(block);
}

public BlockInfo getStoredBlock(Block block) {
@@ -5134,6 +5183,7 @@ public void clearQueues() {
invalidateBlocks.clear();
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
clearReconstructECBlockGroups();
};

public static LocatedBlock newLocatedBlock(
@@ -5498,4 +5548,47 @@ public void disableSPS() {
public StoragePolicySatisfyManager getSPSManager() {
return spsManager;
}

/**
 * Remove the given striped block from the set of EC block groups queued
 * for reconstruction.
 * @return true if the block was tracked and has been removed.
 */
boolean removeReconstructECBlockGroups(BlockInfo block) {
if (reconstructECBlockGroupsLimitEnabled && block.isStriped()) {
synchronized (reconstructECBlockGroups) {
if (reconstructECBlockGroups.remove(block)) {
LOG.debug("Removed block {} from reconstructECBlockGroups, " +
"now size {}", block, getReconstructECBlockGroupCount());
return true;
}
}
}
return false;
}

/**
 * Check whether the given block is already tracked in
 * reconstructECBlockGroups and, if so, remove it.
 * @return true if the block was present and has been removed.
 */
boolean checkReconstructECBlockGroups(BlockInfo block) {
synchronized (reconstructECBlockGroups) {
if (reconstructECBlockGroups.remove(block)) {
LOG.warn("Block {} was already tracked in reconstructECBlockGroups; " +
"removed it, now size {}", block, getReconstructECBlockGroupCount());
return true;
}
}
return false;
}

boolean addReconstructECBlockGroups(BlockInfo block) {
synchronized (reconstructECBlockGroups) {
return reconstructECBlockGroups.add(block);
}
}

public long getReconstructECBlockGroupCount() {
synchronized (reconstructECBlockGroups) {
return reconstructECBlockGroups.size();
}
}

void clearReconstructECBlockGroups() {
synchronized (reconstructECBlockGroups) {
reconstructECBlockGroups.clear();
}
}
}
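
Taken together, the BlockManager changes above bound the number of striped blocks that can have reconstruction work outstanding at the same time: a block is admitted in scheduleReconstruction, and released when its pending reconstruction is confirmed, when the block is removed or loses its targets, or when the queues are cleared. A minimal standalone sketch of this bounded-tracking pattern follows; BoundedTracker, its capacity field and the String key are illustrative only, not BlockManager's actual API:

// Simplified sketch of the gating pattern used in this patch; names and
// types are illustrative, not the real BlockManager code.
import java.util.HashSet;
import java.util.Set;

class BoundedTracker {
  private final Set<String> inFlight = new HashSet<>();
  private final long capacity;

  BoundedTracker(long capacity) {
    this.capacity = capacity;
  }

  /** Try to admit a new unit of work; reject it when the tracker is full. */
  synchronized boolean tryAdmit(String blockId) {
    if (inFlight.contains(blockId) || inFlight.size() >= capacity) {
      // The caller skips scheduling, as scheduleReconstruction returns null.
      return false;
    }
    return inFlight.add(blockId);
  }

  /** Release a unit of work once it completes, fails, or is invalidated. */
  synchronized boolean release(String blockId) {
    return inFlight.remove(blockId);
  }

  /** Drop all tracked work, mirroring clearQueues(). */
  synchronized void clear() {
    inFlight.clear();
  }
}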
@@ -43,6 +43,10 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;

import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.Assert;
@@ -430,4 +434,92 @@ public void testReconstructionWork() throws Exception {
dfsCluster.shutdown();
}
}

@Test
public void testRecoveryTasksForBlockGroupsLimit() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1000);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED,
true);
long limit = 2;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, limit);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 1).build();
try {
cluster.waitActive();
cluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
final int numBlocks = 6;
DFSTestUtil.createStripedFile(cluster, filePath,
dirPath, numBlocks, 1, true);
// all blocks will be located at first GROUP_SIZE DNs, the last DN is
// empty because of the util function createStripedFile

// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
.getINode4Write(filePath.toString()).asFile();
assertFalse(fileNode.isUnderConstruction());
assertTrue(fileNode.isStriped());
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(numBlocks, blocks.length);

BlockManager bm = cluster.getNamesystem().getBlockManager();

BlockInfo firstBlock = fileNode.getBlocks()[0];
DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);

// remove one datanode so that each block group has one missing internal block
for (int i = 0; i < 1; i++) {
DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
assertEquals(numBlocks, missedNode.numBlocks());
bm.getDatanodeManager().removeDatanode(missedNode);
}
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
assertEquals(numBlocks, bm.getLowRedundancyBlocksCount());
// all the reconstruction work will be scheduled on the last DN
DataNode lastDn = cluster.getDataNodes().get(groupSize);
DatanodeDescriptor last =
bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
BlockManagerTestUtil.getComputedDatanodeWork(bm);
int count = 0;
while (bm.getPendingReconstructionBlocksCount() > 0) {
count++;
assertEquals("Counting the number of outstanding EC tasks", limit,
last.getNumberOfBlocksToBeErasureCoded());
assertEquals(limit, bm.getPendingReconstructionBlocksCount());
assertEquals(limit, bm.getReconstructECBlockGroupCount());
List<BlockECReconstructionInfo> reconstruction =
last.getErasureCodeCommand((int) limit);

for (BlockECReconstructionInfo info : reconstruction) {
String poolId = cluster.getNamesystem().getBlockPoolId();
// have the reconstructing (last) datanode report the newly
// reconstructed block to the NN
DatanodeRegistration dnR = lastDn.getDNRegistrationForBP(poolId);
StorageReceivedDeletedBlocks[] report = {
new StorageReceivedDeletedBlocks(
new DatanodeStorage("Fake-storage-ID-Ignored"),
new ReceivedDeletedBlockInfo[]{new ReceivedDeletedBlockInfo(
info.getExtendedBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, "")})
};
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
}
bm.flushBlockOps();
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
}
assertEquals(numBlocks / limit, count);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
assertEquals(0, last.getNumberOfBlocksToBeErasureCoded());
assertEquals(0, bm.getPendingReconstructionBlocksCount());
} finally {
cluster.shutdown();
}
}
}