Skip to content

Commit 5aad0e0

Browse files
sunchao and RogPodge authored and committed
HDFS-13616. Batch listing of multiple directories (apache#1725)
1 parent e06012a commit 5aad0e0

25 files changed

Lines changed: 1249 additions & 5 deletions

File tree

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2207,6 +2207,33 @@ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
22072207
return new DirListingIterator<>(p);
22082208
}
22092209

2210+
/**
2211+
* Batched listing API that returns {@link PartialListing}s for the
2212+
* passed Paths.
2213+
*
2214+
* @param paths List of paths to list.
2215+
* @return RemoteIterator that returns corresponding PartialListings.
2216+
* @throws IOException
2217+
*/
2218+
public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
2219+
final List<Path> paths) throws IOException {
2220+
throw new UnsupportedOperationException("Not implemented");
2221+
}
2222+
2223+
/**
2224+
* Batched listing API that returns {@link PartialListing}s for the passed
2225+
* Paths. The PartialListing will contain {@link LocatedFileStatus} entries
2226+
* with locations.
2227+
*
2228+
* @param paths List of paths to list.
2229+
* @return RemoteIterator that returns corresponding PartialListings.
2230+
* @throws IOException
2231+
*/
2232+
public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
2233+
final List<Path> paths) throws IOException {
2234+
throw new UnsupportedOperationException("Not implemented");
2235+
}
2236+
22102237
/**
22112238
* List the statuses and block locations of the files in the given path.
22122239
* Does not guarantee to return the iterator that traverses statuses
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs;
19+
20+
import com.google.common.base.Preconditions;
21+
import org.apache.commons.lang3.builder.ToStringBuilder;
22+
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.classification.InterfaceStability;
24+
import org.apache.hadoop.ipc.RemoteException;
25+
26+
import java.io.IOException;
27+
import java.util.List;
28+
29+
/**
30+
* A partial listing of the children of a parent directory. Since it is a
31+
* partial listing, multiple PartialListing may need to be combined to obtain
32+
* the full listing of a parent directory.
33+
* <p/>
34+
* ListingBatch behaves similar to a Future, in that getting the result via
35+
* {@link #get()} will throw an Exception if there was a failure.
36+
*/
37+
@InterfaceAudience.Public
38+
@InterfaceStability.Stable
39+
public class PartialListing<T extends FileStatus> {
40+
private final Path listedPath;
41+
private final List<T> partialListing;
42+
private final RemoteException exception;
43+
44+
public PartialListing(Path listedPath, List<T> partialListing) {
45+
this(listedPath, partialListing, null);
46+
}
47+
48+
public PartialListing(Path listedPath, RemoteException exception) {
49+
this(listedPath, null, exception);
50+
}
51+
52+
private PartialListing(Path listedPath, List<T> partialListing,
53+
RemoteException exception) {
54+
Preconditions.checkArgument(partialListing == null ^ exception == null);
55+
this.partialListing = partialListing;
56+
this.listedPath = listedPath;
57+
this.exception = exception;
58+
}
59+
60+
/**
61+
* Partial listing of the path being listed. In the case where the path is
62+
* a file. The list will be a singleton with the file itself.
63+
*
64+
* @return Partial listing of the path being listed.
65+
* @throws IOException if there was an exception getting the listing.
66+
*/
67+
public List<T> get() throws IOException {
68+
if (exception != null) {
69+
throw exception.unwrapRemoteException();
70+
}
71+
return partialListing;
72+
}
73+
74+
/**
75+
* Path being listed.
76+
*
77+
* @return the path being listed.
78+
*/
79+
public Path getListedPath() {
80+
return listedPath;
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return new ToStringBuilder(this)
86+
.append("listedPath", listedPath)
87+
.append("partialListing", partialListing)
88+
.append("exception", exception)
89+
.toString();
90+
}
91+
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.net.URI;
2828
import java.util.EnumSet;
2929
import java.util.Iterator;
30+
import java.util.List;
3031

3132
import org.apache.commons.logging.Log;
3233
import org.apache.hadoop.conf.Configuration;
@@ -105,6 +106,10 @@ public FSDataOutputStream create(Path f, FsPermission permission,
105106
public FileStatus[] listStatusBatch(Path f, byte[] token);
106107
public FileStatus[] listStatus(Path[] files);
107108
public FileStatus[] listStatus(Path[] files, PathFilter filter);
109+
public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
110+
final List<Path> paths) throws IOException;
111+
public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
112+
final List<Path> paths) throws IOException;
108113
public FileStatus[] globStatus(Path pathPattern);
109114
public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
110115
public Iterator<LocatedFileStatus> listFiles(Path path,

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ public FSDataOutputStream create(Path f, FsPermission permission,
124124
public FileStatus[] listStatusBatch(Path f, byte[] token);
125125
public FileStatus[] listStatus(Path[] files);
126126
public FileStatus[] listStatus(Path[] files, PathFilter filter);
127+
public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
128+
final List<Path> paths) throws IOException;
129+
public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
130+
final List<Path> paths) throws IOException;
127131
public FileStatus[] globStatus(Path pathPattern);
128132
public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
129133

hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
<Class name="org.apache.hadoop.hdfs.inotify.EventBatch"/>
77
<Class name="org.apache.hadoop.hdfs.protocol.HdfsFileStatus"/>
88
<Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
9+
<Class name="org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing" />
910
<Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
1011
<Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
1112
<Class name="org.apache.hadoop.hdfs.protocol.StripedBlockInfo"/>

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import org.apache.hadoop.hdfs.net.Peer;
106106
import org.apache.hadoop.hdfs.protocol.AclException;
107107
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
108+
import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
108109
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
109110
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
110111
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -1675,6 +1676,24 @@ public DirectoryListing listPaths(String src, byte[] startAfter,
16751676
}
16761677
}
16771678

1679+
/**
1680+
* Get a batched listing for the indicated directories
1681+
*
1682+
* @see ClientProtocol#getBatchedListing(String[], byte[], boolean)
1683+
*/
1684+
public BatchedDirectoryListing batchedListPaths(
1685+
String[] srcs, byte[] startAfter, boolean needLocation)
1686+
throws IOException {
1687+
checkOpen();
1688+
try {
1689+
return namenode.getBatchedListing(srcs, startAfter, needLocation);
1690+
} catch(RemoteException re) {
1691+
throw re.unwrapRemoteException(AccessControlException.class,
1692+
FileNotFoundException.class,
1693+
UnresolvedPathException.class);
1694+
}
1695+
}
1696+
16781697
/**
16791698
* Get the file info for a specific file or directory.
16801699
* @param src The string representation of the path to the file
@@ -1694,7 +1713,7 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
16941713
}
16951714
}
16961715

1697-
/**
1716+
/**
16981717
* Get the file info for a specific file or directory.
16991718
* @param src The string representation of the path to the file
17001719
* @param needBlockToken Include block tokens in {@link LocatedBlocks}.

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.base.Preconditions;
24+
import com.google.common.collect.Lists;
2425
import org.apache.commons.collections.list.TreeList;
2526
import org.apache.hadoop.HadoopIllegalArgumentException;
2627
import org.apache.hadoop.classification.InterfaceAudience;
@@ -49,6 +50,7 @@
4950
import org.apache.hadoop.fs.GlobalStorageStatistics;
5051
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
5152
import org.apache.hadoop.fs.InvalidPathHandleException;
53+
import org.apache.hadoop.fs.PartialListing;
5254
import org.apache.hadoop.fs.PathHandle;
5355
import org.apache.hadoop.fs.LocatedFileStatus;
5456
import org.apache.hadoop.fs.Options;
@@ -73,6 +75,7 @@
7375
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
7476
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
7577
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
78+
import org.apache.hadoop.hdfs.protocol.BatchedDirectoryListing;
7679
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
7780
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
7881
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -81,6 +84,7 @@
8184
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
8285
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
8386
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
87+
import org.apache.hadoop.hdfs.protocol.HdfsPartialListing;
8488
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
8589
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
8690
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
@@ -108,6 +112,8 @@
108112
import org.apache.hadoop.security.token.DelegationTokenIssuer;
109113
import org.apache.hadoop.util.ChunkedArrayList;
110114
import org.apache.hadoop.util.Progressable;
115+
import org.slf4j.Logger;
116+
import org.slf4j.LoggerFactory;
111117

112118
import javax.annotation.Nonnull;
113119
import java.io.FileNotFoundException;
@@ -120,6 +126,7 @@
120126
import java.util.EnumSet;
121127
import java.util.List;
122128
import java.util.Map;
129+
import java.util.NoSuchElementException;
123130
import java.util.Optional;
124131

125132
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -1292,6 +1299,110 @@ public T next() throws IOException {
12921299
}
12931300
}
12941301

1302+
@Override
1303+
public RemoteIterator<PartialListing<FileStatus>> batchedListStatusIterator(
1304+
final List<Path> paths) throws IOException {
1305+
List<Path> absPaths = Lists.newArrayListWithCapacity(paths.size());
1306+
for (Path p : paths) {
1307+
absPaths.add(fixRelativePart(p));
1308+
}
1309+
return new PartialListingIterator<>(absPaths, false);
1310+
}
1311+
1312+
@Override
1313+
public RemoteIterator<PartialListing<LocatedFileStatus>> batchedListLocatedStatusIterator(
1314+
final List<Path> paths) throws IOException {
1315+
List<Path> absPaths = Lists.newArrayListWithCapacity(paths.size());
1316+
for (Path p : paths) {
1317+
absPaths.add(fixRelativePart(p));
1318+
}
1319+
return new PartialListingIterator<>(absPaths, true);
1320+
}
1321+
1322+
private static final Logger LBI_LOG =
1323+
LoggerFactory.getLogger(PartialListingIterator.class);
1324+
1325+
private class PartialListingIterator<T extends FileStatus>
1326+
implements RemoteIterator<PartialListing<T>> {
1327+
1328+
private List<Path> paths;
1329+
private String[] srcs;
1330+
private boolean needLocation;
1331+
private BatchedDirectoryListing batchedListing;
1332+
private int listingIdx = 0;
1333+
1334+
PartialListingIterator(List<Path> paths, boolean needLocation)
1335+
throws IOException {
1336+
this.paths = paths;
1337+
this.srcs = new String[paths.size()];
1338+
for (int i = 0; i < paths.size(); i++) {
1339+
this.srcs[i] = getPathName(paths.get(i));
1340+
}
1341+
this.needLocation = needLocation;
1342+
1343+
// Do the first listing
1344+
statistics.incrementReadOps(1);
1345+
storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS);
1346+
batchedListing = dfs.batchedListPaths(
1347+
srcs, HdfsFileStatus.EMPTY_NAME, needLocation);
1348+
LBI_LOG.trace("Got batchedListing: {}", batchedListing);
1349+
if (batchedListing == null) { // the directory does not exist
1350+
throw new FileNotFoundException("One or more paths do not exist.");
1351+
}
1352+
}
1353+
1354+
@Override
1355+
public boolean hasNext() throws IOException {
1356+
if (batchedListing == null) {
1357+
return false;
1358+
}
1359+
// If we're done with the current batch, try to get the next batch
1360+
if (listingIdx >= batchedListing.getListings().length) {
1361+
if (!batchedListing.hasMore()) {
1362+
LBI_LOG.trace("No more elements");
1363+
return false;
1364+
}
1365+
batchedListing = dfs.batchedListPaths(
1366+
srcs, batchedListing.getStartAfter(), needLocation);
1367+
LBI_LOG.trace("Got batchedListing: {}", batchedListing);
1368+
listingIdx = 0;
1369+
}
1370+
return listingIdx < batchedListing.getListings().length;
1371+
}
1372+
1373+
@Override
1374+
@SuppressWarnings("unchecked")
1375+
public PartialListing<T> next() throws IOException {
1376+
if (!hasNext()) {
1377+
throw new NoSuchElementException("No more entries");
1378+
}
1379+
HdfsPartialListing listing = batchedListing.getListings()[listingIdx];
1380+
listingIdx++;
1381+
1382+
Path parent = paths.get(listing.getParentIdx());
1383+
1384+
if (listing.getException() != null) {
1385+
return new PartialListing<>(parent, listing.getException());
1386+
}
1387+
1388+
// Qualify paths for the client.
1389+
List<HdfsFileStatus> statuses = listing.getPartialListing();
1390+
List<T> qualifiedStatuses =
1391+
Lists.newArrayListWithCapacity(statuses.size());
1392+
1393+
for (HdfsFileStatus status : statuses) {
1394+
if (needLocation) {
1395+
qualifiedStatuses.add((T)((HdfsLocatedFileStatus) status)
1396+
.makeQualifiedLocated(getUri(), parent));
1397+
} else {
1398+
qualifiedStatuses.add((T)status.makeQualified(getUri(), parent));
1399+
}
1400+
}
1401+
1402+
return new PartialListing<>(parent, qualifiedStatuses);
1403+
}
1404+
}
1405+
12951406
/**
12961407
* Create a directory, only when the parent directories exist.
12971408
*

0 commit comments

Comments
 (0)