Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,9 @@ public void delayDeleteReplica() {}
* Just delay run diff record a while.
*/
public void delayDiffRecord() {}

/**
* Fault-injection hook used to delay getMetaDataInputStream for a while.
* The default implementation does nothing; tests may override it (for
* example to sleep) so that other operations on the same replica can run
* before the metadata input stream is opened.
*/
public void delayGetMetaDataInputStream() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
Expand Down Expand Up @@ -247,6 +248,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
if (info == null || !info.metadataExists()) {
return null;
}
DataNodeFaultInjector.get().delayGetMetaDataInputStream();
return info.getMetadataInputStream(0);
}

Expand Down Expand Up @@ -2403,8 +2405,9 @@ public void invalidate(String bpid, ReplicaInfo block) {
*
* @param bpid the block pool ID.
* @param block The block to be invalidated.
* @param checkFiles Whether to check data and meta files.
*/
public void invalidateMissingBlock(String bpid, Block block) {
public void invalidateMissingBlock(String bpid, Block block, boolean checkFiles) {

// The replica seems is on its volume map but not on disk.
// We can't confirm here is block file lost or disk failed.
Expand All @@ -2416,11 +2419,21 @@ public void invalidateMissingBlock(String bpid, Block block) {
// So remove if from volume map notify namenode is ok.
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
bpid)) {
ReplicaInfo replica = volumeMap.remove(bpid, block);
invalidate(bpid, replica);
// Check if this block is on the volume map.
ReplicaInfo replica = volumeMap.get(bpid, block);
// Double-check block or meta file existence when checkFiles as true.
if (replica != null && (!checkFiles ||
(!replica.blockDataExists() || !replica.metadataExists()))) {
volumeMap.remove(bpid, block);
invalidate(bpid, replica);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If replica == null, invalidate(bpid, replica) would not be executed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If replica == null, we should not execute invalidate(bpid, replica), to avoid causing an NPE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, got it.

}
}
}

/**
* Invalidate a missing block, checking its data and meta files first.
* Convenience overload that delegates to
* {@link #invalidateMissingBlock(String, Block, boolean)} with
* checkFiles set to true.
*
* @param bpid the block pool ID.
* @param block The block to be invalidated.
*/
public void invalidateMissingBlock(String bpid, Block block) {
invalidateMissingBlock(bpid, block, true);
}

/**
* Remove Replica from ReplicaMap.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1962,7 +1962,7 @@ public void delayDeleteReplica() {
* 4. block would be recovered when disk back to normal.
*/
@Test
public void tesInvalidateMissingBlock() throws Exception {
public void testInvalidateMissingBlock() throws Exception {
long blockSize = 1024;
int heartbeatInterval = 1;
HdfsConfiguration c = new HdfsConfiguration();
Expand All @@ -1988,7 +1988,7 @@ public void tesInvalidateMissingBlock() throws Exception {
File metaFile = new File(metaPath);

// Mock local block file not found when disk with some exception.
fsdataset.invalidateMissingBlock(bpid, replicaInfo);
fsdataset.invalidateMissingBlock(bpid, replicaInfo, false);

// Assert local block file wouldn't be deleted from disk.
assertTrue(blockFile.exists());
Expand All @@ -2011,4 +2011,95 @@ public void tesInvalidateMissingBlock() throws Exception {
cluster.shutdown();
}
}

@Test
public void testCheckFilesWhenInvalidateMissingBlock() throws Exception {
long blockSize = 1024;
int heartbeatInterval = 1;
HdfsConfiguration c = new HdfsConfiguration();
c.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
c.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).
numDataNodes(1).build();
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
try {
cluster.waitActive();
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
captureLogs(DataNode.LOG);
BlockReaderTestUtil util = new BlockReaderTestUtil(cluster, new
HdfsConfiguration(conf));
Path path = new Path("/testFile");
util.writeFile(path, 1);
String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dnFSDataset = (FsDatasetImpl) dn.getFSDataset();
List<ReplicaInfo> replicaInfos = dnFSDataset.getFinalizedBlocks(bpid);
assertEquals(1, replicaInfos.size());
DFSTestUtil.readFile(cluster.getFileSystem(), path);
LocatedBlock blk = util.getFileBlocks(path, 512).get(0);
ExtendedBlock block = blk.getBlock();

// Append a new block with an incremented generation stamp.
long newGS = block.getGenerationStamp() + 1;
dnFSDataset.append(block, newGS, 1024);
block.setGenerationStamp(newGS);
ReplicaInfo tmpReplicaInfo = dnFSDataset.getReplicaInfo(blk.getBlock());

DataNodeFaultInjector injector = new DataNodeFaultInjector() {
@Override
public void delayGetMetaDataInputStream() {
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
// Ignore exception.
}
}
};
// Delay to getMetaDataInputStream.
DataNodeFaultInjector.set(injector);

ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
Future<?> blockReaderFuture = executorService.submit(() -> {
try {
// Submit tasks for reading block.
BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
cluster.getFileSystem(), blk, 0, 512);
blockReader.close();
} catch (IOException e) {
// Ignore exception.
}
});

Future<?> finalizeBlockFuture = executorService.submit(() -> {
try {
// Submit tasks for finalizing block.
Thread.sleep(1000);
dnFSDataset.finalizeBlock(block, false);
} catch (Exception e) {
// Ignore exception
}
});

// Wait for both tasks to complete.
blockReaderFuture.get();
finalizeBlockFuture.get();
} finally {
executorService.shutdown();
}

// Validate that the replica still exists.
assertNotNull(dnFSDataset.getReplicaInfo(blk.getBlock()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we need one more case to check that the ReplicaInfo and data file would be lost when checkFiles=false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See TestFsDatasetImpl#tesInvalidateMissingBlock, lines 1991-1997.
That case simulates the ReplicaInfo being removed from the ReplicaMap when checkFiles=false:

 // Mock local block file not found when disk with some exception.
 fsdataset.invalidateMissingBlock(bpid, replicaInfo, false);
 // Assert local block file wouldn't be deleted from disk.
 assertTrue(blockFile.exists());
 // Assert block info would be removed from ReplicaMap.
 assertEquals("null", fsdataset.getReplicaString(bpid, replicaInfo.getBlockId()));

Copy link
Contributor

@smarthanwang smarthanwang Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it tests the case where the block file is not found for any reason. But I am not sure whether the situation you describe would lead to an FNE, so I think that case should be constructed and tested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the goal is to verify that the UT can reproduce the FNE, we can add the following code for verification:

GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(DataNode.LOG);
ReplicaInfo tmpReplicaInfo = dnFSDataset.getReplicaInfo(blk.getBlock());
 // Check DN log for FileNotFoundException.
String expectedMsg = String.format("opReadBlock %s received exception " +
              "java.io.FileNotFoundException: %s (No such file or directory)",
          blk.getBlock(), tmpReplicaInfo.getMetadataURI().getPath());
assertTrue("Expected log message not found in DN log.", 
 logCapturer.getOutput().contains(expectedMsg));

How about it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.


// Check DN log for FileNotFoundException.
String expectedMsg = String.format("opReadBlock %s received exception " +
"java.io.FileNotFoundException: %s (No such file or directory)",
blk.getBlock(), tmpReplicaInfo.getMetadataURI().getPath());
assertTrue("Expected log message not found in DN log.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my understanding in my previous reply above was slightly off. In short, I think we should verify two cases:

  1. block from rbw to finalized, without this patch: verify FNE would be thrown and ReplicaInfo would be removed
  2. block from rbw to finalized, with this patch: verify FNE would not be thrown and ReplicaInfo would
    not be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @smarthanwang for your comment.
In fact, in the current case a block going from rbw to finalized will throw an FNE anyway.
The current PR ensures that the ReplicaInfo is not removed in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ZanderXu @zhangshuyan0 @smarthanwang do you have any further comments on this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, current case block from rbw to finalized anyway will throw FNE .

OK, can we reproduce it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @smarthanwang, if I understand correctly, the UT here already reproduces it.

logCapturer.getOutput().contains(expectedMsg));
} finally {
cluster.shutdown();
DataNodeFaultInjector.set(oldDnInjector);
}
}
}