Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ protected void recover() throws IOException {
Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
final int dataBlkNum = ecPolicy.getNumDataUnits();
final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
int zeroLenReplicaCnt = 0;
int dnNotHaveReplicaCnt = 0;
//check generation stamps
for (int i = 0; i < locs.length; i++) {
DatanodeID id = locs[i];
Expand Down Expand Up @@ -419,10 +421,14 @@ protected void recover() throws IOException {
if (info == null) {
LOG.debug("Block recovery: DataNode: {} does not have " +
"replica for block: (block={}, internalBlk={})", id, block, internalBlk);
dnNotHaveReplicaCnt++;
} else {
LOG.debug("Block recovery: Ignored replica with invalid "
+ "generation stamp or length: {} from DataNode: {} by block: {}",
info, id, block);
if (info.getNumBytes() == 0) {
zeroLenReplicaCnt++;
}
}
}
} catch (RecoveryInProgressException ripE) {
Expand All @@ -436,9 +442,17 @@ protected void recover() throws IOException {
"datanode={})", block, internalBlk, id, e);
}
}
checkLocations(syncBlocks.size());

final long safeLength = getSafeLength(syncBlocks);
final long safeLength;
if (dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locs.length - ecPolicy.getNumDataUnits()) {
checkLocations(syncBlocks.size());
safeLength = getSafeLength(syncBlocks);
} else {
safeLength = 0;
LOG.warn("Block recovery: More than {} datanodes do not have the replica of block {}." +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest printing out the value of zeroLenReplicaCnt as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it. Thanks sir.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this "More than" mean?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems useless here, have removed it . Originally, it was more than (9 - 6) datanodes ...

" Will remove this block.", dnNotHaveReplicaCnt, block);
}

LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
block.getNumBytes(), safeLength, syncBlocks);

Expand All @@ -452,11 +466,13 @@ protected void recover() throws IOException {
rurList.add(r);
}
}
assert rurList.size() >= dataBlkNum : "incorrect safe length";

// Recovery the striped block by truncating internal blocks to the safe
// length. Abort if there is any failure in this step.
truncatePartialBlock(rurList, safeLength);
if (safeLength > 0) {
Preconditions.checkArgument(rurList.size() >= dataBlkNum, "incorrect safe length");
// Recovery the striped block by truncating internal blocks to the safe
// length. Abort if there is any failure in this step.
truncatePartialBlock(rurList, safeLength);
}

// notify Namenode the new size and locations
final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
Expand All @@ -469,11 +485,20 @@ protected void recover() throws IOException {
int index = (int) (r.rInfo.getBlockId() &
HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
newLocs[index] = r.id;
newStorages[index] = r.storageID;
if (r.storageID != null) {
newStorages[index] = r.storageID;
}
}
ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
safeLength, recoveryId);
DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
if (safeLength == 0) {
nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
newBlock.getNumBytes(), true, true, newLocs, newStorages);
LOG.info("After block recovery, the length of new block is 0. " +
"Will remove this block: {} from file.", newBlock);
return;
}
nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
newBlock.getNumBytes(), true, false, newLocs, newStorages);
}
Expand Down Expand Up @@ -527,8 +552,8 @@ long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
private void checkLocations(int locationCount)
throws IOException {
if (locationCount < ecPolicy.getNumDataUnits()) {
throw new IOException(block + " has no enough internal blocks" +
", unable to start recovery. Locations=" + Arrays.asList(locs));
throw new IOException(block + " has no enough internal blocks(current:" + locationCount +
"), unable to start recovery. Locations=" + Arrays.asList(locs));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ private BlockReader createBlockReader(long offsetInBlock) {
block.getNumBytes() - offsetInBlock, true, "", peer, source,
null, stripedReader.getCachingStrategy(), -1, conf);
} catch (IOException e) {
LOG.info("Exception while creating remote block reader, datanode {}",
source, e);
LOG.info("Exception while creating remote block reader for {}, datanode {}",
block, source, e);
IOUtils.closeStream(peer);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,38 @@ public void testSafeLength() {
checkSafeLength(1024 * 1024 * 1024, 6442450944L); // Length of: 1 GiB
}

/**
* 1. Write 1MB data, then flush it.
* 2. Mock client quiet exceptionally.
* 3. Trigger lease recovery.
* 4. Lease recovery successfully.
*/
@Test
public void testLeaseRecoveryWithManyZeroLengthReplica() {
int cellSize = (int)1024 * 1024;
try {
final FSDataOutputStream out = dfs.create(p);
final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
.getWrappedStream();
for (int pos = 0; pos < cellSize; pos++) {
out.write(StripedFileTestUtil.getByte(pos));
}
StripedDataStreamer first = stripedOut.getStripedDataStreamer(0);
waitStreamerAllAcked(first);
stopBlockStream(first);
for (int i = 1; i < dataBlocks + parityBlocks; i++) {
StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
waitStreamerAllAcked(s);
stopBlockStream(s);
}
recoverLease();
LOG.info("Trigger recover lease manually successfully.");
} catch (Throwable e) {
String msg = "failed testCase" + StringUtils.stringifyException(e);
Assert.fail(msg);
}
}

private void checkSafeLength(int blockLength, long expectedSafeLength) {
int[] blockLengths = new int[]{blockLength, blockLength, blockLength, blockLength,
blockLength, blockLength};
Expand Down