Skip to content

Commit 4ef0b90

Browse files
authored
Merge branch 'apache:trunk' into YARN-11326
2 parents c21a998 + 6ea10cf commit 4ef0b90

90 files changed

Lines changed: 2038 additions & 365 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3942,6 +3942,7 @@ public static class StatisticsData {
39423942
private volatile long bytesReadDistanceOfThreeOrFour;
39433943
private volatile long bytesReadDistanceOfFiveOrLarger;
39443944
private volatile long bytesReadErasureCoded;
3945+
private volatile long remoteReadTimeMS;
39453946

39463947
/**
39473948
* Add another StatisticsData object to this one.
@@ -3959,6 +3960,7 @@ void add(StatisticsData other) {
39593960
this.bytesReadDistanceOfFiveOrLarger +=
39603961
other.bytesReadDistanceOfFiveOrLarger;
39613962
this.bytesReadErasureCoded += other.bytesReadErasureCoded;
3963+
this.remoteReadTimeMS += other.remoteReadTimeMS;
39623964
}
39633965

39643966
/**
@@ -3977,6 +3979,7 @@ void negate() {
39773979
this.bytesReadDistanceOfFiveOrLarger =
39783980
-this.bytesReadDistanceOfFiveOrLarger;
39793981
this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
3982+
this.remoteReadTimeMS = -this.remoteReadTimeMS;
39803983
}
39813984

39823985
@Override
@@ -4025,6 +4028,10 @@ public long getBytesReadDistanceOfFiveOrLarger() {
40254028
public long getBytesReadErasureCoded() {
40264029
return bytesReadErasureCoded;
40274030
}
4031+
4032+
public long getRemoteReadTimeMS() {
4033+
return remoteReadTimeMS;
4034+
}
40284035
}
40294036

40304037
private interface StatisticsAggregator<T> {
@@ -4252,6 +4259,14 @@ public void incrementBytesReadByDistance(int distance, long newBytes) {
42524259
}
42534260
}
42544261

4262+
/**
4263+
* Add durationMS to the remoteReadTimeMS counter of the StatisticsData returned by getThreadStatistics().
4264+
* @param durationMS time taken in ms to read bytes from remote; added to the counter as-is
4265+
*/
4266+
public void increaseRemoteReadTime(final long durationMS) {
4267+
getThreadStatistics().remoteReadTimeMS += durationMS;
4268+
}
4269+
42554270
/**
42564271
* Apply the given aggregator to all StatisticsData objects associated with
42574272
* this Statistics object.
@@ -4399,6 +4414,25 @@ public long getBytesReadByDistance(int distance) {
43994414
return bytesRead;
44004415
}
44014416

4417+
/**
4418+
* Get the total remote read time in ms, summed over all StatisticsData objects visited by visitAll.
4419+
* @return sum of the remoteReadTimeMS counters, in ms.
4420+
*/
4421+
public long getRemoteReadTime() {
4422+
return visitAll(new StatisticsAggregator<Long>() {
4423+
private long remoteReadTimeMS = 0;
4424+
4425+
@Override
4426+
public void accept(StatisticsData data) {
4427+
remoteReadTimeMS += data.remoteReadTimeMS;
4428+
}
4429+
4430+
public Long aggregate() { // NOTE(review): presumably implements StatisticsAggregator#aggregate; consider @Override
4431+
return remoteReadTimeMS;
4432+
}
4433+
});
4434+
}
4435+
44024436
/**
44034437
* Get all statistics data.
44044438
* MR or other frameworks can use the method to get all statistics at once.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,8 @@ public class FileSystemStorageStatistics extends StorageStatistics {
4747
"bytesReadDistanceOfOneOrTwo",
4848
"bytesReadDistanceOfThreeOrFour",
4949
"bytesReadDistanceOfFiveOrLarger",
50-
"bytesReadErasureCoded"
50+
"bytesReadErasureCoded",
51+
"remoteReadTimeMS"
5152
};
5253

5354
private static class LongStatisticIterator
@@ -107,6 +108,8 @@ private static Long fetch(StatisticsData data, String key) {
107108
return data.getBytesReadDistanceOfFiveOrLarger();
108109
case "bytesReadErasureCoded":
109110
return data.getBytesReadErasureCoded();
111+
case "remoteReadTimeMS":
112+
return data.getRemoteReadTimeMS();
110113
default:
111114
return null;
112115
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,8 @@ public class TestFileSystemStorageStatistics {
5252
"bytesReadDistanceOfOneOrTwo",
5353
"bytesReadDistanceOfThreeOrFour",
5454
"bytesReadDistanceOfFiveOrLarger",
55-
"bytesReadErasureCoded"
55+
"bytesReadErasureCoded",
56+
"remoteReadTimeMS"
5657
};
5758

5859
private FileSystem.Statistics statistics =
@@ -74,6 +75,7 @@ public void setup() {
7475
statistics.incrementBytesReadByDistance(1, RandomUtils.nextInt(0, 100));
7576
statistics.incrementBytesReadByDistance(3, RandomUtils.nextInt(0, 100));
7677
statistics.incrementBytesReadErasureCoded(RandomUtils.nextInt(0, 100));
78+
statistics.increaseRemoteReadTime(RandomUtils.nextInt(0, 100));
7779
}
7880

7981
@Test
@@ -128,6 +130,8 @@ private long getStatisticsValue(String name) {
128130
return statistics.getBytesReadByDistance(5);
129131
case "bytesReadErasureCoded":
130132
return statistics.getBytesReadErasureCoded();
133+
case "remoteReadTimeMS":
134+
return statistics.getRemoteReadTime();
131135
default:
132136
return 0;
133137
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3090,10 +3090,14 @@ public Peer newConnectedPeer(InetSocketAddress addr,
30903090
}
30913091
}
30923092

3093-
void updateFileSystemReadStats(int distance, int nRead) {
3093+
void updateFileSystemReadStats(int distance, int readBytes, long readTimeMS) {
30943094
if (stats != null) {
3095-
stats.incrementBytesRead(nRead);
3096-
stats.incrementBytesReadByDistance(distance, nRead);
3095+
stats.incrementBytesRead(readBytes);
3096+
stats.incrementBytesReadByDistance(distance, readBytes);
3097+
if (distance > 0) {
3098+
//remote read
3099+
stats.increaseRemoteReadTime(readTimeMS);
3100+
}
30973101
}
30983102
}
30993103

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -851,8 +851,9 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
851851
locatedBlocks.getFileLength() - pos);
852852
}
853853
}
854+
long beginReadMS = Time.monotonicNow();
854855
int result = readBuffer(strategy, realLen, corruptedBlocks);
855-
856+
long readTimeMS = Time.monotonicNow() - beginReadMS;
856857
if (result >= 0) {
857858
pos += result;
858859
} else {
@@ -861,7 +862,7 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
861862
}
862863
updateReadStatistics(readStatistics, result, blockReader);
863864
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
864-
result);
865+
result, readTimeMS);
865866
if (readStatistics.getBlockType() == BlockType.STRIPED) {
866867
dfsClient.updateFileSystemECReadStats(result);
867868
}
@@ -1184,6 +1185,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
11841185
ByteBuffer tmp = buf.duplicate();
11851186
tmp.limit(tmp.position() + len);
11861187
tmp = tmp.slice();
1188+
long beginReadMS = Time.monotonicNow();
11871189
int nread = 0;
11881190
int ret;
11891191
while (true) {
@@ -1193,11 +1195,12 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
11931195
}
11941196
nread += ret;
11951197
}
1198+
long readTimeMS = Time.monotonicNow() - beginReadMS;
11961199
buf.position(buf.position() + nread);
11971200

11981201
IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
11991202
dfsClient.updateFileSystemReadStats(
1200-
reader.getNetworkDistance(), nread);
1203+
reader.getNetworkDistance(), nread, readTimeMS);
12011204
if (readStatistics.getBlockType() == BlockType.STRIPED) {
12021205
dfsClient.updateFileSystemECReadStats(nread);
12031206
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -331,15 +331,17 @@ private void readOneStripe(CorruptedBlocks corruptedBlocks)
331331
* its ThreadLocal.
332332
*
333333
* @param stats striped read stats
334+
* @param readTimeMS read time metrics in ms
335+
*
334336
*/
335-
void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
337+
void updateReadStats(final StripedBlockUtil.BlockReadStats stats, long readTimeMS) {
336338
if (stats == null) {
337339
return;
338340
}
339341
updateReadStatistics(readStatistics, stats.getBytesRead(),
340342
stats.isShortCircuit(), stats.getNetworkDistance());
341343
dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
342-
stats.getBytesRead());
344+
stats.getBytesRead(), readTimeMS);
343345
assert readStatistics.getBlockType() == BlockType.STRIPED;
344346
dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
345347
}

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -351,9 +351,12 @@ void readStripe() throws IOException {
351351
// first read failure
352352
while (!futures.isEmpty()) {
353353
try {
354+
long beginReadMS = Time.monotonicNow();
354355
StripingChunkReadResult r = StripedBlockUtil
355356
.getNextCompletedStripedRead(service, futures, 0);
356-
dfsStripedInputStream.updateReadStats(r.getReadStats());
357+
long readTimeMS = Time.monotonicNow() - beginReadMS;
358+
359+
dfsStripedInputStream.updateReadStats(r.getReadStats(), readTimeMS);
357360
DFSClient.LOG.debug("Read task returned: {}, for stripe {}",
358361
r, alignedStripe);
359362
StripingChunk returnedChunk = alignedStripe.chunks[r.index];

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -271,6 +271,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
271271
"dfs.namenode.redundancy.considerLoad.factor";
272272
public static final double
273273
DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT = 2.0;
274+
public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY =
275+
"dfs.namenode.redundancy.considerLoadByVolume";
276+
public static final boolean
277+
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
278+
= false;
274279
public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
275280
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
276281
public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,7 @@ private enum NodeNotChosenReason {
8282
NOT_IN_SERVICE("the node is not in service"),
8383
NODE_STALE("the node is stale"),
8484
NODE_TOO_BUSY("the node is too busy"),
85+
NODE_TOO_BUSY_BY_VOLUME("the node is too busy based on volume load"),
8586
TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
8687
NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
8788
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
@@ -101,6 +102,7 @@ private String getText() {
101102
protected boolean considerLoad;
102103
private boolean considerLoadByStorageType;
103104
protected double considerLoadFactor;
105+
private boolean considerLoadByVolume = false;
104106
private boolean preferLocalNode;
105107
private boolean dataNodePeerStatsEnabled;
106108
private volatile boolean excludeSlowNodesEnabled;
@@ -131,6 +133,10 @@ public void initialize(Configuration conf, FSClusterStats stats,
131133
this.considerLoadFactor = conf.getDouble(
132134
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
133135
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
136+
this.considerLoadByVolume = conf.getBoolean(
137+
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY,
138+
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
139+
);
134140
this.stats = stats;
135141
this.clusterMap = clusterMap;
136142
this.host2datanodeMap = host2datanodeMap;
@@ -1007,6 +1013,16 @@ boolean excludeNodeByLoad(DatanodeDescriptor node){
10071013
"(load: " + nodeLoad + " > " + maxLoad + ")");
10081014
return true;
10091015
}
1016+
if (considerLoadByVolume) {
1017+
final int numVolumesAvailable = node.getNumVolumesAvailable();
1018+
final double maxLoadForVolumes = considerLoadFactor * numVolumesAvailable *
1019+
stats.getInServiceXceiverAverageForVolume();
1020+
if (maxLoadForVolumes > 0.0 && nodeLoad > maxLoadForVolumes) {
1021+
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY_BY_VOLUME,
1022+
"(load: " + nodeLoad + " > " + maxLoadForVolumes + ") ");
1023+
return true;
1024+
}
1025+
}
10101026
return false;
10111027
}
10121028

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -233,6 +233,9 @@ public Type getType() {
233233
// HB processing can use it to tell if it is the first HB since DN restarted
234234
private boolean heartbeatedSinceRegistration = false;
235235

236+
/** The number of volumes that can be written to. */
237+
private int numVolumesAvailable = 0;
238+
236239
/**
237240
* DatanodeDescriptor constructor
238241
* @param nodeID id of the data node
@@ -411,6 +414,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
411414
long totalNonDfsUsed = 0;
412415
Set<String> visitedMount = new HashSet<>();
413416
Set<DatanodeStorageInfo> failedStorageInfos = null;
417+
int volumesAvailable = 0;
414418

415419
// Decide if we should check for any missing StorageReport and mark it as
416420
// failed. There are different scenarios.
@@ -489,7 +493,11 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
489493
visitedMount.add(mount);
490494
}
491495
}
496+
if (report.getRemaining() > 0 && storage.getState() != State.FAILED) {
497+
volumesAvailable += 1;
498+
}
492499
}
500+
this.numVolumesAvailable = volumesAvailable;
493501

494502
// Update total metrics for the node.
495503
setCapacity(totalCapacity);
@@ -981,6 +989,14 @@ public VolumeFailureSummary getVolumeFailureSummary() {
981989
return volumeFailureSummary;
982990
}
983991

992+
/**
993+
* Return the number of volumes that can be written to.
994+
* @return the count set by updateStorageStats: storages reporting remaining space whose state is not FAILED.
995+
*/
996+
public int getNumVolumesAvailable() {
997+
return numVolumesAvailable;
998+
}
999+
9841000
/**
9851001
* @param nodeReg DatanodeID to update registration for.
9861002
*/

0 commit comments

Comments
 (0)