Skip to content

Commit 8528d57

Browse files
HDFS-17216. Distcp: When handle the small files, the bandwidth parameter will be invalid, fix this bug. (#6138)
1 parent 5bfca65 commit 8528d57

File tree

3 files changed

+81
-9
lines changed

3 files changed

+81
-9
lines changed

hadoop-tools/hadoop-distcp/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@
114114
<artifactId>assertj-core</artifactId>
115115
<scope>test</scope>
116116
</dependency>
117+
<dependency>
118+
<groupId>org.hamcrest</groupId>
119+
<artifactId>hamcrest-library</artifactId>
120+
<version>1.3</version>
121+
<scope>test</scope>
122+
</dependency>
117123
</dependencies>
118124

119125
<build>

hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,11 @@ public long getTotalBytesRead() {
120120
* @return Read rate, in bytes/sec.
121121
*/
122122
public long getBytesPerSec() {
123-
long elapsed = (System.currentTimeMillis() - startTime) / 1000;
124-
if (elapsed == 0) {
125-
return bytesRead;
126-
} else {
127-
return bytesRead / elapsed;
123+
if (bytesRead == 0){
124+
return 0;
128125
}
126+
float elapsed = (System.currentTimeMillis() - startTime) / 1000.0f;
127+
return (long) (bytesRead / elapsed);
129128
}
130129

131130
/**

hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.slf4j.LoggerFactory;
2323
import org.apache.hadoop.io.IOUtils;
2424
import org.junit.Assert;
25+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
26+
import static org.junit.Assert.assertThat;
2527
import org.junit.Test;
2628

2729
import java.io.*;
@@ -43,7 +45,9 @@ public void testRead() {
4345
tmpFile.deleteOnExit();
4446
outFile.deleteOnExit();
4547

46-
long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
48+
// Correction: we should use CB.ONE_C mode to calculate the maxBandwidth,
49+
// because CB.ONE_C's speed is the lowest.
50+
long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.ONE_C);
4751

4852
copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
4953
/*
@@ -90,10 +94,16 @@ private long copyAndAssert(File tmpFile, File outFile,
9094
}
9195

9296
LOG.info("{}", in);
97+
/*
98+
in.getBytesPerSec() should not be called repeatedly,
99+
because each call will return a different value,
100+
and because the program execution also takes time,
101+
which magnifies the error of getBytesPerSec()
102+
*/
93103
bandwidth = in.getBytesPerSec();
94104
Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
95-
Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
96-
Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS);
105+
Assert.assertTrue(bandwidth > maxBandwidth / (factor * 1.2));
106+
Assert.assertTrue(in.getTotalSleepTime() > sleepTime || bandwidth <= maxBPS);
97107
} finally {
98108
IOUtils.closeStream(in);
99109
IOUtils.closeStream(out);
@@ -154,4 +164,61 @@ private void writeToFile(File tmpFile, long sizeInKB) throws IOException {
154164
IOUtils.closeStream(out);
155165
}
156166
}
157-
}
167+
168+
/**
169+
* Distcp: When handle the small files,
170+
* the bandwidth parameter will be invalid, fix this bug
171+
*/
172+
@Test
173+
public void testFixThrottleInvalid() {
174+
int testFileCnt = 100;
175+
int fileSize = 19;
176+
int bandwidth= 20;
177+
File[] srcFiles = new File[testFileCnt];
178+
File destFile;
179+
try {
180+
destFile = createFile(testFileCnt * 100 * 1024);
181+
destFile.deleteOnExit();
182+
183+
// create srcFile
184+
for (int i = 0; i < srcFiles.length; i++) {
185+
srcFiles[i] = createFile(fileSize * 1024);
186+
srcFiles[i].deleteOnExit();
187+
}
188+
189+
long begin = System.currentTimeMillis();
190+
LOG.info("begin: " + begin);
191+
192+
// copy srcFiles
193+
for (File srcFile : srcFiles) {
194+
LOG.info("fileLength: " + srcFiles.length);
195+
copyAndAssert(srcFile, destFile, bandwidth * 1024 * 1024);
196+
}
197+
198+
// Check whether the speed limit is successfully limited
199+
long end = System.currentTimeMillis();
200+
LOG.info("end: " + end);
201+
assertThat((int) (end - begin) / 1000,
202+
greaterThanOrEqualTo(testFileCnt * fileSize / bandwidth));
203+
} catch (IOException e) {
204+
LOG.error("Exception encountered ", e);
205+
}
206+
}
207+
208+
private void copyAndAssert(File tmpFile, File outFile, long maxBPS)
209+
throws IOException {
210+
ThrottledInputStream in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
211+
OutputStream out = new FileOutputStream(outFile);
212+
try {
213+
copyBytes(in, out, BUFF_SIZE);
214+
LOG.info("{}", in);
215+
Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
216+
217+
long bytesPerSec = in.getBytesPerSec();
218+
Assert.assertTrue(bytesPerSec < maxBPS);
219+
} finally {
220+
IOUtils.closeStream(in);
221+
IOUtils.closeStream(out);
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)