Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
Expand Down Expand Up @@ -351,11 +350,8 @@ class BlockSender implements java.io.Closeable {
} catch (FileNotFoundException e) {
if ((e.getMessage() != null) && !(e.getMessage()
.contains("Too many open files"))) {
// The replica is on its volume map but not on disk
datanode
.notifyNamenodeDeletedBlock(block, replica.getStorageUuid());
datanode.data.invalidate(block.getBlockPoolId(),
new Block[] {block.getLocalBlock()});
datanode.data.invalidateMissingBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
throw e;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,14 @@ void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
void invalidate(String bpid, Block invalidBlks[]) throws IOException;

/**
* Invalidate a block which is not found on disk.
* @param bpid the block pool ID.
* @param block The block to be invalidated.
* @throws IOException if the implementation fails to invalidate the block.
*/
void invalidateMissingBlock(String bpid, Block block) throws IOException;

/**
* Caches the specified blocks
* @param bpid Block pool id
* @param blockIds - block ids to cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,30 @@ public void invalidate(String bpid, ReplicaInfo block) {
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
block.getStorageUuid());
}
/**
 * Invalidate a block which is not found on disk. The replica is removed
 * from the in-memory volume map and passed to
 * {@link #invalidate(String, ReplicaInfo)}; this method does not delete
 * the on-disk block file itself.
 *
 * <p>If the volume map holds no matching replica, the call is a no-op.
 *
 * @param bpid the block pool ID.
 * @param block The block to be invalidated.
 */
public void invalidateMissingBlock(String bpid, Block block) {

  // The replica is in the volume map but its block file was not found on
  // disk. We cannot tell here whether the file is lost or the disk failed.
  // If the file is lost:
  //     deleting the local block file again is unnecessary.
  // If the disk failed:
  //     deleting the local block file could lead to a missing block
  //     when this is the only replica left.
  // So it is enough to remove it from the volume map and notify the
  // namenode.
  try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
      bpid)) {
    ReplicaInfo replica = volumeMap.remove(bpid, block);
    if (replica == null) {
      // Nothing was removed (for example, another caller already
      // invalidated this block). invalidate() dereferences the replica,
      // so bail out instead of failing with a NullPointerException.
      return;
    }
    invalidate(bpid, replica);
  }
}

/**
* Remove Replica from ReplicaMap.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,12 @@ public synchronized void invalidate(String bpid, Block[] invalidBlks)
}
}

/**
 * Invalidates a single block by delegating to the array-based
 * {@code invalidate(String, Block[])} with a one-element array.
 *
 * @param bpid the block pool ID.
 * @param block the block to be invalidated.
 * @throws IOException if the delegated invalidate call throws it.
 */
@Override
public void invalidateMissingBlock(String bpid, Block block)
    throws IOException {
  final Block[] singleBlock = {block};
  invalidate(bpid, singleBlock);
}

@Override // FSDatasetSpi
public void cache(String bpid, long[] cacheBlks) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public boolean isValidRbw(ExtendedBlock b) {
public void invalidate(String bpid, Block[] invalidBlks) throws IOException {
// No-op: this implementation does nothing with the given blocks.
}

@Override
public void invalidateMissingBlock(String bpid, Block block) {
// No-op: this implementation does nothing with the given block.
}

@Override
public void cache(String bpid, long[] blockIds) {
// No-op: this implementation does nothing with the given block ids.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1919,4 +1919,63 @@ public void delayDeleteReplica() {
DataNodeFaultInjector.set(oldInjector);
}
}

/**
 * Exercises invalidateMissingBlock on a replica whose block file still
 * exists on disk, simulating a "file not found" caused by a disk problem.
 * Expectations checked below:
 * 1. the block file is not deleted from disk.
 * 2. the replica is no longer present in the datanode's in-memory map.
 * 3. the namenode's low-redundancy block count rises to 1.
 * 4. the count drops back to 0 once the replica is re-registered via
 *    checkAndUpdate.
 */
@Test
public void tesInvalidateMissingBlock() throws Exception {
  final long blockSize = 1024;
  final int heartbeatInterval = 1;
  HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
  conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    // One file of exactly one block, replication factor 1.
    DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/a"),
        blockSize, (short) 1, 0);

    String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
    DataNode dataNode = cluster.getDataNodes().get(0);
    FsDatasetImpl dataset = (FsDatasetImpl) dataNode.getFSDataset();
    List<ReplicaInfo> finalized = dataset.getFinalizedBlocks(bpid);
    assertEquals(1, finalized.size());

    // Capture the on-disk locations before the replica is invalidated.
    ReplicaInfo replica = finalized.get(0);
    File blockFile = new File(replica.getBlockURI().getPath());
    File metaFile = new File(replica.getMetadataURI().getPath());

    // Simulate the block file being reported as not found.
    dataset.invalidateMissingBlock(bpid, replica);

    // The block file must still be on disk.
    assertTrue(blockFile.exists());
    // The replica must be gone from the in-memory replica map.
    assertEquals("null",
        dataset.getReplicaString(bpid, replica.getBlockId()));
    BlockManager bm =
        cluster.getNameNode().getNamesystem().getBlockManager();
    GenericTestUtils.waitFor(
        () -> bm.getLowRedundancyBlocksCount() == 1, 100, 5000);

    // Simulate the disk coming back: re-register the replica from a scan.
    File blockDir = blockFile.getParentFile().getAbsoluteFile();
    FsVolumeSpi.ScanInfo scanInfo = new FsVolumeSpi.ScanInfo(
        replica.getBlockId(), blockDir, blockFile.getName(),
        metaFile.getName(), replica.getVolume());
    dataset.checkAndUpdate(bpid, scanInfo);
    GenericTestUtils.waitFor(
        () -> bm.getLowRedundancyBlocksCount() == 0, 100, 5000);
  } finally {
    cluster.shutdown();
  }
}
}