Skip to content

Commit 82522d6

Browse files
HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)
Contains HADOOP-17300: FileSystem.DirListingIterator.next() call should throw NoSuchElementException. Contributed by Mukund Thakur.
1 parent 16aea11 commit 82522d6

7 files changed

Lines changed: 191 additions & 2 deletions

File tree

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2229,7 +2229,9 @@ private void fetchMore() throws IOException {
22292229
@Override
22302230
@SuppressWarnings("unchecked")
22312231
public T next() throws IOException {
2232-
Preconditions.checkState(hasNext(), "No more items in iterator");
2232+
if (!hasNext()) {
2233+
throw new NoSuchElementException("No more items in iterator");
2234+
}
22332235
if (i == entries.getEntries().length) {
22342236
fetchMore();
22352237
}

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,24 @@ any optimizations.
294294
The atomicity and consistency constraints are as for
295295
`listStatus(Path, PathFilter)`.
296296

297+
### `RemoteIterator<FileStatus> listStatusIterator(Path p)`
298+
299+
Return an iterator enumerating the `FileStatus` entries under
300+
a path. This is similar to `listStatus(Path)` except that,
301+
rather than returning an entire list, an iterator is returned.
302+
The result is exactly the same as `listStatus(Path)`, provided no other
303+
caller updates the directory during the listing. Having said that, this does
304+
not guarantee atomicity if other callers are adding/deleting the files
305+
inside the directory while listing is being performed. Different filesystems
306+
may provide a more efficient implementation; for example, S3A does the
307+
listing in pages and fetches the next pages asynchronously while a
308+
page is getting processed.
309+
310+
Note that since the initial listing is asynchronous, a bucket/path existence
311+
exception may only show up later, during a `next()` call.
312+
313+
Callers should prefer `listStatusIterator` over `listStatus` as it
314+
is incremental in nature.
297315

298316
### `FileStatus[] listStatus(Path[] paths)`
299317

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.List;
2525
import java.util.UUID;
2626

27+
import org.assertj.core.api.Assertions;
28+
2729
import org.apache.hadoop.fs.FileStatus;
2830
import org.apache.hadoop.fs.FileSystem;
2931
import org.apache.hadoop.fs.FilterFileSystem;
@@ -148,6 +150,7 @@ public void testListLocatedStatusEmptyDirectory() throws IOException {
148150
public void testComplexDirActions() throws Throwable {
149151
TreeScanResults tree = createTestTree();
150152
checkListStatusStatusComplexDir(tree);
153+
checkListStatusIteratorComplexDir(tree);
151154
checkListLocatedStatusStatusComplexDir(tree);
152155
checkListFilesComplexDirNonRecursive(tree);
153156
checkListFilesComplexDirRecursive(tree);
@@ -169,6 +172,34 @@ protected void checkListStatusStatusComplexDir(TreeScanResults tree)
169172
listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
170173
}
171174

175+
/**
176+
* Test {@link FileSystem#listStatusIterator(Path)} on a complex
177+
* directory tree.
178+
* @param tree directory tree to list.
179+
* @throws Throwable
180+
*/
181+
protected void checkListStatusIteratorComplexDir(TreeScanResults tree)
182+
throws Throwable {
183+
describe("Expect listStatusIterator to list all entries in top dir only");
184+
185+
FileSystem fs = getFileSystem();
186+
TreeScanResults listing = new TreeScanResults(
187+
fs.listStatusIterator(tree.getBasePath()));
188+
listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
189+
190+
List<FileStatus> resWithoutCheckingHasNext =
191+
iteratorToListThroughNextCallsAlone(fs
192+
.listStatusIterator(tree.getBasePath()));
193+
194+
List<FileStatus> resWithCheckingHasNext = iteratorToList(fs
195+
.listStatusIterator(tree.getBasePath()));
196+
Assertions.assertThat(resWithCheckingHasNext)
197+
.describedAs("listStatusIterator() should return correct " +
198+
"results even if hasNext() calls are not made.")
199+
.hasSameElementsAs(resWithoutCheckingHasNext);
200+
201+
}
202+
172203
/**
173204
* Test {@link FileSystem#listLocatedStatus(Path)} on a complex
174205
* directory tree.
@@ -322,6 +353,45 @@ public void testListStatusFile() throws Throwable {
322353
verifyStatusArrayMatchesFile(f, getFileSystem().listStatus(f));
323354
}
324355

356+
@Test
357+
public void testListStatusIteratorFile() throws Throwable {
358+
describe("test the listStatusIterator(path) on a file");
359+
Path f = touchf("listStItrFile");
360+
361+
List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
362+
getFileSystem().listStatusIterator(f));
363+
validateListingForFile(f, statusList, false);
364+
365+
List<FileStatus> statusList2 =
366+
(List<FileStatus>) iteratorToListThroughNextCallsAlone(
367+
getFileSystem().listStatusIterator(f));
368+
validateListingForFile(f, statusList2, true);
369+
}
370+
371+
/**
372+
* Validate listing result for an input path which is file.
373+
* @param f file.
374+
* @param statusList list status of a file.
375+
* @param nextCallAlone whether the listing generated just using
376+
* next() calls.
377+
*/
378+
private void validateListingForFile(Path f,
379+
List<FileStatus> statusList,
380+
boolean nextCallAlone) {
381+
String msg = String.format("size of file list returned using %s should " +
382+
"be 1", nextCallAlone ?
383+
"next() calls alone" : "hasNext() and next() calls");
384+
Assertions.assertThat(statusList)
385+
.describedAs(msg)
386+
.hasSize(1);
387+
Assertions.assertThat(statusList.get(0).getPath())
388+
.describedAs("path returned should match with the input path")
389+
.isEqualTo(f);
390+
Assertions.assertThat(statusList.get(0).isFile())
391+
.describedAs("path returned should be a file")
392+
.isEqualTo(true);
393+
}
394+
325395
@Test
326396
public void testListFilesFile() throws Throwable {
327397
describe("test the listStatus(path) on a file");

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import org.apache.hadoop.fs.LocatedFileStatus;
2323
import org.apache.hadoop.fs.Path;
2424
import org.junit.Test;
25+
import org.assertj.core.api.Assertions;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

2829
import java.io.IOException;
30+
import java.util.Arrays;
2931
import java.util.List;
3032
import java.util.concurrent.Callable;
3133
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.stream.Collectors;
3235

3336
import org.apache.hadoop.fs.FileStatus;
3437
import org.apache.hadoop.fs.RemoteIterator;
@@ -39,6 +42,7 @@
3942
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
4043
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
4144
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
45+
import static org.apache.hadoop.fs.contract.ContractTestUtils.iteratorToList;
4246
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
4347
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
4448
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -242,6 +246,13 @@ public void testSimpleRootListing() throws IOException {
242246
+ "listStatus = " + listStatusResult
243247
+ "listFiles = " + listFilesResult,
244248
fileList.size() <= statuses.length);
249+
List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
250+
fs.listStatusIterator(root));
251+
Assertions.assertThat(statusList)
252+
.describedAs("Result of listStatus(/) and listStatusIterator(/)"
253+
+ " must match")
254+
.hasSameElementsAs(Arrays.stream(statuses)
255+
.collect(Collectors.toList()));
245256
}
246257

247258
@Test

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1453,6 +1453,52 @@ public static List<LocatedFileStatus> toList(
14531453
return list;
14541454
}
14551455

1456+
/**
1457+
* Convert a remote iterator over file status results into a list.
1458+
* The utility equivalents in commons collection and guava cannot be
1459+
* used here, as this is a different interface, one whose operators
1460+
* can throw IOEs.
1461+
* @param iterator input iterator
1462+
* @return the file status entries as a list.
1463+
* @throws IOException
1464+
*/
1465+
public static <T extends FileStatus> List<T> iteratorToList(
1466+
RemoteIterator<T> iterator) throws IOException {
1467+
List<T> list = new ArrayList<>();
1468+
while (iterator.hasNext()) {
1469+
list.add(iterator.next());
1470+
}
1471+
return list;
1472+
}
1473+
1474+
1475+
/**
1476+
* Convert a remote iterator over file status results into a list.
1477+
* This uses {@link RemoteIterator#next()} calls only, expecting
1478+
* a raised {@link NoSuchElementException} exception to indicate that
1479+
* the end of the listing has been reached. This iteration strategy is
1480+
* designed to verify that the implementation of the remote iterator
1481+
* generates results and terminates consistently with the {@code hasNext/next}
1482+
* iteration. More succinctly "verifies that the {@code next()} operator
1483+
* isn't relying on {@code hasNext()} to always be called during an iteration.
1484+
* @param iterator input iterator
1485+
* @return the status entries as a list.
1486+
* @throws IOException IO problems
1487+
*/
1488+
@SuppressWarnings("InfiniteLoopStatement")
1489+
public static <T extends FileStatus> List<T> iteratorToListThroughNextCallsAlone(
1490+
RemoteIterator<T> iterator) throws IOException {
1491+
List<T> list = new ArrayList<>();
1492+
try {
1493+
while (true) {
1494+
list.add(iterator.next());
1495+
}
1496+
} catch (NoSuchElementException expected) {
1497+
// ignored
1498+
}
1499+
return list;
1500+
}
1501+
14561502
/**
14571503
* Convert a remote iterator over file status results into a list.
14581504
* This uses {@link RemoteIterator#next()} calls only, expecting
@@ -1602,7 +1648,7 @@ public TreeScanResults(Path basePath) {
16021648
* @param results results of the listFiles/listStatus call.
16031649
* @throws IOException IO problems during the iteration.
16041650
*/
1605-
public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
1651+
public TreeScanResults(RemoteIterator<? extends FileStatus> results)
16061652
throws IOException {
16071653
while (results.hasNext()) {
16081654
add(results.next());

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2643,6 +2643,30 @@ void maybeCreateFakeParentDirectory(Path path)
26432643
}
26442644
}
26452645

2646+
/**
2647+
* Override subclass such that we benefit for async listing done
2648+
* in {@code S3AFileSystem}. See {@code Listing#ObjectListingIterator}.
2649+
* {@inheritDoc}
2650+
*
2651+
*/
2652+
@Override
2653+
public RemoteIterator<FileStatus> listStatusIterator(Path p)
2654+
throws FileNotFoundException, IOException {
2655+
RemoteIterator<S3AFileStatus> listStatusItr = once("listStatus",
2656+
p.toString(), () -> innerListStatus(p));
2657+
return new RemoteIterator<FileStatus>() {
2658+
@Override
2659+
public boolean hasNext() throws IOException {
2660+
return listStatusItr.hasNext();
2661+
}
2662+
2663+
@Override
2664+
public FileStatus next() throws IOException {
2665+
return listStatusItr.next();
2666+
}
2667+
};
2668+
}
2669+
26462670
/**
26472671
* List the statuses of the files/directories in the given path if the path is
26482672
* a directory.

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,24 @@ public int read() throws IOException {
231231
"match with original list of files")
232232
.hasSameElementsAs(originalListOfFiles)
233233
.hasSize(numOfPutRequests);
234+
// Validate listing using listStatusIterator().
235+
NanoTimer timeUsingListStatusItr = new NanoTimer();
236+
RemoteIterator<FileStatus> lsItr = fs.listStatusIterator(dir);
237+
List<String> listUsingListStatusItr = new ArrayList<>();
238+
while (lsItr.hasNext()) {
239+
listUsingListStatusItr.add(lsItr.next().getPath().toString());
240+
Thread.sleep(eachFileProcessingTime);
241+
}
242+
timeUsingListStatusItr.end("listing %d files using " +
243+
"listStatusIterator() api with batch size of %d " +
244+
"including %dms of processing time for each file",
245+
numOfPutRequests, batchSize, eachFileProcessingTime);
246+
Assertions.assertThat(listUsingListStatusItr)
247+
.describedAs("Listing results using listStatusIterator() must" +
248+
"match with original list of files")
249+
.hasSameElementsAs(originalListOfFiles)
250+
.hasSize(numOfPutRequests);
251+
234252
} finally {
235253
executorService.shutdown();
236254
}

0 commit comments

Comments
 (0)