-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-17218. NameNode should process time out excess redundancy blocks #6176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
f79f1d2
7fa80cc
cafe3c6
77c1342
59e36cc
09eed20
b7f9b23
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,7 @@ | |
| import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState; | ||
| import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; | ||
| import org.apache.hadoop.hdfs.server.blockmanagement.PendingReconstructionBlocks.PendingBlockInfo; | ||
| import org.apache.hadoop.hdfs.server.blockmanagement.ExcessRedundancyMap.ExcessBlockInfo; | ||
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; | ||
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; | ||
| import org.apache.hadoop.hdfs.server.namenode.CachedBlock; | ||
|
|
@@ -116,6 +117,7 @@ | |
|
|
||
| import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; | ||
|
|
||
| import org.apache.hadoop.hdfs.util.LightWeightHashSet; | ||
| import org.apache.hadoop.metrics2.util.MBeans; | ||
| import org.apache.hadoop.net.Node; | ||
| import org.apache.hadoop.security.UserGroupInformation; | ||
|
|
@@ -482,6 +484,16 @@ public int getPendingSPSPaths() { | |
| /** Storages accessible from multiple DNs. */ | ||
| private final ProvidedStorageMap providedStorageMap; | ||
|
|
||
| /** | ||
| * Timeout for excess redundancy blocks, in milliseconds. | ||
| * {@link #setExcessRedundancyTimeout(long)} takes seconds and stores the value multiplied by 1000. | ||
| */ | ||
| private long excessRedundancyTimeout; | ||
|
|
||
| /** | ||
| * Maximum number of excess redundancy blocks examined for timeout in a single | ||
| * call to processTimedOutExcessBlocks(). | ||
| */ | ||
| private long excessRedundancyTimeoutCheckLimit; | ||
|
|
||
| public BlockManager(final Namesystem namesystem, boolean haEnabled, | ||
| final Configuration conf) throws IOException { | ||
| this.namesystem = namesystem; | ||
|
|
@@ -589,6 +601,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, | |
| conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED, | ||
| DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT); | ||
|
|
||
| setExcessRedundancyTimeout(conf.getLong(DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_KEY, | ||
| DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC)); | ||
| setExcessRedundancyTimeoutCheckLimit(conf.getLong( | ||
| DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT, | ||
| DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT)); | ||
|
|
||
| printInitialConfigs(); | ||
| } | ||
|
|
||
|
|
@@ -3040,6 +3058,99 @@ void rescanPostponedMisreplicatedBlocks() { | |
| (Time.monotonicNow() - startTime), endSize, (startSize - endSize)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Configures how long a block may stay in the excess redundancy map before it is | ||
| * treated as timed out. The value is supplied in seconds and stored in milliseconds. | ||
| * A value less than or equal to 0 selects DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC instead. | ||
| * | ||
| * @param timeOut the excess redundancy block timeout, in seconds. | ||
| */ | ||
| public void setExcessRedundancyTimeout(long timeOut) { | ||
| final long seconds = timeOut > 0 ? timeOut : DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC; | ||
| this.excessRedundancyTimeout = seconds * 1000L; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the limit number of blocks for checking excess redundancy timeout. | ||
| * If the provided limit is less than or equal to 0, the default limit is used. | ||
| * | ||
| * @param limit The limit number of blocks used to check for excess redundancy timeout. | ||
| */ | ||
| public void setExcessRedundancyTimeoutCheckLimit(long limit) { | ||
| // Validate the argument, not the current field value: the field starts at 0, so | ||
| // testing the field would always install the default on the first call and would | ||
| // later accept a non-positive limit once the field had become positive. | ||
| if (limit <= 0) { | ||
| this.excessRedundancyTimeoutCheckLimit = | ||
| DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_CHECK_LIMIT_DEFAULT; | ||
| } else { | ||
| this.excessRedundancyTimeoutCheckLimit = limit; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Process timed-out blocks in the excess redundancy map. | ||
| * <p> | ||
| * Returns immediately when the map is empty; otherwise runs under the namesystem write lock. | ||
| * For each datanode entry, the excess blocks are examined oldest first. A block whose | ||
| * timestamp plus excessRedundancyTimeout has passed, that is still stored and not deleted, | ||
| * and whose storage on that datanode is in State.NORMAL, is passed to addToInvalidates | ||
| * (unless it is already an invalidate block) and has its timestamp reset. | ||
| * At most excessRedundancyTimeoutCheckLimit entries are examined per call. | ||
| */ | ||
| void processTimedOutExcessBlocks() { | ||
| if (excessRedundancyMap.size() == 0) { | ||
| return; | ||
| } | ||
| namesystem.writeLock(); | ||
| long now = Time.monotonicNow(); | ||
| int processed = 0; | ||
| try { | ||
| Iterator<Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>>> iter = | ||
| excessRedundancyMap.getExcessRedundancyMap().entrySet().iterator(); | ||
| while (iter.hasNext() && processed < excessRedundancyTimeoutCheckLimit) { | ||

Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the size of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get it, i will update it later. Thanks @zhangshuyan0 for your comment.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update PR. |
||
| Map.Entry<String, LightWeightHashSet<ExcessBlockInfo>> entry = iter.next(); | ||
| String datanodeUuid = entry.getKey(); | ||
| LightWeightHashSet<ExcessBlockInfo> blocks = entry.getValue(); | ||
| List<ExcessRedundancyMap.ExcessBlockInfo> sortedBlocks = new ArrayList<>(blocks); | ||

||
| // Sort blocks by timestamp in ascending order (oldest first), as defined by | ||
| // ExcessBlockInfo#compareTo, so the loop below can stop at the first entry | ||
| // that has not timed out yet. | ||
| Collections.sort(sortedBlocks); | ||


||
| for (ExcessBlockInfo excessBlockInfo : sortedBlocks) { | ||
| if (processed >= excessRedundancyTimeoutCheckLimit) { | ||
| break; | ||
| } | ||


||
| // Entries count against the limit even when they turn out not to be timed out. | ||
| processed++; | ||
| // If the datanode doesn't have any excess block that has exceeded the timeout, | ||
| // can exit this loop. | ||
| if (now <= excessBlockInfo.getTimeStamp() + excessRedundancyTimeout) { | ||
| break; | ||
| } | ||


||
| BlockInfo blockInfo = excessBlockInfo.getBlockInfo(); | ||
| BlockInfo bi = blocksMap.getStoredBlock(blockInfo); | ||
| if (bi == null || bi.isDeleted()) { | ||
| continue; | ||
| } | ||


||
| // Look for the storage of this block that belongs to the datanode owning this entry. | ||
| Iterator<DatanodeStorageInfo> iterator = blockInfo.getStorageInfos(); | ||
| while (iterator.hasNext()) { | ||
| DatanodeStorageInfo datanodeStorageInfo = iterator.next(); | ||
| DatanodeDescriptor datanodeDescriptor = datanodeStorageInfo.getDatanodeDescriptor(); | ||
| if (datanodeDescriptor.getDatanodeUuid().equals(datanodeUuid)) { | ||
| if (datanodeStorageInfo.getState().equals(State.NORMAL)) { | ||

||
| final Block block = getBlockOnStorage(blockInfo, | ||
| datanodeStorageInfo); | ||
| if (!containsInvalidateBlock(datanodeDescriptor, block)) { | ||
| addToInvalidates(block, datanodeDescriptor); | ||
| LOG.debug("Excess block timeout ({}, {}) is added to invalidated.", | ||
| block, datanodeDescriptor); | ||
| } | ||
| // Reset the entry's timestamp to the current monotonic time. | ||
| excessBlockInfo.setTimeStamp(); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } finally { | ||
| namesystem.writeUnlock("processTimedOutExcessBlocks"); | ||
| // "now" was captured after the write lock was acquired, so the logged duration | ||
| // does not include time spent waiting for the lock. | ||
| LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now)); | ||
| } | ||
| } | ||
|
|
||
| Collection<Block> processReport( | ||
| final DatanodeStorageInfo storageInfo, | ||
|
|
@@ -5231,6 +5342,7 @@ public void run() { | |
| computeDatanodeWork(); | ||
| processPendingReconstructions(); | ||
| rescanPostponedMisreplicatedBlocks(); | ||
| processTimedOutExcessBlocks(); | ||
| lastRedundancyCycleTS.set(Time.monotonicNow()); | ||
| } | ||
| TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,8 @@ | |
|
|
||
| import org.apache.hadoop.classification.VisibleForTesting; | ||
|
|
||
| import static org.apache.hadoop.util.Time.monotonicNow; | ||
|
|
||
| /** | ||
| * Maps a datanode to the set of excess redundancy details. | ||
| * | ||
|
|
@@ -35,7 +37,7 @@ | |
| class ExcessRedundancyMap { | ||
| public static final Logger blockLog = NameNode.blockStateChangeLog; | ||
|
|
||
| private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>(); | ||
| private final Map<String, LightWeightHashSet<ExcessBlockInfo>> map = new HashMap<>(); | ||
| private final AtomicLong size = new AtomicLong(0L); | ||
|
|
||
| /** | ||
|
|
@@ -50,7 +52,7 @@ long size() { | |
| */ | ||
| @VisibleForTesting | ||
| synchronized int getSize4Testing(String dnUuid) { | ||
| final LightWeightHashSet<BlockInfo> set = map.get(dnUuid); | ||
| final LightWeightHashSet<ExcessBlockInfo> set = map.get(dnUuid); | ||
| return set == null? 0: set.size(); | ||
| } | ||
|
|
||
|
|
@@ -64,8 +66,8 @@ synchronized void clear() { | |
| * datanode and the given block? | ||
| */ | ||
| synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) { | ||
| final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid()); | ||
| return set != null && set.contains(blk); | ||
| final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid()); | ||
| return set != null && set.contains(new ExcessBlockInfo(blk)); | ||
|
||
| } | ||
|
|
||
| /** | ||
|
|
@@ -75,12 +77,12 @@ synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) { | |
| * @return true if the block is added. | ||
| */ | ||
| synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) { | ||
| LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid()); | ||
| LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid()); | ||
| if (set == null) { | ||
| set = new LightWeightHashSet<>(); | ||
| map.put(dn.getDatanodeUuid(), set); | ||
| } | ||
| final boolean added = set.add(blk); | ||
| final boolean added = set.add(new ExcessBlockInfo(blk)); | ||
| if (added) { | ||
| size.incrementAndGet(); | ||
| blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk); | ||
|
|
@@ -95,12 +97,12 @@ synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) { | |
| * @return true if the block is removed. | ||
| */ | ||
| synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) { | ||
| final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid()); | ||
| final LightWeightHashSet<ExcessBlockInfo> set = map.get(dn.getDatanodeUuid()); | ||
| if (set == null) { | ||
| return false; | ||
| } | ||
|
|
||
| final boolean removed = set.remove(blk); | ||
| final boolean removed = set.remove(new ExcessBlockInfo(blk)); | ||
|
||
| if (removed) { | ||
| size.decrementAndGet(); | ||
| blockLog.debug("BLOCK* ExcessRedundancyMap.remove({}, {})", dn, blk); | ||
|
|
@@ -111,4 +113,56 @@ synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) { | |
| } | ||
| return removed; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the backing map from datanode UUID to that datanode's set of excess blocks. | ||
| * This is the internal map itself, not a copy. | ||
| * NOTE(review): only this accessor is synchronized; iteration over the returned map | ||
| * by a caller is not covered by this object's monitor. Confirm callers hold a suitable lock. | ||
| */ | ||
| synchronized Map<String, LightWeightHashSet<ExcessBlockInfo>> getExcessRedundancyMap() { | ||
| return map; | ||
| } | ||
|
|
||
| /** | ||
| * An object that contains information about a block that is being excess redundancy. | ||
| * It records the timestamp when added excess redundancy map of this block. | ||
| */ | ||
| static class ExcessBlockInfo implements Comparable<ExcessBlockInfo> { | ||
| private long timeStamp; | ||
| private BlockInfo blockInfo; | ||
|
|
||
| ExcessBlockInfo(BlockInfo blockInfo) { | ||
| this.timeStamp = monotonicNow(); | ||
| this.blockInfo = blockInfo; | ||
| } | ||
|
|
||
| public BlockInfo getBlockInfo() { | ||
| return blockInfo; | ||
| } | ||
|
|
||
| long getTimeStamp() { | ||
| return timeStamp; | ||
| } | ||
|
|
||
| void setTimeStamp() { | ||
| timeStamp = monotonicNow(); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return blockInfo.hashCode(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object obj) { | ||
| if (this == obj) { | ||
| return true; | ||
| } | ||
| if (!(obj instanceof ExcessBlockInfo)) { | ||
| return false; | ||
| } | ||
| ExcessBlockInfo other = (ExcessBlockInfo) obj; | ||
| return (this.blockInfo.equals(other.blockInfo)); | ||
|
||
| } | ||
|
|
||
| @Override | ||
| public int compareTo(ExcessBlockInfo o) { | ||
| return Long.compare(this.timeStamp, o.timeStamp); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC -> DFS_NAMENODE_EXCESS_REDUNDANCY_TIMEOUT_SEC_DEFAULT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zhangshuyan0 for your comment.
already fixed!