Skip to content

Commit 6af8a14

Browse files
committed
HDFS-16076. Avoid using slow DataNodes for reading by sorting locations
1 parent 51991c4 commit 6af8a14

7 files changed

Lines changed: 285 additions & 58 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
516516
// Whether to enable datanode's stale state detection and usage for reads
517517
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
518518
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
519+
public static final String DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY =
520+
"dfs.namenode.avoid.read.slow.datanode";
521+
public static final boolean
522+
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT = false;
519523
// Whether to enable datanode's stale state detection and usage for writes
520524
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
521525
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,23 +156,36 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
156156

157157
/**
158158
* Comparator for sorting DataNodeInfo[] based on
159-
* stale, decommissioned and entering_maintenance states.
160-
* Order: live {@literal ->} stale {@literal ->} entering_maintenance
161-
* {@literal ->} decommissioned
159+
* slow, stale, entering_maintenance and decommissioned states.
160+
* Order: live {@literal ->} slow {@literal ->} stale {@literal ->}
161+
* entering_maintenance {@literal ->} decommissioned
162162
*/
163163
@InterfaceAudience.Private
164-
public static class ServiceAndStaleComparator extends ServiceComparator {
164+
public static class StaleAndSlowComparator extends ServiceComparator {
165+
private final boolean avoidStaleDataNodesForRead;
165166
private final long staleInterval;
167+
private final boolean avoidSlowDataNodesForRead;
168+
private final Set<String> slowNodesUuidSet;
166169

167170
/**
168171
* Constructor of ServiceAndStaleComparator
169-
*
172+
* @param avoidStaleDataNodesForRead
173+
* Whether or not to avoid using stale DataNodes for reading.
170174
* @param interval
171175
* The time interval for marking datanodes as stale is passed from
172-
* outside, since the interval may be changed dynamically
176+
* outside, since the interval may be changed dynamically.
177+
* @param avoidSlowDataNodesForRead
178+
* Whether or not to avoid using slow DataNodes for reading.
179+
* @param slowNodesUuidSet
180+
* Slow DataNodes UUID set.
173181
*/
174-
public ServiceAndStaleComparator(long interval) {
182+
public StaleAndSlowComparator(
183+
boolean avoidStaleDataNodesForRead, long interval,
184+
boolean avoidSlowDataNodesForRead, Set<String> slowNodesUuidSet) {
185+
this.avoidStaleDataNodesForRead = avoidStaleDataNodesForRead;
175186
this.staleInterval = interval;
187+
this.avoidSlowDataNodesForRead = avoidSlowDataNodesForRead;
188+
this.slowNodesUuidSet = slowNodesUuidSet;
176189
}
177190

178191
@Override
@@ -183,9 +196,22 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
183196
}
184197

185198
// Stale nodes will be moved behind the normal nodes
186-
boolean aStale = a.isStale(staleInterval);
187-
boolean bStale = b.isStale(staleInterval);
188-
return aStale == bStale ? 0 : (aStale ? 1 : -1);
199+
if (avoidStaleDataNodesForRead) {
200+
boolean aStale = a.isStale(staleInterval);
201+
boolean bStale = b.isStale(staleInterval);
202+
ret = aStale == bStale ? 0 : (aStale ? 1 : -1);
203+
if (ret != 0) {
204+
return ret;
205+
}
206+
}
207+
208+
// Slow nodes will be moved behind the normal nodes
209+
if (avoidSlowDataNodesForRead) {
210+
boolean aSlow = slowNodesUuidSet.contains(a.getDatanodeUuid());
211+
boolean bSlow = slowNodesUuidSet.contains(b.getDatanodeUuid());
212+
ret = aSlow == bSlow ? 0 : (aSlow ? 1 : -1);
213+
}
214+
return ret;
189215
}
190216
}
191217

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,8 +1104,8 @@ boolean isGoodDatanode(DatanodeDescriptor node,
11041104

11051105
// check if the target is a slow node
11061106
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
1107-
Set<Node> nodes = DatanodeManager.getSlowNodes();
1108-
if (nodes.contains(node)) {
1107+
Set<String> slowNodesUuidSet = DatanodeManager.getSlowNodesUuidSet();
1108+
if (slowNodesUuidSet.contains(node.getDatanodeUuid())) {
11091109
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
11101110
return false;
11111111
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ public class DatanodeManager {
140140
/** Whether or not to avoid using stale DataNodes for reading */
141141
private final boolean avoidStaleDataNodesForRead;
142142

143+
/** Whether or not to avoid using slow DataNodes for reading. */
144+
private final boolean avoidSlowDataNodesForRead;
145+
143146
/** Whether or not to consider lad for reading. */
144147
private final boolean readConsiderLoad;
145148

@@ -210,7 +213,7 @@ public class DatanodeManager {
210213

211214
@Nullable
212215
private final SlowPeerTracker slowPeerTracker;
213-
private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
216+
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
214217
private Daemon slowPeerCollectorDaemon;
215218
private final long slowPeerCollectionInterval;
216219
private final int maxSlowPeerReportNodes;
@@ -242,7 +245,6 @@ public class DatanodeManager {
242245
} else {
243246
networktopology = NetworkTopology.getInstance(conf);
244247
}
245-
246248
this.heartbeatManager = new HeartbeatManager(namesystem,
247249
blockManager, conf);
248250
this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
@@ -273,7 +275,6 @@ public class DatanodeManager {
273275
}
274276
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
275277
new SlowDiskTracker(conf, timer) : null;
276-
277278
this.defaultXferPort = NetUtils.createSocketAddr(
278279
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
279280
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
@@ -294,11 +295,9 @@ public class DatanodeManager {
294295
} catch (IOException e) {
295296
LOG.error("error reading hosts files: ", e);
296297
}
297-
298298
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
299299
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
300300
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
301-
302301
this.rejectUnresolvedTopologyDN = conf.getBoolean(
303302
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY,
304303
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT);
@@ -313,7 +312,6 @@ public class DatanodeManager {
313312
}
314313
dnsToSwitchMapping.resolve(locations);
315314
}
316-
317315
heartbeatIntervalSeconds = conf.getTimeDuration(
318316
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
319317
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
@@ -322,7 +320,6 @@ public class DatanodeManager {
322320
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
323321
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
324322
+ 10 * 1000 * heartbeatIntervalSeconds;
325-
326323
// Effected block invalidate limit is the bigger value between
327324
// value configured in hdfs-site.xml, and 20 * HB interval.
328325
final int configuredBlockInvalidateLimit = conf.getInt(
@@ -335,16 +332,17 @@ public class DatanodeManager {
335332
+ ": configured=" + configuredBlockInvalidateLimit
336333
+ ", counted=" + countedBlockInvalidateLimit
337334
+ ", effected=" + blockInvalidateLimit);
338-
339335
this.checkIpHostnameInRegistration = conf.getBoolean(
340336
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
341337
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
342338
LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY
343339
+ "=" + checkIpHostnameInRegistration);
344-
345340
this.avoidStaleDataNodesForRead = conf.getBoolean(
346341
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
347342
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
343+
this.avoidSlowDataNodesForRead = conf.getBoolean(
344+
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
345+
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT);
348346
this.readConsiderLoad = conf.getBoolean(
349347
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
350348
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
@@ -389,7 +387,7 @@ private void startSlowPeerCollector() {
389387
public void run() {
390388
while (true) {
391389
try {
392-
slowNodesSet = getSlowPeers();
390+
slowNodesUuidSet = getSlowPeersUuidSet();
393391
} catch (Exception e) {
394392
LOG.error("Failed to collect slow peers", e);
395393
}
@@ -509,12 +507,16 @@ private boolean isInactive(DatanodeInfo datanode) {
509507
return datanode.isDecommissioned() || datanode.isEnteringMaintenance() ||
510508
(avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
511509
}
510+
511+
private boolean isSlowNode(String dnUuid) {
512+
return avoidSlowDataNodesForRead && slowNodesUuidSet.contains(dnUuid);
513+
}
512514

513515
/**
514516
* Sort the non-striped located blocks by the distance to the target host.
515517
*
516-
* For striped blocks, it will only move decommissioned/stale nodes to the
517-
* bottom. For example, assume we have storage list:
518+
* For striped blocks, it will only move decommissioned/stale/slow
519+
* nodes to the bottom. For example, assume we have storage list:
518520
* d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
519521
* mapping to block indices:
520522
* 0, 1, 2, 3, 4, 5, 6, 7, 8, 2
@@ -526,8 +528,11 @@ private boolean isInactive(DatanodeInfo datanode) {
526528
*/
527529
public void sortLocatedBlocks(final String targetHost,
528530
final List<LocatedBlock> locatedBlocks) {
529-
Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
530-
new DFSUtil.ServiceAndStaleComparator(staleInterval) :
531+
Comparator<DatanodeInfo> comparator =
532+
avoidStaleDataNodesForRead || avoidSlowDataNodesForRead ?
533+
new DFSUtil.StaleAndSlowComparator(
534+
avoidStaleDataNodesForRead, staleInterval,
535+
avoidSlowDataNodesForRead, slowNodesUuidSet) :
531536
new DFSUtil.ServiceComparator();
532537
// sort located block
533538
for (LocatedBlock lb : locatedBlocks) {
@@ -540,7 +545,8 @@ public void sortLocatedBlocks(final String targetHost,
540545
}
541546

542547
/**
543-
* Move decommissioned/stale datanodes to the bottom. After sorting it will
548+
* Move decommissioned/entering_maintenance/stale/slow
549+
* datanodes to the bottom. After sorting it will
544550
* update block indices and block tokens respectively.
545551
*
546552
* @param lb located striped block
@@ -571,8 +577,9 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
571577
}
572578

573579
/**
574-
* Move decommissioned/entering_maintenance/stale datanodes to the bottom.
575-
* Also, sort nodes by network distance.
580+
* Move decommissioned/entering_maintenance/stale/slow
581+
* datanodes to the bottom. Also, sort nodes by network
582+
* distance.
576583
*
577584
* @param lb located block
578585
* @param targetHost target host
@@ -602,12 +609,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
602609
}
603610

604611
DatanodeInfoWithStorage[] di = lb.getLocations();
605-
// Move decommissioned/entering_maintenance/stale datanodes to the bottom
612+
// Move decommissioned/entering_maintenance/stale/slow
613+
// datanodes to the bottom
606614
Arrays.sort(di, comparator);
607615

608616
// Sort nodes by network distance only for located blocks
609617
int lastActiveIndex = di.length - 1;
610-
while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
618+
while (lastActiveIndex > 0 && (
619+
isSlowNode(di[lastActiveIndex].getDatanodeUuid()) ||
620+
isInactive(di[lastActiveIndex]))) {
611621
--lastActiveIndex;
612622
}
613623
int activeLen = lastActiveIndex + 1;
@@ -2085,10 +2095,10 @@ public String getSlowPeersReport() {
20852095
* Returns all tracking slow peers.
20862096
* @return
20872097
*/
2088-
public Set<Node> getSlowPeers() {
2089-
Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
2098+
public Set<String> getSlowPeersUuidSet() {
2099+
Set<String> slowPeersUuidSet = Sets.newConcurrentHashSet();
20902100
if (slowPeerTracker == null) {
2091-
return slowPeersSet;
2101+
return slowPeersUuidSet;
20922102
}
20932103
ArrayList<String> slowNodes =
20942104
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
@@ -2101,18 +2111,18 @@ public Set<Node> getSlowPeers() {
21012111
DatanodeDescriptor datanodeByHost =
21022112
host2DatanodeMap.getDatanodeByHost(ipAddr);
21032113
if (datanodeByHost != null) {
2104-
slowPeersSet.add(datanodeByHost);
2114+
slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
21052115
}
21062116
}
2107-
return slowPeersSet;
2117+
return slowPeersUuidSet;
21082118
}
21092119

21102120
/**
2111-
* Returns all tracking slow peers.
2121+
* Returns all tracking slow datanodes uuids.
21122122
* @return
21132123
*/
2114-
public static Set<Node> getSlowNodes() {
2115-
return slowNodesSet;
2124+
public static Set<String> getSlowNodesUuidSet() {
2125+
return slowNodesUuidSet;
21162126
}
21172127

21182128
/**
@@ -2130,6 +2140,12 @@ public SlowPeerTracker getSlowPeerTracker() {
21302140
public SlowDiskTracker getSlowDiskTracker() {
21312141
return slowDiskTracker;
21322142
}
2143+
2144+
@VisibleForTesting
2145+
public void addSlowPeers(String dnUuid) {
2146+
slowNodesUuidSet.add(dnUuid);
2147+
}
2148+
21332149
/**
21342150
* Retrieve information about slow disks as a JSON.
21352151
* Returns null if we are not tracking slow disks.

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2110,6 +2110,16 @@
21102110
</description>
21112111
</property>
21122112

2113+
<property>
2114+
<name>dfs.namenode.avoid.read.slow.datanode</name>
2115+
<value>false</value>
2116+
<description>
2117+
Indicate whether or not to avoid reading from &quot;slow&quot; datanodes.
2118+
Slow datanodes will be moved to the end of the node list returned
2119+
for reading.
2120+
</description>
2121+
</property>
2122+
21132123
<property>
21142124
<name>dfs.namenode.avoid.write.stale.datanode</name>
21152125
<value>false</value>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.hadoop.hdfs.DFSTestUtil;
2323
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
2424
import org.apache.hadoop.hdfs.server.namenode.NameNode;
25-
import org.apache.hadoop.net.Node;
2625
import org.junit.Test;
2726
import org.junit.runner.RunWith;
2827
import org.junit.runners.Parameterized;
@@ -100,12 +99,12 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
10099
Thread.sleep(3000);
101100

102101
// fetch slow nodes
103-
Set<Node> slowPeers = dnManager.getSlowPeers();
102+
Set<String> slowPeers = dnManager.getSlowPeersUuidSet();
104103

105104
// assert slow nodes
106105
assertEquals(3, slowPeers.size());
107106
for (int i = 0; i < slowPeers.size(); i++) {
108-
assertTrue(slowPeers.contains(dataNodes[i]));
107+
assertTrue(slowPeers.contains(dataNodes[i].getDatanodeUuid()));
109108
}
110109

111110
// mock writer
@@ -120,7 +119,8 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
120119
// assert targets
121120
assertEquals(3, targets.length);
122121
for (int i = 0; i < targets.length; i++) {
123-
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
122+
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()
123+
.getDatanodeUuid()));
124124
}
125125
} finally {
126126
namenode.getNamesystem().writeUnlock();

0 commit comments

Comments
 (0)