Merged
BlockManager.java
@@ -2396,7 +2396,9 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) {
     }
 
     // Add block to the datanode's task list
-    rw.addTaskToDatanode(numReplicas);
+    if (!rw.addTaskToDatanode(numReplicas)) {
+      return false;
+    }
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
     // Move the block-replication into a "pending" state.
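For context: validateReconstructionWork already reports a boolean to its caller, so the new early return means a block whose sources are all busy is simply not counted as scheduled. A simplified sketch of the assumed calling loop in BlockManager#computeReconstructionWorkForBlocks (paraphrased, not verbatim from trunk):

    // Sketch of the assumed caller: work that fails validation is not
    // counted as scheduled, so the block stays in the low-redundancy
    // queue and is picked up again by a later reconstruction scan.
    for (BlockReconstructionWork rw : reconWork) {
      synchronized (neededReconstruction) {
        if (validateReconstructionWork(rw)) {
          scheduledWork++;
        }
      }
    }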
BlockReconstructionWork.java
@@ -145,5 +145,5 @@ abstract void chooseTargets(BlockPlacementPolicy blockplacement,
    *
    * @param numberReplicas replica details
    */
-  abstract void addTaskToDatanode(NumberReplicas numberReplicas);
+  abstract boolean addTaskToDatanode(NumberReplicas numberReplicas);
 }
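A possible doc follow-up, not part of this patch: document the new return value on the abstract method, e.g.

    * @param numberReplicas replica details
    * @return true if reconstruction work was queued on a datanode; false if
    *         no eligible source could accept the task (suggested javadoc)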
ErasureCodingWork.java
@@ -136,11 +136,11 @@ private int chooseSource4SimpleReplication() {
   }
 
   @Override
-  void addTaskToDatanode(NumberReplicas numberReplicas) {
+  boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     final DatanodeStorageInfo[] targets = getTargets();
     assert targets.length > 0;
     BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
 
+    boolean flag = true;
     if (hasNotEnoughRack()) {
       // if we already have all the internal blocks, but not enough racks,
       // we only need to replicate one internal block to a new rack
@@ -152,6 +152,9 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
       List<Integer> leavingServiceSources = findLeavingServiceSources();
       // decommissioningSources.size() should be >= targets.length
       final int num = Math.min(leavingServiceSources.size(), targets.length);
+      if (num == 0) {
+        flag = false;
+      }
       for (int i = 0; i < num; i++) {
         createReplicationWork(leavingServiceSources.get(i), targets[i]);
       }
@@ -160,6 +163,7 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
           new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
           liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
     }
+    return flag;
   }
 
   private void createReplicationWork(int sourceIndex,
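Style note: the mutable flag could be avoided with an early return inside the not-enough-rack branch. A hypothetical, apparently behavior-equivalent variant of the new lines (not part of this patch):

      final int num = Math.min(leavingServiceSources.size(), targets.length);
      if (num == 0) {
        // every leaving-service source is busy; report failure so the caller
        // leaves the block in the low-redundancy queue instead of pending
        return false;
      }
      for (int i = 0; i < num; i++) {
        createReplicationWork(leavingServiceSources.get(i), targets[i]);
      }

with the remaining paths ending in return true;.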
ReplicationWork.java
@@ -61,7 +61,8 @@ assert getSrcNodes().length > 0
   }
 
   @Override
-  void addTaskToDatanode(NumberReplicas numberReplicas) {
+  boolean addTaskToDatanode(NumberReplicas numberReplicas) {
     getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
+    return true;
   }
 }
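The contiguous-replication path has no busy-source case to report here: a single valid source was already asserted when the work object was constructed (see the assert in the hunk header above), so returning true unconditionally appears safe.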
TestDecommissionWithStriped.java
@@ -462,6 +462,59 @@ public void testFileChecksumAfterDecommission() throws Exception {
         fileChecksum1.equals(fileChecksum2));
   }
 
+  /**
+   * Test decommission when a DN is marked as busy.
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testBusyAfterDecommissionNode() throws Exception {
+    byte busyDNIndex = 0;
> Contributor: Any consideration when defining byte type for the index here? Not a blocker, just out of interest.
>
> Author: Thanks @Hexiaoqiao for your comment. There is no special meaning in the byte type; maybe we can change it to int.
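The follow-up hinted at above would be a one-line change (hypothetical, not part of this patch):

    int busyDNIndex = 0;  // plain int instead of byte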

+    //1. create EC file
+    final Path ecFile = new Path(ecDir, "testBusyAfterDecommissionNode");
+    int writeBytes = cellSize * dataBlocks;
+    writeStripedFile(dfs, ecFile, writeBytes);
+    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);
+
+    //2. make one DN busy
+    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+        .getINode4Write(ecFile.toString()).asFile();
+    BlockInfo firstBlock = fileNode.getBlocks()[0];
+    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
+    DatanodeDescriptor busyNode =
+        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
+    for (int j = 0; j < replicationStreamsHardLimit; j++) {
+      busyNode.incrementPendingReplicationWithoutTargets();
+    }
+
+    //3. decommission one node
+    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
+    decommisionNodes.add(busyNode);
+    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSION_INPROGRESS);
+
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    bm.getDatanodeManager().fetchDatanodes(live, null, false);
+    int liveDecommissioning = 0;
+    for (DatanodeDescriptor node : live) {
+      liveDecommissioning += node.isDecommissionInProgress() ? 1 : 0;
+    }
+    assertEquals(decommisionNodes.size(), liveDecommissioning);
+
+    //4. wait for decommission block to replicate
+    Thread.sleep(3000);
> Contributor: What about using GenericTestUtils.waitFor rather than Thread.sleep?

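A sketch of that suggestion (assuming org.apache.hadoop.test.GenericTestUtils is available to this test, and polling for the state the assertions below expect):

    // Poll instead of sleeping a fixed 3 seconds: wait until the block of
    // the busy node is left in the low-redundancy queue with nothing pending.
    GenericTestUtils.waitFor(
        () -> bm.getLowRedundancyBlocksCount() == 1
            && bm.getPendingReconstructionBlocksCount() == 0,
        500, 30000);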

+    int blocksScheduled = 0;
+    final List<DatanodeDescriptor> dnList = new ArrayList<>();
+    fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null,
+        false);
+    for (DatanodeDescriptor dn : dnList) {
+      blocksScheduled += dn.getBlocksScheduled();
+    }
+    assertEquals(0, blocksScheduled);
+    assertEquals(0, bm.getPendingReconstructionBlocksCount());
+    assertEquals(1, bm.getLowRedundancyBlocksCount());
+  }

   private void testDecommission(int writeBytes, int storageCount,
       int decomNodeCount, String filename) throws IOException, Exception {
     Path ecFile = new Path(ecDir, filename);