Skip to content

Commit 63c18fc

Browse files
HDFS-17216. DistCp: When copying small files, the bandwidth limit is not enforced; fix the rate calculation so throttling takes effect.
1 parent b8815fe commit 63c18fc

File tree

3 files changed

+63
-5
lines changed

3 files changed

+63
-5
lines changed

hadoop-tools/hadoop-distcp/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@
114114
<artifactId>assertj-core</artifactId>
115115
<scope>test</scope>
116116
</dependency>
117+
<dependency>
118+
<groupId>org.hamcrest</groupId>
119+
<artifactId>hamcrest-library</artifactId>
120+
<version>1.3</version>
121+
<scope>test</scope>
122+
</dependency>
117123
</dependencies>
118124

119125
<build>

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,11 @@ public long getTotalBytesRead() {
120120
* @return Read rate, in bytes/sec.
121121
*/
122122
public long getBytesPerSec() {
123-
long elapsed = (System.currentTimeMillis() - startTime) / 1000;
124-
if (elapsed == 0) {
125-
return bytesRead;
126-
} else {
127-
return bytesRead / elapsed;
123+
if (bytesRead == 0){
124+
return 0;
128125
}
126+
float elapsed = (System.currentTimeMillis() - startTime) / 1000.0f;
127+
return (long) (bytesRead / elapsed);
129128
}
130129

131130
/**

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.slf4j.LoggerFactory;
2323
import org.apache.hadoop.io.IOUtils;
2424
import org.junit.Assert;
25+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
26+
import static org.junit.Assert.assertThat;
2527
import org.junit.Test;
2628

2729
import java.io.*;
@@ -67,6 +69,40 @@ public void testRead() {
6769
}
6870
}
6971

72+
@Test
73+
public void testThrottleRead() {
74+
int testFileCnt = 100;
75+
int fileSize = 19;
76+
int bandwidth= 20;
77+
File[] srcFiles = new File[testFileCnt];
78+
File destFile;
79+
try {
80+
destFile = createFile(testFileCnt * 100 * 1024);
81+
destFile.deleteOnExit();
82+
// create file
83+
for (int i = 0; i < srcFiles.length; i++) {
84+
srcFiles[i] = createFile(fileSize * 1024);
85+
srcFiles[i].deleteOnExit();
86+
}
87+
88+
// copy srcFiles
89+
long begin = System.currentTimeMillis();
90+
LOG.info("begin: " + begin);
91+
92+
for (File srcFile : srcFiles) {
93+
LOG.info("fileLength: " + srcFiles.length);
94+
copyAndAssertWhenFloatCalc(srcFile, destFile, bandwidth * 1024 * 1024);
95+
}
96+
97+
long end = System.currentTimeMillis();
98+
LOG.info("end: " + end);
99+
assertThat((int) (end - begin) / 1000,
100+
greaterThanOrEqualTo(testFileCnt * fileSize / bandwidth));
101+
} catch (IOException e) {
102+
LOG.error("Exception encountered ", e);
103+
}
104+
}
105+
70106
private long copyAndAssert(File tmpFile, File outFile,
71107
long maxBandwidth, float factor,
72108
int sleepTime, CB flag) throws IOException {
@@ -101,6 +137,23 @@ private long copyAndAssert(File tmpFile, File outFile,
101137
return bandwidth;
102138
}
103139

140+
private void copyAndAssertWhenFloatCalc(File tmpFile, File outFile, long maxBPS)
141+
throws IOException {
142+
ThrottledInputStream in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
143+
OutputStream out = new FileOutputStream(outFile);
144+
try {
145+
copyBytes(in, out, BUFF_SIZE);
146+
LOG.info("{}", in);
147+
Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
148+
149+
long bytesPerSec = in.getBytesPerSec();
150+
Assert.assertTrue( bytesPerSec < maxBPS);
151+
} finally {
152+
IOUtils.closeStream(in);
153+
IOUtils.closeStream(out);
154+
}
155+
}
156+
104157
private static void copyBytesWithOffset(InputStream in, OutputStream out, int buffSize)
105158
throws IOException {
106159

0 commit comments

Comments
 (0)