@@ -4305,7 +4305,6 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
     BitSet found = new BitSet(groupSize); //indices found
     BitSet duplicated = new BitSet(groupSize); //indices found more than once
     HashMap<DatanodeStorageInfo, Integer> storage2index = new HashMap<>();
-    boolean logEmptyExcessType = true;
     for (DatanodeStorageInfo storage : nonExcess) {
       int index = sblk.getStorageBlockIndex(storage);
       assert index >= 0;
@@ -4329,23 +4328,11 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
       Integer index = storage2index.get(delStorageHint);
       if (index != null && duplicated.get(index)) {
         processChosenExcessRedundancy(nonExcess, delStorageHint, storedBlock);
-        logEmptyExcessType = false;
       }
     }
 
-    // cardinality of found indicates the expected number of internal blocks
-    final int numOfTarget = found.cardinality();
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
         bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        (short) numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
-    if (excessTypes.isEmpty()) {
-      if(logEmptyExcessType) {
-        LOG.warn("excess types chosen for block {} among storages {} is empty",
-            storedBlock, nonExcess);
-      }
-      return;
-    }
 
     BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(STRIPED);
     // for each duplicated index, delete some replicas until only one left
@@ -4359,9 +4346,11 @@ private void chooseExcessRedundancyStriped(BlockCollection bc,
         }
       }
       if (candidates.size() > 1) {
+        List<StorageType> internalExcessTypes = storagePolicy.chooseExcess(
+            (short) 1, DatanodeStorageInfo.toStorageTypes(candidates));
         List<DatanodeStorageInfo> replicasToDelete = placementPolicy
             .chooseReplicasToDelete(nonExcess, candidates, (short) 1,
-                excessTypes, null, null);
+                internalExcessTypes, null, null);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Choose redundant EC replicas to delete from blk_{} which is located in {}",
               sblk.getBlockId(), storage2index);
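
Taken together, the hunks above stop computing a single excess-type list for the whole block group (which could come back empty and abort the method before any duplicated index was processed) and instead ask the storage policy for one excess type per duplicated internal-block index, using only that index's candidate storages. Below is a self-contained toy sketch of that per-index selection, with plain strings standing in for DatanodeStorageInfo and the survivor choice reduced to "keep the first candidate"; all names in it are illustrative, not Hadoop APIs.

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ExcessIndexSketch {
      public static void main(String[] args) {
        // storage -> internal block index; index 1 is duplicated
        Map<String, Integer> storage2index = new HashMap<>();
        storage2index.put("dn0", 0);
        storage2index.put("dn1", 1);
        storage2index.put("dn2", 1); // duplicate of index 1
        storage2index.put("dn3", 2);

        int groupSize = 3;
        BitSet found = new BitSet(groupSize);      // indices seen at least once
        BitSet duplicated = new BitSet(groupSize); // indices seen more than once
        for (int index : storage2index.values()) {
          if (found.get(index)) {
            duplicated.set(index);
          }
          found.set(index);
        }

        // For each duplicated index, collect its candidates and pick excess
        // replicas from that candidate set only, mirroring the patched loop.
        for (int target = duplicated.nextSetBit(0); target >= 0;
             target = duplicated.nextSetBit(target + 1)) {
          List<String> candidates = new ArrayList<>();
          for (Map.Entry<String, Integer> e : storage2index.entrySet()) {
            if (e.getValue() == target) {
              candidates.add(e.getKey());
            }
          }
          if (candidates.size() > 1) {
            // keep the first candidate, mark the rest as excess
            // (iteration order is unspecified here; a real policy ranks them)
            List<String> excess = candidates.subList(1, candidates.size());
            System.out.println("index " + target + ": keep " + candidates.get(0)
                + ", delete " + excess);
          }
        }
      }
    }

In the scenario the new test below constructs (ALLSSD policy plus one DISK datanode), the old group-wide excess list could name a storage type that none of a duplicated index's candidates actually use, so chooseReplicasToDelete found nothing to remove and the redundant internal block survived; deriving the list from the candidates themselves avoids that mismatch.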
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -575,5 +576,78 @@ public void testReconstructionWithStorageTypeNotEnough() throws Exception {
       cluster.shutdown();
     }
   }
+  @Test
+  public void testDeleteOverReplicatedStripedBlock() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+    // groupSize + 1 SSD datanodes plus one DISK datanode
+    StorageType[][] st = new StorageType[groupSize + 2][1];
+    for (int i = 0; i < st.length - 1; i++) {
+      st[i] = new StorageType[]{StorageType.SSD};
+    }
+    st[st.length - 1] = new StorageType[]{StorageType.DISK};
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 2)
+        .storagesPerDatanode(1)
+        .storageTypes(st)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    try {
+      fs.mkdirs(dirPath);
+      fs.setErasureCodingPolicy(dirPath,
+          StripedFileTestUtil.getDefaultECPolicy().getName());
+      fs.setStoragePolicy(dirPath, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+      DFSTestUtil.createFile(fs, filePath,
+          cellSize * dataBlocks * 2, (short) 1, 0L);
+      // Stop a dn
+      LocatedBlocks blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
+      LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+      DatanodeInfo dnToStop = block.getLocations()[0];
+
+      MiniDFSCluster.DataNodeProperties dnProp =
+          cluster.stopDataNode(dnToStop.getXferAddr());
+      cluster.setDataNodeDead(dnToStop);
+
+      // Wait for reconstruction to happen
+      DFSTestUtil.waitForReplication(fs, filePath, groupSize, 15 * 1000);
+
+      DatanodeInfo dnToStop2 = block.getLocations()[1];

Contributor: Maybe add cluster.stopDataNode(dnToStop2.getXferAddr()); here?

+      cluster.stopDataNode(dnToStop2.getXferAddr());
+      cluster.setDataNodeDead(dnToStop2);
+      DFSTestUtil.waitForReplication(fs, filePath, groupSize, 15 * 1000);
+
+      // Bring the dn back: 10 internal blocks now
+      cluster.restartDataNode(dnProp);
+      DFSTestUtil.verifyClientStats(conf, cluster);
+
+      // Currently namenode is able to track the missing block. And restart NN

Contributor: Why does the namenode need to be restarted here? For Line[629-639], maybe update this logic to avoid using Thread.sleep(3000):

      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerBlockReport(dn);
      }

      BlockManager bm = cluster.getNamesystem().getBlockManager();
      GenericTestUtils.waitFor(()
              -> bm.getPendingDeletionBlocksCount() == 0,
          10, 2000);

      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerHeartbeat(dn);
      }

      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerDeletionReport(dn);
      }

What do you think?
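
As an aside, the polling pattern the reviewer suggests is easy to reason about in isolation. Here is a minimal self-contained stand-in for GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis), assuming those parameter semantics (poll every checkEveryMillis ms, fail after waitForMillis ms); the class and names are illustrative only, not the Hadoop utility itself.

    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    public final class WaitForSketch {
      // Poll check every checkEveryMillis until it returns true, or throw
      // once waitForMillis elapses, mirroring the assumed waitFor semantics.
      static void waitFor(Supplier<Boolean> check, long checkEveryMillis,
          long waitForMillis) throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + waitForMillis;
        while (!check.get()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException(
                "condition not met within " + waitForMillis + " ms");
          }
          Thread.sleep(checkEveryMillis);
        }
      }

      public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // toy condition: becomes true after roughly 50 ms
        waitFor(() -> System.currentTimeMillis() - start > 50, 10, 2000);
        System.out.println("condition met after "
            + (System.currentTimeMillis() - start) + " ms");
      }
    }

Unlike a fixed Thread.sleep(3000), the poll returns as soon as the pending-deletion count reaches zero and fails loudly instead of silently racing when it does not.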

+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      Thread.sleep(3000);
+      GenericTestUtils.waitFor(() ->
+          bm.getPendingDeletionBlocksCount() == 0, 10, 2000);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerHeartbeat(dn);
+      }
+      // verify each internal block index now appears exactly once,
+      // i.e. the redundant internal block was deleted
+      boolean isDeletedRedundantBlock = true;
+      blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
+      block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+      BitSet bitSet = new BitSet(groupSize);
+      for (byte index : block.getBlockIndices()) {
+        if (bitSet.get(index)) {
+          isDeletedRedundantBlock = false;
+        }
+        bitSet.set(index);
+      }
+      assertTrue(isDeletedRedundantBlock);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }