private class DirListingIterator<T extends FileStatus> implements RemoteIterator<T> {
private final Path path;
private DirectoryEntries entries;
private int i = 0;
- DirListingIterator(Path path) {
+ DirListingIterator(Path path) throws IOException {
this.path = path;
+ this.entries = listStatusBatch(path, null);
}
@Override
public boolean hasNext() throws IOException {
- if (entries == null) {
- fetchMore();
- }
return i < entries.getEntries().length ||
entries.hasMore();
}
private void fetchMore() throws IOException {
- byte[] token = null;
- if (entries != null) {
- token = entries.getToken();
- }
+ byte[] token = entries.getToken();
entries = listStatusBatch(path, token);
i = 0;
}
@@ -2135,7 +2306,9 @@ private void fetchMore() throws IOException {
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
- Preconditions.checkState(hasNext(), "No more items in iterator");
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more items in iterator");
+ }
if (i == entries.getEntries().length) {
fetchMore();
}
@@ -2235,10 +2408,19 @@ public LocatedFileStatus next() throws IOException {
/** Return the current user's home directory in this FileSystem.
* The default implementation returns {@code "/user/$USER/"}.
+ * @return the path of the user's home directory.
*/
public Path getHomeDirectory() {
+ String username;
+ try {
+ username = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch(IOException ex) {
+ LOGGER.warn("Unable to get user name. Fall back to system property " +
+ "user.name", ex);
+ username = System.getProperty("user.name");
+ }
return this.makeQualified(
- new Path(USER_HOME_PREFIX + "/" + System.getProperty("user.name")));
+ new Path(USER_HOME_PREFIX + "/" + username));
}
@@ -2289,6 +2471,7 @@ public boolean mkdirs(Path f) throws IOException {
* @param f path to create
* @param permission to apply to f
* @throws IOException IO failure
+ * @return true if the directory was created, false otherwise.
*/
public abstract boolean mkdirs(Path f, FsPermission permission
) throws IOException;
@@ -2336,6 +2519,7 @@ public void moveFromLocalFile(Path src, Path dst)
* @param delSrc whether to delete the src
* @param src path
* @param dst path
+ * @throws IOException IO failure.
*/
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
@@ -2450,6 +2634,7 @@ public void copyToLocalFile(boolean delSrc, Path src, Path dst,
* @param fsOutputFile path of output file
* @param tmpLocalFile path of local tmp file
* @throws IOException IO failure
+ * @return the file path to write output to.
*/
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
@@ -2483,14 +2668,21 @@ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
*/
@Override
public void close() throws IOException {
+ debugLogFileSystemClose("close", "Key: " + key + "; URI: " + getUri()
+ + "; Object Identity Hash: "
+ + Integer.toHexString(System.identityHashCode(this)));
// delete all files that were marked as delete-on-exit.
- processDeleteOnExit();
- CACHE.remove(this.key, this);
+ try {
+ processDeleteOnExit();
+ } finally {
+ CACHE.remove(this.key, this);
+ }
}
/**
* Return the total size of all files in the filesystem.
* @throws IOException IO failure
+ * @return the total size of all files in the filesystem, in bytes.
*/
public long getUsed() throws IOException {
Path path = new Path("/");
@@ -2499,7 +2691,9 @@ public long getUsed() throws IOException {
/**
* Return the total size of all files from a specified path.
+ * @param path the path.
* @throws IOException IO failure
+ * @return the total size of all files under the given path, in bytes.
*/
public long getUsed(Path path) throws IOException {
return getContentSummary(path).getLength();
@@ -2522,6 +2716,7 @@ public long getBlockSize(Path f) throws IOException {
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time.
* @deprecated use {@link #getDefaultBlockSize(Path)} instead
+ * @return default block size.
*/
@Deprecated
public long getDefaultBlockSize() {
@@ -2568,6 +2763,20 @@ public short getDefaultReplication(Path path) {
*/
public abstract FileStatus getFileStatus(Path f) throws IOException;
+ /**
+ * Synchronize client metadata state.
+ *
+ * In some FileSystem implementations such as HDFS metadata
+ * synchronization is essential to guarantee consistency of read requests
+ * particularly in HA setting.
+ * @throws IOException If an I/O error occurred.
+ * @throws UnsupportedOperationException if the operation is unsupported.
+ */
+ public void msync() throws IOException, UnsupportedOperationException {
+ throw new UnsupportedOperationException(getClass().getCanonicalName() +
+ " does not support method msync");
+ }
+
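(A hedged usage sketch, not part of the patch: how a caller might invoke the new msync() API while tolerating filesystems that do not implement it. The variable fs is an assumed, already-initialized FileSystem.)

    // Force client metadata synchronization (e.g. HDFS observer reads in an HA setup),
    // but treat the call as optional on filesystems without the capability.
    try {
      fs.msync();
    } catch (UnsupportedOperationException e) {
      // nothing to synchronize on this filesystem; continue without it
    }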
/**
* Checks if the user can access a path. The mode specifies which access
* checks to perform. If the requested permissions are granted, then the
@@ -2637,6 +2846,8 @@ static void checkAccessPermissions(FileStatus stat, FsAction mode)
/**
* See {@link FileContext#fixRelativePart}.
+ * @param p the path.
+ * @return the path, resolved against the working directory if it was relative.
*/
protected Path fixRelativePart(Path p) {
if (p.isUriPathAbsolute()) {
@@ -2648,6 +2859,18 @@ protected Path fixRelativePart(Path p) {
/**
* See {@link FileContext#createSymlink(Path, Path, boolean)}.
+ *
+ * @param target target path.
+ * @param link link.
+ * @param createParent create parent.
+ * @throws AccessControlException if access is denied.
+ * @throws FileAlreadyExistsException if the link already exists.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws ParentNotDirectoryException if the parent path of dest is not
+ * a directory.
+ * @throws UnsupportedFileSystemException if there was no known implementation
+ * for the scheme.
+ * @throws IOException raised on errors performing I/O.
*/
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
@@ -2661,8 +2884,14 @@ public void createSymlink(final Path target, final Path link,
/**
* See {@link FileContext#getFileLinkStatus(Path)}.
- * @throws FileNotFoundException when the path does not exist
- * @throws IOException see specific implementation
+ *
+ * @param f the path.
+ * @throws AccessControlException if access is denied.
+ * @throws FileNotFoundException when the path does not exist.
+ * @throws IOException raised on errors performing I/O.
+ * @throws UnsupportedFileSystemException if there was no known implementation
+ * for the scheme.
+ * @return file status
*/
public FileStatus getFileLinkStatus(final Path f)
throws AccessControlException, FileNotFoundException,
@@ -2673,6 +2902,7 @@ public FileStatus getFileLinkStatus(final Path f)
/**
* See {@link AbstractFileSystem#supportsSymlinks()}.
+ * @return true if the filesystem supports symlinks, false otherwise.
*/
public boolean supportsSymlinks() {
return false;
@@ -2680,8 +2910,11 @@ public boolean supportsSymlinks() {
/**
* See {@link FileContext#getLinkTarget(Path)}.
+ * @param f the path.
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
+ * @throws IOException IO failure.
+ * @return the target of the symlink.
*/
public Path getLinkTarget(Path f) throws IOException {
// Supporting filesystems should override this method
@@ -2691,8 +2924,11 @@ public Path getLinkTarget(Path f) throws IOException {
/**
* See {@link AbstractFileSystem#getLinkTarget(Path)}.
+ * @param f the path.
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
+ * @throws IOException IO failure.
+ * @return the target of the symlink.
*/
protected Path resolveLink(Path f) throws IOException {
// Supporting filesystems should override this method
@@ -2872,7 +3108,7 @@ public void deleteSnapshot(Path path, String snapshotName)
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
- * @param aclSpec List describing modifications
+ * @param aclSpec List<AclEntry> describing modifications
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
@@ -3065,7 +3301,7 @@ public Map<String, byte[]> getXAttrs(Path path, List<String> names)
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
- * @return List<String> of the XAttr names of the file or directory
+ * @return List{@literal <String>} of the XAttr names of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
@@ -3093,6 +3329,16 @@ public void removeXAttr(Path path, String name) throws IOException {
+ " doesn't support removeXAttr");
}
+ /**
+ * Set the source path to satisfy storage policy.
+ * @param path The source path referring to either a directory or a file.
+ * @throws IOException If an I/O error occurred.
+ */
+ public void satisfyStoragePolicy(final Path path) throws IOException {
+ throw new UnsupportedOperationException(
+ getClass().getSimpleName() + " doesn't support satisfyStoragePolicy");
+ }
+
/**
* Set the storage policy for a given file or directory.
*
@@ -3199,6 +3445,25 @@ public Collection<FileStatus> getTrashRoots(boolean allUsers) {
return ret;
}
+ /**
+ * The base FileSystem implementation generally has no knowledge
+ * of the capabilities of actual implementations.
+ * Unless it has a way to explicitly determine the capabilities,
+ * this method returns false.
+ * {@inheritDoc}
+ */
+ public boolean hasPathCapability(final Path path, final String capability)
+ throws IOException {
+ switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
+ case CommonPathCapabilities.FS_SYMLINKS:
+ // delegate to the existing supportsSymlinks() call.
+ return supportsSymlinks() && areSymlinksEnabled();
+ default:
+ // the feature is not implemented.
+ return false;
+ }
+ }
+
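(A hedged sketch of probing the new capability API before relying on symlinks; fs, target and link are assumed pre-existing variables, and FS_SYMLINKS is the constant used in the hunk above.)

    // Only attempt symlink creation if the filesystem declares the capability.
    Path root = new Path("/");
    if (fs.hasPathCapability(root, CommonPathCapabilities.FS_SYMLINKS)) {
      fs.createSymlink(target, link, true);
    }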
// making it volatile to be able to do a double checked locking
private volatile static boolean FILE_SYSTEMS_LOADED = false;
@@ -3303,11 +3568,26 @@ public static Class<? extends FileSystem> getFileSystemClass(String scheme,
private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
Tracer tracer = FsTracer.get(conf);
- try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
+ try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
+ DurationInfo ignored =
+ new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
scope.addKVAnnotation("scheme", uri.getScheme());
- Class<? extends FileSystem> clazz = getFileSystemClass(uri.getScheme(), conf);
- FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
- fs.initialize(uri, conf);
+ Class<? extends FileSystem> clazz =
+ getFileSystemClass(uri.getScheme(), conf);
+ FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
+ try {
+ fs.initialize(uri, conf);
+ } catch (IOException | RuntimeException e) {
+ // exception raised during initialization.
+ // log summary at warn and full stack at debug
+ LOGGER.warn("Failed to initialize fileystem {}: {}",
+ uri, e.toString());
+ LOGGER.debug("Failed to initialize fileystem", e);
+ // then (robustly) close the FS, so as to invoke any
+ // cleanup code.
+ IOUtils.cleanupWithLogger(LOGGER, fs);
+ throw e;
+ }
return fs;
}
}
@@ -3319,9 +3599,33 @@ static class Cache {
private final Map<Key, FileSystem> map = new HashMap<>();
private final Set<Key> toAutoClose = new HashSet<>();
+ /** Semaphore used to serialize creation of new FS instances. */
+ private final Semaphore creatorPermits;
+
+ /**
+ * Counter of the number of discarded filesystem instances
+ * in this cache. Primarily for testing, but it could possibly
+ * be made visible as some kind of metric.
+ */
+ private final AtomicLong discardedInstances = new AtomicLong(0);
+
/** A variable that makes all objects in the cache unique. */
private static AtomicLong unique = new AtomicLong(1);
+ /**
+ * Instantiate. The configuration is used to read the
+ * count of permits issued for concurrent creation
+ * of filesystem instances.
+ * @param conf configuration
+ */
+ Cache(final Configuration conf) {
+ int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
+ FS_CREATION_PARALLEL_COUNT_DEFAULT);
+ checkArgument(permits > 0, "Invalid value of %s: %s",
+ FS_CREATION_PARALLEL_COUNT, permits);
+ creatorPermits = new Semaphore(permits);
+ }
+
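(A sketch of how the new creation-parallelism limit could be tuned, assuming FS_CREATION_PARALLEL_COUNT resolves to the configuration key "fs.creation.parallel.count"; the key name is an assumption, not shown in this hunk, and uri is an assumed variable.)

    // Allow at most four threads to instantiate FileSystem clients concurrently;
    // other callers block on the cache's semaphore until a permit is free.
    Configuration conf = new Configuration();
    conf.setInt("fs.creation.parallel.count", 4);  // assumed key for FS_CREATION_PARALLEL_COUNT
    FileSystem fs = FileSystem.get(uri, conf);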
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
@@ -3344,7 +3648,7 @@ FileSystem getUnique(URI uri, Configuration conf) throws IOException{
* @param conf configuration
* @param key key to store/retrieve this FileSystem in the cache
* @return a cached or newly instantiated FileSystem.
- * @throws IOException
+ * @throws IOException If an I/O error occurred.
*/
private FileSystem getInternal(URI uri, Configuration conf, Key key)
throws IOException{
@@ -3355,28 +3659,82 @@ private FileSystem getInternal(URI uri, Configuration conf, Key key)
if (fs != null) {
return fs;
}
-
- fs = createFileSystem(uri, conf);
- synchronized (this) { // refetch the lock again
- FileSystem oldfs = map.get(key);
- if (oldfs != null) { // a file system is created while lock is releasing
- fs.close(); // close the new file system
- return oldfs; // return the old file system
- }
-
- // now insert the new file system into the map
- if (map.isEmpty()
- && !ShutdownHookManager.get().isShutdownInProgress()) {
- ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
+ // fs not yet created, acquire lock
+ // to construct an instance.
+ try (DurationInfo d = new DurationInfo(LOGGER, false,
+ "Acquiring creator semaphore for %s", uri)) {
+ creatorPermits.acquireUninterruptibly();
+ }
+ FileSystem fsToClose = null;
+ try {
+ // See if FS was instantiated by another thread while waiting
+ // for the permit.
+ synchronized (this) {
+ fs = map.get(key);
}
- fs.key = key;
- map.put(key, fs);
- if (conf.getBoolean(
- FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
- toAutoClose.add(key);
+ if (fs != null) {
+ LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
+ return fs;
}
- return fs;
+ // create the filesystem
+ fs = createFileSystem(uri, conf);
+ final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+ SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
+ // any FS to close outside of the synchronized section
+ synchronized (this) { // lock on the Cache object
+
+ // see if there is now an entry for the FS, which happens
+ // if another thread's creation overlapped with this one.
+ FileSystem oldfs = map.get(key);
+ if (oldfs != null) {
+ // a file system was created in a separate thread.
+ // save the FS reference to close outside all locks,
+ // and switch to returning the oldFS
+ fsToClose = fs;
+ fs = oldfs;
+ } else {
+ // register the clientFinalizer if needed and shutdown isn't
+ // already active
+ if (map.isEmpty()
+ && !ShutdownHookManager.get().isShutdownInProgress()) {
+ ShutdownHookManager.get().addShutdownHook(clientFinalizer,
+ SHUTDOWN_HOOK_PRIORITY, timeout,
+ ShutdownHookManager.TIME_UNIT_DEFAULT);
+ }
+ // insert the new file system into the map
+ fs.key = key;
+ map.put(key, fs);
+ if (conf.getBoolean(
+ FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
+ toAutoClose.add(key);
+ }
+ }
+ } // end of synchronized block
+ } finally {
+ // release the creator permit.
+ creatorPermits.release();
}
+ if (fsToClose != null) {
+ LOGGER.debug("Duplicate FS created for {}; discarding {}",
+ uri, fs);
+ discardedInstances.incrementAndGet();
+ // close the new file system
+ // note this will briefly remove and reinstate "fsToClose" from
+ // the map. It is done in a synchronized block so will not be
+ // visible to others.
+ IOUtils.cleanupWithLogger(LOGGER, fsToClose);
+ }
+ return fs;
+ }
+
+ /**
+ * Get the count of discarded instances.
+ * @return the new instance.
+ */
+ @VisibleForTesting
+ long getDiscardedInstances() {
+ return discardedInstances.get();
}
synchronized void remove(Key key, FileSystem fs) {
@@ -3781,6 +4139,7 @@ public void run() {
/**
* Get or create the thread-local data associated with the current thread.
+ * @return statistics data.
*/
public StatisticsData getThreadStatistics() {
StatisticsData data = threadData.get();
@@ -4139,6 +4498,7 @@ public static synchronized Map<String, Statistics> getStatistics() {
/**
* Return the FileSystem classes that have Statistics.
* @deprecated use {@link #getGlobalStorageStatistics()}
+ * @return a list of the per-FileSystem statistics.
*/
@Deprecated
public static synchronized List<Statistics> getAllStatistics() {
@@ -4147,6 +4507,7 @@ public static synchronized List<Statistics> getAllStatistics() {
/**
* Get the statistics for a particular file system.
+ * @param scheme scheme.
* @param cls the class to lookup
* @return a statistics object
* @deprecated use {@link #getGlobalStorageStatistics()}
@@ -4181,6 +4542,7 @@ public static synchronized void clearStatistics() {
/**
* Print all statistics for all file systems to {@code System.out}
+ * @throws IOException If an I/O error occurred.
*/
public static synchronized
void printStatistics() throws IOException {
@@ -4221,24 +4583,47 @@ public StorageStatistics getStorageStatistics() {
/**
* Get the global storage statistics.
+ * @return global storage statistics.
*/
public static GlobalStorageStatistics getGlobalStorageStatistics() {
return GlobalStorageStatistics.INSTANCE;
}
+ /**
+ * Create instance of the standard FSDataOutputStreamBuilder for the
+ * given filesystem and path.
+ * @param fileSystem owner
+ * @param path path to create
+ * @return a builder.
+ */
+ @InterfaceStability.Unstable
+ protected static FSDataOutputStreamBuilder createDataOutputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final Path path) {
+ return new FileSystemDataOutputStreamBuilder(fileSystem, path);
+ }
+
+ /**
+ * Standard implementation of the FSDataOutputStreamBuilder; invokes
+ * create/createNonRecursive or Append depending upon the options.
+ */
private static final class FileSystemDataOutputStreamBuilder extends
FSDataOutputStreamBuilder {
/**
* Constructor.
+ * @param fileSystem owner
+ * @param p path to create
*/
- protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+ private FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
super(fileSystem, p);
}
@Override
public FSDataOutputStream build() throws IOException {
+ rejectUnknownMandatoryKeys(Collections.emptySet(),
+ " for " + getPath());
if (getFlags().contains(CreateFlag.CREATE) ||
getFlags().contains(CreateFlag.OVERWRITE)) {
if (isRecursive()) {
@@ -4253,11 +4638,12 @@ public FSDataOutputStream build() throws IOException {
} else if (getFlags().contains(CreateFlag.APPEND)) {
return getFS().append(getPath(), getBufferSize(), getProgress());
}
- throw new IOException("Must specify either create, overwrite or append");
+ throw new PathIOException(getPath().toString(),
+ "Must specify either create, overwrite or append");
}
@Override
- protected FileSystemDataOutputStreamBuilder getThisBuilder() {
+ public FileSystemDataOutputStreamBuilder getThisBuilder() {
return this;
}
}
@@ -4273,7 +4659,7 @@ protected FileSystemDataOutputStreamBuilder getThisBuilder() {
* builder interface becomes stable.
*/
public FSDataOutputStreamBuilder createFile(Path path) {
- return new FileSystemDataOutputStreamBuilder(this, path)
+ return createDataOutputStreamBuilder(this, path)
.create().overwrite(true);
}
@@ -4283,6 +4669,236 @@ public FSDataOutputStreamBuilder createFile(Path path) {
* @return a {@link FSDataOutputStreamBuilder} to build file append request.
*/
public FSDataOutputStreamBuilder appendFile(Path path) {
- return new FileSystemDataOutputStreamBuilder(this, path).append();
+ return createDataOutputStreamBuilder(this, path).append();
+ }
+
+ /**
+ * Open a file for reading through a builder API.
+ * Ultimately calls {@link #open(Path, int)} unless a subclass
+ * executes the open command differently.
+ *
+ * The semantics of this call are therefore the same as that of
+ * {@link #open(Path, int)} with one special point: it is in
+ * {@code FSDataInputStreamBuilder.build()} in which the open operation
+ * takes place -it is there where all preconditions to the operation
+ * are checked.
+ * @param path file path
+ * @return a FSDataInputStreamBuilder object to build the input stream
+ * @throws IOException if some early checks cause IO failures.
+ * @throws UnsupportedOperationException if support is checked early.
+ */
+ @InterfaceStability.Unstable
+ public FutureDataInputStreamBuilder openFile(Path path)
+ throws IOException, UnsupportedOperationException {
+ return createDataInputStreamBuilder(this, path).getThisBuilder();
+ }
+
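(A hedged sketch of the asynchronous open path: build() starts the open and the returned future supplies the stream. fs and buffer are assumed variables; the checked exceptions of Future.get() are omitted for brevity.)

    // Open a file through the builder; the actual open happens in build()/get().
    CompletableFuture<FSDataInputStream> future =
        fs.openFile(new Path("/tmp/example.txt")).build();
    try (FSDataInputStream in = future.get()) {
      in.read(buffer);
    }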
+ /**
+ * Open a file for reading through a builder API.
+ * Ultimately calls {@link #open(PathHandle, int)} unless a subclass
+ * executes the open command differently.
+ *
+ * If PathHandles are unsupported, this may fail in the
+ * {@code FSDataInputStreamBuilder.build()} command,
+ * rather than in this {@code openFile()} operation.
+ * @param pathHandle path handle.
+ * @return a FSDataInputStreamBuilder object to build the input stream
+ * @throws IOException if some early checks cause IO failures.
+ * @throws UnsupportedOperationException if support is checked early.
+ */
+ @InterfaceStability.Unstable
+ public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
+ throws IOException, UnsupportedOperationException {
+ return createDataInputStreamBuilder(this, pathHandle)
+ .getThisBuilder();
+ }
+
+ /**
+ * Execute the actual open file operation.
+ *
+ * This is invoked from {@code FSDataInputStreamBuilder.build()}
+ * and from {@link DelegateToFileSystem} and is where
+ * the action of opening the file should begin.
+ *
+ * The base implementation performs a blocking
+ * call to {@link #open(Path, int)} in this call;
+ * the actual outcome is in the returned {@code CompletableFuture}.
+ * This avoids having to create some thread pool, while still
+ * setting up the expectation that the {@code get()} call
+ * is needed to evaluate the result.
+ * @param path path to the file
+ * @param parameters open file parameters from the builder.
+ * @return a future which will evaluate to the opened file.
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ */
+ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+ final Path path,
+ final OpenFileParameters parameters) throws IOException {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
+ "for " + path);
+ return LambdaUtils.eval(
+ new CompletableFuture<>(), () ->
+ open(path, parameters.getBufferSize()));
+ }
+
+ /**
+ * Execute the actual open file operation.
+ * The base implementation performs a blocking
+ * call to {@link #open(Path, int)} in this call;
+ * the actual outcome is in the returned {@code CompletableFuture}.
+ * This avoids having to create some thread pool, while still
+ * setting up the expectation that the {@code get()} call
+ * is needed to evaluate the result.
+ * @param pathHandle path to the file
+ * @param parameters open file parameters from the builder.
+ * @return a future which will evaluate to the opened file.
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ * @throws UnsupportedOperationException PathHandles are not supported.
+ * This may be deferred until the future is evaluated.
+ */
+ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+ final PathHandle pathHandle,
+ final OpenFileParameters parameters) throws IOException {
+ AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+ parameters.getMandatoryKeys(),
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
+ CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
+ try {
+ result.complete(open(pathHandle, parameters.getBufferSize()));
+ } catch (UnsupportedOperationException tx) {
+ // fail fast here
+ throw tx;
+ } catch (Throwable tx) {
+ // fail lazily here to ensure callers expect all File IO operations to
+ // surface later
+ result.completeExceptionally(tx);
+ }
+ return result;
+ }
+
+ /**
+ * Helper method that throws an {@link UnsupportedOperationException} for the
+ * current {@link FileSystem} method being called.
+ */
+ private void methodNotSupported() {
+ // The order of the stacktrace elements is (from top to bottom):
+ // - java.lang.Thread.getStackTrace
+ // - org.apache.hadoop.fs.FileSystem.methodNotSupported
+ // - <the FileSystem method>
+ // therefore, to find out the current method name, we use the element at
+ // index 2.
+ String name = Thread.currentThread().getStackTrace()[2].getMethodName();
+ throw new UnsupportedOperationException(getClass().getCanonicalName() +
+ " does not support method " + name);
+ }
+
+ /**
+ * Create instance of the standard {@link FSDataInputStreamBuilder} for the
+ * given filesystem and path.
+ * @param fileSystem owner
+ * @param path path to read
+ * @return a builder.
+ */
+ @InterfaceAudience.LimitedPrivate("Filesystems")
+ @InterfaceStability.Unstable
+ protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final Path path) {
+ return new FSDataInputStreamBuilder(fileSystem, path);
+ }
+
+ /**
+ * Create instance of the standard {@link FSDataInputStreamBuilder} for the
+ * given filesystem and path handle.
+ * @param fileSystem owner
+ * @param pathHandle path handle of file to open.
+ * @return a builder.
+ */
+ @InterfaceAudience.LimitedPrivate("Filesystems")
+ @InterfaceStability.Unstable
+ protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final PathHandle pathHandle) {
+ return new FSDataInputStreamBuilder(fileSystem, pathHandle);
+ }
+
+ /**
+ * Builder returned for {@code #openFile(Path)}
+ * and {@code #openFile(PathHandle)}.
+ */
+ private static class FSDataInputStreamBuilder
+ extends FutureDataInputStreamBuilderImpl
+ implements FutureDataInputStreamBuilder {
+
+ /**
+ * Path Constructor.
+ * @param fileSystem owner
+ * @param path path to open.
+ */
+ protected FSDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final Path path) {
+ super(fileSystem, path);
+ }
+
+ /**
+ * Construct from a path handle.
+ * @param fileSystem owner
+ * @param pathHandle path handle of file to open.
+ */
+ protected FSDataInputStreamBuilder(
+ @Nonnull final FileSystem fileSystem,
+ @Nonnull final PathHandle pathHandle) {
+ super(fileSystem, pathHandle);
+ }
+
+ /**
+ * Perform the open operation.
+ * Returns a future which, when get() or a chained completion
+ * operation is invoked, will supply the input stream of the file
+ * referenced by the path/path handle.
+ * @return a future to the input stream.
+ * @throws IOException early failure to open
+ * @throws UnsupportedOperationException if the specific operation
+ * is not supported.
+ * @throws IllegalArgumentException if the parameters are not valid.
+ */
+ @Override
+ public CompletableFuture<FSDataInputStream> build() throws IOException {
+ Optional<Path> optionalPath = getOptionalPath();
+ OpenFileParameters parameters = new OpenFileParameters()
+ .withMandatoryKeys(getMandatoryKeys())
+ .withOptionalKeys(getOptionalKeys())
+ .withOptions(getOptions())
+ .withStatus(super.getStatus())
+ .withBufferSize(
+ getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
+ if(optionalPath.isPresent()) {
+ return getFS().openFileWithOptions(optionalPath.get(),
+ parameters);
+ } else {
+ return getFS().openFileWithOptions(getPathHandle(),
+ parameters);
+ }
+ }
+
+ }
+
+ /**
+ * Create a multipart uploader.
+ * @param basePath file path under which all files are uploaded
+ * @return a MultipartUploaderBuilder object to build the uploader
+ * @throws IOException if some early checks cause IO failures.
+ * @throws UnsupportedOperationException if support is checked early.
+ */
+ @InterfaceStability.Unstable
+ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
+ throws IOException {
+ methodNotSupported();
+ return null;
}
}
diff --git a/src/main/java/org/apache/hadoop/fs/ForwardingFileSystemCache.java b/src/main/java/org/apache/hadoop/fs/ForwardingFileSystemCache.java
index 0cb5b50..6df1e57 100644
--- a/src/main/java/org/apache/hadoop/fs/ForwardingFileSystemCache.java
+++ b/src/main/java/org/apache/hadoop/fs/ForwardingFileSystemCache.java
@@ -28,6 +28,7 @@ final class ForwardingFileSystemCache
public ForwardingFileSystemCache(FileSystemCache cache)
{
+ super(new Configuration(false));
this.cache = requireNonNull(cache, "cache is null");
}
diff --git a/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
deleted file mode 100644
index cac427b..0000000
--- a/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ /dev/null
@@ -1,1039 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.azurebfs;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
-import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
-import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
-import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
-import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
-import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
-import org.apache.hadoop.fs.azurebfs.services.AuthType;
-import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
-import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
-import org.apache.hadoop.fs.azurebfs.utils.Base64;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.http.client.utils.URIBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
-import static org.apache.hadoop.util.Time.now;
-
-/**
- * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class AzureBlobFileSystemStore {
- private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
-
- private AbfsClient client;
- private URI uri;
- private final UserGroupInformation userGroupInformation;
- private final String userName;
- private final String primaryUserGroup;
- // See https://issues.apache.org/jira/browse/HADOOP-16479 for DATE_TIME_PATTERN modification
- // TODO Drop the change (and the file) after upgrading to hadoop 3.3.0
- private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";
- private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
- private static final int LIST_MAX_RESULTS = 5000;
- private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000;
- private static final int RENAME_TIMEOUT_MILISECONDS = 180000;
-
- private final AbfsConfiguration abfsConfiguration;
- private final Set<String> azureAtomicRenameDirSet;
- private boolean isNamespaceEnabledSet;
- private boolean isNamespaceEnabled;
-
- public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration, UserGroupInformation userGroupInformation)
- throws AzureBlobFileSystemException, IOException {
- this.uri = uri;
-
- String[] authorityParts = authorityParts(uri);
- final String fileSystemName = authorityParts[0];
- final String accountName = authorityParts[1];
-
- try {
- this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
- } catch (IllegalAccessException exception) {
- throw new FileSystemOperationUnhandledException(exception);
- }
-
- this.userGroupInformation = userGroupInformation;
- this.userName = userGroupInformation.getShortUserName();
-
- if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
- primaryUserGroup = userGroupInformation.getPrimaryGroupName();
- } else {
- //Provide a default group name
- primaryUserGroup = userName;
- }
-
- this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
- abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
-
- boolean usingOauth = (AuthType.OAuth == abfsConfiguration.getEnum(
- FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey));
-
- boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
- initializeClient(uri, fileSystemName, accountName, useHttps);
- }
-
- private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
- final String authority = uri.getRawAuthority();
- if (null == authority) {
- throw new InvalidUriAuthorityException(uri.toString());
- }
-
- if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
- throw new InvalidUriAuthorityException(uri.toString());
- }
-
- final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
-
- if (authorityParts.length < 2 || authorityParts[0] != null
- && authorityParts[0].isEmpty()) {
- final String errMsg = String
- .format("'%s' has a malformed authority, expected container name. "
- + "Authority takes the form "
- + FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
- uri.toString());
- throw new InvalidUriException(errMsg);
- }
- return authorityParts;
- }
-
- public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
- if (!isNamespaceEnabledSet) {
- LOG.debug("getFilesystemProperties for filesystem: {}",
- client.getFileSystem());
-
- final AbfsRestOperation op = client.getFilesystemProperties();
- isNamespaceEnabled = Boolean.parseBoolean(
- op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_NAMESPACE_ENABLED));
- isNamespaceEnabledSet = true;
- }
-
- return isNamespaceEnabled;
- }
-
- @VisibleForTesting
- URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
- String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
-
- final URIBuilder uriBuilder = new URIBuilder();
- uriBuilder.setScheme(scheme);
-
- // For testing purposes, an IP address and port may be provided to override
- // the host specified in the FileSystem URI. Also note that the format of
- // the Azure Storage Service URI changes from
- // http[s]://[account][domain-suffix]/[filesystem] to
- // http[s]://[ip]:[port]/[account]/[filesystem].
- String endPoint = abfsConfiguration.get(AZURE_ABFS_ENDPOINT);
- if (endPoint == null || !endPoint.contains(AbfsHttpConstants.COLON)) {
- uriBuilder.setHost(hostName);
- return uriBuilder;
- }
-
- // Split ip and port
- String[] data = endPoint.split(AbfsHttpConstants.COLON);
- if (data.length != 2) {
- throw new RuntimeException(String.format("ABFS endpoint is not set correctly : %s, "
- + "Do not specify scheme when using {IP}:{PORT}", endPoint));
- }
- uriBuilder.setHost(data[0].trim());
- uriBuilder.setPort(Integer.parseInt(data[1].trim()));
- uriBuilder.setPath("/" + UriUtils.extractAccountNameFromHostName(hostName));
-
- return uriBuilder;
- }
-
- public AbfsConfiguration getAbfsConfiguration() {
- return this.abfsConfiguration;
- }
-
- public Hashtable<String, String> getFilesystemProperties() throws AzureBlobFileSystemException {
- LOG.debug("getFilesystemProperties for filesystem: {}",
- client.getFileSystem());
-
- final Hashtable<String, String> parsedXmsProperties;
-
- final AbfsRestOperation op = client.getFilesystemProperties();
- final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
-
- parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
-
- return parsedXmsProperties;
- }
-
- public void setFilesystemProperties(final Hashtable<String, String> properties)
- throws AzureBlobFileSystemException {
- if (properties == null || properties.isEmpty()) {
- return;
- }
-
- LOG.debug("setFilesystemProperties for filesystem: {} with properties: {}",
- client.getFileSystem(),
- properties);
-
- final String commaSeparatedProperties;
- try {
- commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
- } catch (CharacterCodingException ex) {
- throw new InvalidAbfsRestOperationException(ex);
- }
-
- client.setFilesystemProperties(commaSeparatedProperties);
- }
-
- public Hashtable<String, String> getPathProperties(final Path path) throws AzureBlobFileSystemException {
- LOG.debug("getPathProperties for filesystem: {} path: {}",
- client.getFileSystem(),
- path);
-
- final Hashtable<String, String> parsedXmsProperties;
- final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
-
- parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
-
- return parsedXmsProperties;
- }
-
- public void setPathProperties(final Path path, final Hashtable<String, String> properties) throws AzureBlobFileSystemException {
- LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}",
- client.getFileSystem(),
- path,
- properties);
-
- final String commaSeparatedProperties;
- try {
- commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
- } catch (CharacterCodingException ex) {
- throw new InvalidAbfsRestOperationException(ex);
- }
- client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties);
- }
-
- public void createFilesystem() throws AzureBlobFileSystemException {
- LOG.debug("createFilesystem for filesystem: {}",
- client.getFileSystem());
-
- client.createFilesystem();
- }
-
- public void deleteFilesystem() throws AzureBlobFileSystemException {
- LOG.debug("deleteFilesystem for filesystem: {}",
- client.getFileSystem());
-
- client.deleteFilesystem();
- }
-
- public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
- final FsPermission umask) throws AzureBlobFileSystemException {
- boolean isNamespaceEnabled = getIsNamespaceEnabled();
- LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
- client.getFileSystem(),
- path,
- overwrite,
- permission.toString(),
- umask.toString(),
- isNamespaceEnabled);
-
- client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
- isNamespaceEnabled ? getOctalNotation(permission) : null,
- isNamespaceEnabled ? getOctalNotation(umask) : null);
-
- return new AbfsOutputStream(
- client,
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
- 0,
- abfsConfiguration.getWriteBufferSize(),
- abfsConfiguration.isFlushEnabled());
- }
-
- public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
- throws AzureBlobFileSystemException {
- boolean isNamespaceEnabled = getIsNamespaceEnabled();
- LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
- client.getFileSystem(),
- path,
- permission,
- umask,
- isNamespaceEnabled);
-
- client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
- isNamespaceEnabled ? getOctalNotation(permission) : null,
- isNamespaceEnabled ? getOctalNotation(umask) : null);
- }
-
- public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
- throws AzureBlobFileSystemException {
- LOG.debug("openFileForRead filesystem: {} path: {}",
- client.getFileSystem(),
- path);
-
- final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-
- if (parseIsDirectory(resourceType)) {
- throw new AbfsRestOperationException(
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
- "openFileForRead must be used with files and not directories",
- null);
- }
-
- // Add statistics for InputStream
- return new AbfsInputStream(client, statistics,
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
- abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag);
- }
-
- public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
- AzureBlobFileSystemException {
- LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
- client.getFileSystem(),
- path,
- overwrite);
-
- final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-
- if (parseIsDirectory(resourceType)) {
- throw new AbfsRestOperationException(
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
- "openFileForRead must be used with files and not directories",
- null);
- }
-
- final long offset = overwrite ? 0 : contentLength;
-
- return new AbfsOutputStream(
- client,
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
- offset,
- abfsConfiguration.getWriteBufferSize(),
- abfsConfiguration.isFlushEnabled());
- }
-
- public void rename(final Path source, final Path destination) throws
- AzureBlobFileSystemException {
-
- if (isAtomicRenameKey(source.getName())) {
- LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
- +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
- }
-
- LOG.debug("renameAsync filesystem: {} source: {} destination: {}",
- client.getFileSystem(),
- source,
- destination);
-
- String continuation = null;
- long deadline = now() + RENAME_TIMEOUT_MILISECONDS;
-
- do {
- if (now() > deadline) {
- LOG.debug("Rename {} to {} timed out.",
- source,
- destination);
-
- throw new TimeoutException("Rename timed out.");
- }
-
- AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
- continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-
- } while (continuation != null && !continuation.isEmpty());
- }
-
- public void delete(final Path path, final boolean recursive)
- throws AzureBlobFileSystemException {
- LOG.debug("delete filesystem: {} path: {} recursive: {}",
- client.getFileSystem(),
- path,
- String.valueOf(recursive));
-
- String continuation = null;
- long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS;
-
- do {
- if (now() > deadline) {
- LOG.debug("Delete directory {} timed out.", path);
-
- throw new TimeoutException("Delete directory timed out.");
- }
-
- AbfsRestOperation op = client.deletePath(
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation);
- continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-
- } while (continuation != null && !continuation.isEmpty());
- }
-
- public FileStatus getFileStatus(final Path path) throws IOException {
- boolean isNamespaceEnabled = getIsNamespaceEnabled();
- LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
- client.getFileSystem(),
- path,
- isNamespaceEnabled);
-
- if (path.isRoot()) {
- final AbfsRestOperation op = isNamespaceEnabled
- ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
- : client.getFilesystemProperties();
-
- final long blockSize = abfsConfiguration.getAzureBlockSize();
- final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
- final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
- final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
- final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
- final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
- return new VersionedFileStatus(
- owner == null ? userName : owner,
- group == null ? primaryUserGroup : group,
- permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
- : AbfsPermission.valueOf(permissions),
- hasAcl,
- 0,
- true,
- 1,
- blockSize,
- parseLastModifiedTime(lastModified),
- path,
- eTag);
- } else {
- AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final long blockSize = abfsConfiguration.getAzureBlockSize();
- final AbfsHttpOperation result = op.getResult();
- final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
- final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
- final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
- final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
- final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
- final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
- final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
- return new VersionedFileStatus(
- owner == null ? userName : owner,
- group == null ? primaryUserGroup : group,
- permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
- : AbfsPermission.valueOf(permissions),
- hasAcl,
- parseContentLength(contentLength),
- parseIsDirectory(resourceType),
- 1,
- blockSize,
- parseLastModifiedTime(lastModified),
- path,
- eTag);
- }
- }
-
- public FileStatus[] listStatus(final Path path) throws IOException {
- LOG.debug("listStatus filesystem: {} path: {}",
- client.getFileSystem(),
- path);
-
- String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
- String continuation = null;
- ArrayList<FileStatus> fileStatuses = new ArrayList<>();
-
- do {
- AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
- continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
- ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
- if (retrievedSchema == null) {
- throw new AbfsRestOperationException(
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
- "listStatusAsync path not found",
- null, op.getResult());
- }
-
- long blockSize = abfsConfiguration.getAzureBlockSize();
-
- for (ListResultEntrySchema entry : retrievedSchema.paths()) {
- final String owner = entry.owner() == null ? userName : entry.owner();
- final String group = entry.group() == null ? primaryUserGroup : entry.group();
- final FsPermission fsPermission = entry.permissions() == null
- ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
- : AbfsPermission.valueOf(entry.permissions());
- final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
-
- long lastModifiedMillis = 0;
- long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
- boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
- if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
- lastModifiedMillis = parseLastModifiedTime(entry.lastModified());
- }
-
- Path entryPath = new Path(File.separator + entry.name());
- entryPath = entryPath.makeQualified(this.uri, entryPath);
-
- fileStatuses.add(
- new VersionedFileStatus(
- owner,
- group,
- fsPermission,
- hasAcl,
- contentLength,
- isDirectory,
- 1,
- blockSize,
- lastModifiedMillis,
- entryPath,
- entry.eTag()));
- }
-
- } while (continuation != null && !continuation.isEmpty());
-
- return fileStatuses.toArray(new FileStatus[0]);
- }
-
- public void setOwner(final Path path, final String owner, final String group) throws
- AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "setOwner filesystem: {} path: {} owner: {} group: {}",
- client.getFileSystem(),
- path.toString(),
- owner,
- group);
- client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), owner, group);
- }
-
- public void setPermission(final Path path, final FsPermission permission) throws
- AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "setPermission filesystem: {} path: {} permission: {}",
- client.getFileSystem(),
- path.toString(),
- permission.toString());
- client.setPermission(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
- String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal()));
- }
-
- public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) throws
- AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "modifyAclEntries filesystem: {} path: {} aclSpec: {}",
- client.getFileSystem(),
- path.toString(),
- AclEntry.aclSpecToString(aclSpec));
-
- final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
-
- final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-
- final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
-
- for (Map.Entry<String, String> modifyAclEntry : modifyAclEntries.entrySet()) {
- aclEntries.put(modifyAclEntry.getKey(), modifyAclEntry.getValue());
- }
-
- if (!modifyAclEntries.containsKey(AbfsHttpConstants.ACCESS_MASK)) {
- aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
- }
-
- if (!modifyAclEntries.containsKey(AbfsHttpConstants.DEFAULT_MASK)) {
- aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
- }
-
- client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
- AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
- }
-
- public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "removeAclEntries filesystem: {} path: {} aclSpec: {}",
- client.getFileSystem(),
- path.toString(),
- AclEntry.aclSpecToString(aclSpec));
-
- final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
- final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-
- final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
-
- AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
-
- client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
- AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
- }
-
- public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "removeDefaultAcl filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
-
- final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
- final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
- final Map<String, String> defaultAclEntries = new HashMap<>();
-
- for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
- if (aclEntry.getKey().startsWith("default:")) {
- defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
- }
- }
-
- for (Map.Entry<String, String> defaultAclEntry : defaultAclEntries.entrySet()) {
- aclEntries.remove(defaultAclEntry.getKey());
- }
-
- client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
- AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
- }
-
- public void removeAcl(final Path path) throws AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "removeAcl filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
- final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-
- final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
- final Map<String, String> newAclEntries = new HashMap<>();
-
- newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
- newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
- newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
-
- client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
- AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
- }
-
- public void setAcl(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "setAcl filesystem: {} path: {} aclspec: {}",
- client.getFileSystem(),
- path.toString(),
- AclEntry.aclSpecToString(aclSpec));
- final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
- final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-
- final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
- for (Map.Entry<String, String> ace : getAclEntries.entrySet()) {
- if (ace.getKey().startsWith("default:") && !ace.getKey().equals(AbfsHttpConstants.DEFAULT_MASK)
- && !aclEntries.containsKey(ace.getKey())) {
- aclEntries.put(ace.getKey(), ace.getValue());
- }
- }
-
- client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
- AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
- }
-
- public AclStatus getAclStatus(final Path path) throws IOException {
- if (!getIsNamespaceEnabled()) {
- throw new UnsupportedOperationException(
- "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
- }
-
- LOG.debug(
- "getAclStatus filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
- AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
- AbfsHttpOperation result = op.getResult();
-
- final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
- final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
- final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
- final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
-
- final List<AclEntry> processedAclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
- final FsPermission fsPermission = permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
- : AbfsPermission.valueOf(permissions);
-
- final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
- aclStatusBuilder.owner(owner == null ? userName : owner);
- aclStatusBuilder.group(group == null ? primaryUserGroup : group);
-
- aclStatusBuilder.setPermission(fsPermission);
- aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
- aclStatusBuilder.addEntries(processedAclEntries);
- return aclStatusBuilder.build();
- }
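Each ACL mutator above follows the same read-modify-write pattern: fetch the current ACL and ETag via getAclStatus, transform the entry map locally, then write it back with the ETag as a precondition. Below is a minimal, hypothetical sketch of the merge step used by modifyAclEntries, written against plain java.util collections rather than the AbfsAclHelper API; the literal keys "mask" and "default:mask" are assumed values of ACCESS_MASK and DEFAULT_MASK.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the merge performed by modifyAclEntries: modified
// entries overwrite existing ones, and a mask the caller did not re-specify
// is dropped from the map before it is written back.
final class AclMergeSketch {
  static Map<String, String> merge(Map<String, String> current,
      Map<String, String> modifications) {
    Map<String, String> merged = new HashMap<>(current);
    merged.putAll(modifications);
    if (!modifications.containsKey("mask")) {          // assumed ACCESS_MASK value
      merged.remove("mask");
    }
    if (!modifications.containsKey("default:mask")) {  // assumed DEFAULT_MASK value
      merged.remove("default:mask");
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> current = new HashMap<>();
    current.put("user", "rwx");
    current.put("mask", "r-x");
    Map<String, String> mods = new HashMap<>();
    mods.put("user:alice", "rw-");
    // Prints the merged map: user:alice added, mask removed.
    System.out.println(merge(current, mods));
  }
}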
-
- public boolean isAtomicRenameKey(String key) {
- return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
- }
-
- private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException {
- if (this.client != null) {
- return;
- }
-
- final URIBuilder uriBuilder = getURIBuilder(accountName, isSecure);
-
- final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
-
- URL baseUrl;
- try {
- baseUrl = new URL(url);
- } catch (MalformedURLException e) {
- throw new InvalidUriException(uri.toString());
- }
-
- SharedKeyCredentials creds = null;
- AccessTokenProvider tokenProvider = null;
-
- if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
- int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
- if (dotIndex <= 0) {
- throw new InvalidUriException(
- uri.toString() + " - account name is not fully qualified.");
- }
- creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
- abfsConfiguration.getStorageAccountKey());
- } else {
- tokenProvider = abfsConfiguration.getTokenProvider();
- }
-
- this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
- }
-
- private String getOctalNotation(FsPermission fsPermission) {
- Preconditions.checkNotNull(fsPermission, "fsPermission");
- return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
- }
-
- private String getRelativePath(final Path path) {
- return getRelativePath(path, false);
- }
-
- private String getRelativePath(final Path path, final boolean allowRootPath) {
- Preconditions.checkNotNull(path, "path");
- final String relativePath = path.toUri().getPath();
-
- if (relativePath.length() == 0 || (relativePath.length() == 1 && relativePath.charAt(0) == Path.SEPARATOR_CHAR)) {
- return allowRootPath ? AbfsHttpConstants.ROOT_PATH : AbfsHttpConstants.EMPTY_STRING;
- }
-
- if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
- return relativePath.substring(1);
- }
-
- return relativePath;
- }
-
- private long parseContentLength(final String contentLength) {
- if (contentLength == null) {
- return -1;
- }
-
- return Long.parseLong(contentLength);
- }
-
- private boolean parseIsDirectory(final String resourceType) {
- return resourceType != null
- && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
- }
-
- private long parseLastModifiedTime(final String lastModifiedTime) {
- long parsedTime = 0;
- try {
- Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime);
- parsedTime = utcDate.getTime();
- } catch (ParseException e) {
- LOG.error("Failed to parse the date {}", lastModifiedTime);
- }
-
- return parsedTime;
- }
-
- private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
- CharacterCodingException {
- StringBuilder commaSeparatedProperties = new StringBuilder();
-
- final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
-
- for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
- String key = propertyEntry.getKey();
- String value = propertyEntry.getValue();
-
- Boolean canEncodeValue = encoder.canEncode(value);
- if (!canEncodeValue) {
- throw new CharacterCodingException();
- }
-
- String encodedPropertyValue = Base64.encode(encoder.encode(CharBuffer.wrap(value)).array());
- commaSeparatedProperties.append(key)
- .append(AbfsHttpConstants.EQUAL)
- .append(encodedPropertyValue);
-
- commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
- }
-
- if (commaSeparatedProperties.length() != 0) {
- commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
- }
-
- return commaSeparatedProperties.toString();
- }
-
- private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
- InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
- Hashtable<String, String> properties = new Hashtable<>();
-
- final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();
-
- if (xMsProperties != null && !xMsProperties.isEmpty()) {
- String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);
-
- if (userProperties.length == 0) {
- return properties;
- }
-
- for (String property : userProperties) {
- if (property.isEmpty()) {
- throw new InvalidFileSystemPropertyException(xMsProperties);
- }
-
- String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
- if (nameValue.length != 2) {
- throw new InvalidFileSystemPropertyException(xMsProperties);
- }
-
- byte[] decodedValue = Base64.decode(nameValue[1]);
-
- final String value;
- try {
- value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
- } catch (CharacterCodingException ex) {
- throw new InvalidAbfsRestOperationException(ex);
- }
- properties.put(nameValue[0], value);
- }
- }
-
- return properties;
- }
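The two helpers above define the metadata wire format: each property is serialized as key=Base64(value) and the pairs are joined with commas. Below is a hedged round-trip sketch using java.util.Base64 and ISO-8859-1; the production code uses Hadoop's own Base64 helper and the charset named by XMS_PROPERTIES_ENCODING, so treat both choices here as assumptions.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Hashtable;
import java.util.Map;
import java.util.StringJoiner;

// Round-trip sketch for the "key=Base64(value),key2=..." format used above.
final class XmsPropertiesSketch {
  static String serialize(Hashtable<String, String> properties) {
    StringJoiner joined = new StringJoiner(",");
    for (Map.Entry<String, String> e : properties.entrySet()) {
      String encoded = Base64.getEncoder()
          .encodeToString(e.getValue().getBytes(StandardCharsets.ISO_8859_1));
      joined.add(e.getKey() + "=" + encoded);
    }
    return joined.toString();
  }

  static Hashtable<String, String> deserialize(String xMsProperties) {
    Hashtable<String, String> properties = new Hashtable<>();
    if (xMsProperties == null || xMsProperties.isEmpty()) {
      return properties;
    }
    for (String pair : xMsProperties.split(",")) {
      String[] nameValue = pair.split("=", 2);   // the encoded value may end in '=' padding
      byte[] decoded = Base64.getDecoder().decode(nameValue[1]);
      properties.put(nameValue[0], new String(decoded, StandardCharsets.ISO_8859_1));
    }
    return properties;
  }

  public static void main(String[] args) {
    Hashtable<String, String> props = new Hashtable<>();
    props.put("owner", "alice");
    String wire = serialize(props);
    System.out.println(wire);               // owner=YWxpY2U=
    System.out.println(deserialize(wire));  // {owner=alice}
  }
}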
-
- private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
- for (String dir : dirSet) {
- if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
- return true;
- }
-
- try {
- URI uri = new URI(dir);
- if (null == uri.getAuthority()) {
- if (key.startsWith(dir + "/")){
- return true;
- }
- }
- } catch (URISyntaxException e) {
- LOG.info("URI syntax error creating URI for {}", dir);
- }
- }
-
- return false;
- }
-
- private static class VersionedFileStatus extends FileStatus {
- private final String version;
-
- VersionedFileStatus(
- final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl,
- final long length, final boolean isdir, final int blockReplication,
- final long blocksize, final long modificationTime, final Path path,
- String version) {
- super(length, isdir, blockReplication, blocksize, modificationTime, 0,
- fsPermission,
- owner,
- group,
- null,
- path,
- hasAcl, false, false);
-
- this.version = version;
- }
-
- /** Compare if this object is equal to another object.
- * @param obj the object to be compared.
- * @return true if the two file statuses have the same path name; false if not.
- */
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof FileStatus)) {
- return false;
- }
-
- FileStatus other = (FileStatus) obj;
-
- if (!this.getPath().equals(other.getPath())) {
- return false;
- }
-
- if (other instanceof VersionedFileStatus) {
- return this.version.equals(((VersionedFileStatus) other).version);
- }
-
- return true;
- }
-
- /**
- * Returns a hash code value for the object, which is defined as
- * the hash code of the path name.
- *
- * @return a hash code value for the path name and version
- */
- @Override
- public int hashCode() {
- int hash = getPath().hashCode();
- hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
- return hash;
- }
-
- /**
- * Returns the version of this FileStatus
- *
- * @return a string value for the FileStatus version
- */
- public String getVersion() {
- return this.version;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder(
- "VersionedFileStatus{");
- sb.append(super.toString());
- sb.append("; version='").append(version).append('\'');
- sb.append('}');
- return sb.toString();
- }
- }
-
- @VisibleForTesting
- AbfsClient getClient() {
- return this.client;
- }
-}
diff --git a/src/main/java/org/apache/hadoop/security/LdapGroupsMapping.java b/src/main/java/org/apache/hadoop/security/LdapGroupsMapping.java
deleted file mode 100644
index 4c7370f..0000000
--- a/src/main/java/org/apache/hadoop/security/LdapGroupsMapping.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.security;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.HashSet;
-import java.util.Collection;
-import java.util.Set;
-
-import javax.naming.Context;
-import javax.naming.NamingEnumeration;
-import javax.naming.NamingException;
-import javax.naming.directory.Attribute;
-import javax.naming.directory.DirContext;
-import javax.naming.directory.InitialDirContext;
-import javax.naming.directory.SearchControls;
-import javax.naming.directory.SearchResult;
-import javax.naming.ldap.LdapName;
-import javax.naming.ldap.Rdn;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of {@link GroupMappingServiceProvider} which
- * connects directly to an LDAP server for determining group membership.
- *
- * This provider should be used only if it is necessary to map users to
- * groups that reside exclusively in an Active Directory or LDAP installation.
- * The common case for a Hadoop installation will be that LDAP users and groups
- * are materialized on the Unix servers, and for an installation like that,
- * ShellBasedUnixGroupsMapping is preferred. However, in cases where
- * those users and groups aren't materialized in Unix, but need to be used for
- * access control, this class may be used to communicate directly with the LDAP
- * server.
- *
- * It is important to note that resolving group mappings will incur network
- * traffic, and may cause degraded performance, although user-group mappings
- * will be cached via the infrastructure provided by {@link Groups}.
- *
- * This implementation does not support configurable search limits. If a filter
- * is used for searching users or groups which returns more results than are
- * allowed by the server, an exception will be thrown.
- *
- * The implementation attempts to resolve group hierarchies,
- * to a configurable limit.
- * If the limit is 0, in order to be considered a member of a group,
- * the user must be an explicit member in LDAP. Otherwise, it will traverse the
- * group hierarchy n levels up.
- */
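Since the provider is driven entirely by configuration, a short wiring sketch may help; every value below is a deployment placeholder, and only keys defined further down in this class are used.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.LdapGroupsMapping;

// Hypothetical wiring of the provider; all values are placeholders.
public final class LdapGroupsMappingExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hadoop.security.group.mapping.ldap.url", "ldaps://ldap.example.com:636");
    conf.set("hadoop.security.group.mapping.ldap.bind.user", "cn=hadoop,ou=svc,dc=example,dc=com");
    conf.set("hadoop.security.group.mapping.ldap.bind.password.file", "/etc/hadoop/ldap-bind.pw");
    conf.set("hadoop.security.group.mapping.ldap.base", "dc=example,dc=com");
    conf.setInt("hadoop.security.group.mapping.ldap.search.group.hierarchy.levels", 1);

    LdapGroupsMapping mapping = new LdapGroupsMapping();
    mapping.setConf(conf);                             // validates the URL, loads bind/SSL settings
    List<String> groups = mapping.getGroups("alice");  // empty list if every retry fails
    System.out.println(groups);
  }
}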
-@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
-@InterfaceStability.Evolving
-public class LdapGroupsMapping
- implements GroupMappingServiceProvider, Configurable {
-
- public static final String LDAP_CONFIG_PREFIX = "hadoop.security.group.mapping.ldap";
-
- /*
- * URL of the LDAP server
- */
- public static final String LDAP_URL_KEY = LDAP_CONFIG_PREFIX + ".url";
- public static final String LDAP_URL_DEFAULT = "";
-
- /*
- * Should SSL be used to connect to the server
- */
- public static final String LDAP_USE_SSL_KEY = LDAP_CONFIG_PREFIX + ".ssl";
- public static final Boolean LDAP_USE_SSL_DEFAULT = false;
-
- /*
- * File path to the location of the SSL keystore to use
- */
- public static final String LDAP_KEYSTORE_KEY = LDAP_CONFIG_PREFIX + ".ssl.keystore";
- public static final String LDAP_KEYSTORE_DEFAULT = "";
-
- /*
- * Password for the keystore
- */
- public static final String LDAP_KEYSTORE_PASSWORD_KEY = LDAP_CONFIG_PREFIX + ".ssl.keystore.password";
- public static final String LDAP_KEYSTORE_PASSWORD_DEFAULT = "";
-
- public static final String LDAP_KEYSTORE_PASSWORD_FILE_KEY = LDAP_KEYSTORE_PASSWORD_KEY + ".file";
- public static final String LDAP_KEYSTORE_PASSWORD_FILE_DEFAULT = "";
-
-
- /**
- * File path to the location of the SSL truststore to use
- */
- public static final String LDAP_TRUSTSTORE_KEY = LDAP_CONFIG_PREFIX +
- ".ssl.truststore";
-
- /**
- * The key of the credential entry containing the password for
- * the LDAP SSL truststore
- */
- public static final String LDAP_TRUSTSTORE_PASSWORD_KEY =
- LDAP_CONFIG_PREFIX +".ssl.truststore.password";
-
- /**
- * The path to a file containing the password for
- * the LDAP SSL truststore
- */
- public static final String LDAP_TRUSTSTORE_PASSWORD_FILE_KEY =
- LDAP_TRUSTSTORE_PASSWORD_KEY + ".file";
-
- /*
- * User to bind to the LDAP server with
- */
- public static final String BIND_USER_KEY = LDAP_CONFIG_PREFIX + ".bind.user";
- public static final String BIND_USER_DEFAULT = "";
-
- /*
- * Password for the bind user
- */
- public static final String BIND_PASSWORD_KEY = LDAP_CONFIG_PREFIX + ".bind.password";
- public static final String BIND_PASSWORD_DEFAULT = "";
-
- public static final String BIND_PASSWORD_FILE_KEY = BIND_PASSWORD_KEY + ".file";
- public static final String BIND_PASSWORD_FILE_DEFAULT = "";
-
- /*
- * Base distinguished name to use for searches
- */
- public static final String BASE_DN_KEY = LDAP_CONFIG_PREFIX + ".base";
- public static final String BASE_DN_DEFAULT = "";
-
- /*
- * Base DN used in user search.
- */
- public static final String USER_BASE_DN_KEY =
- LDAP_CONFIG_PREFIX + ".userbase";
-
- /*
- * Base DN used in group search.
- */
- public static final String GROUP_BASE_DN_KEY =
- LDAP_CONFIG_PREFIX + ".groupbase";
-
-
- /*
- * Any additional filters to apply when searching for users
- */
- public static final String USER_SEARCH_FILTER_KEY = LDAP_CONFIG_PREFIX + ".search.filter.user";
- public static final String USER_SEARCH_FILTER_DEFAULT = "(&(objectClass=user)(sAMAccountName={0}))";
-
- /*
- * Any additional filters to apply when finding relevant groups
- */
- public static final String GROUP_SEARCH_FILTER_KEY = LDAP_CONFIG_PREFIX + ".search.filter.group";
- public static final String GROUP_SEARCH_FILTER_DEFAULT = "(objectClass=group)";
-
- /*
- * LDAP attribute to use for determining group membership
- */
- public static final String MEMBEROF_ATTR_KEY =
- LDAP_CONFIG_PREFIX + ".search.attr.memberof";
- public static final String MEMBEROF_ATTR_DEFAULT = "";
-
- /*
- * LDAP attribute to use for determining group membership
- */
- public static final String GROUP_MEMBERSHIP_ATTR_KEY = LDAP_CONFIG_PREFIX + ".search.attr.member";
- public static final String GROUP_MEMBERSHIP_ATTR_DEFAULT = "member";
-
- /*
- * LDAP attribute to use for identifying a group's name
- */
- public static final String GROUP_NAME_ATTR_KEY = LDAP_CONFIG_PREFIX + ".search.attr.group.name";
- public static final String GROUP_NAME_ATTR_DEFAULT = "cn";
-
- /*
- * How many levels to traverse when checking for groups in the org hierarchy
- */
- public static final String GROUP_HIERARCHY_LEVELS_KEY =
- LDAP_CONFIG_PREFIX + ".search.group.hierarchy.levels";
- public static final int GROUP_HIERARCHY_LEVELS_DEFAULT = 0;
-
- /*
- * LDAP attribute names to use when doing posix-like lookups
- */
- public static final String POSIX_UID_ATTR_KEY = LDAP_CONFIG_PREFIX + ".posix.attr.uid.name";
- public static final String POSIX_UID_ATTR_DEFAULT = "uidNumber";
-
- public static final String POSIX_GID_ATTR_KEY = LDAP_CONFIG_PREFIX + ".posix.attr.gid.name";
- public static final String POSIX_GID_ATTR_DEFAULT = "gidNumber";
-
- /*
- * Posix attributes
- */
- public static final String POSIX_GROUP = "posixGroup";
- public static final String POSIX_ACCOUNT = "posixAccount";
-
- /*
- * LDAP {@link SearchControls} attribute to set the time limit
- * for an invoked directory search. Prevents infinite wait cases.
- */
- public static final String DIRECTORY_SEARCH_TIMEOUT =
- LDAP_CONFIG_PREFIX + ".directory.search.timeout";
- public static final int DIRECTORY_SEARCH_TIMEOUT_DEFAULT = 10000; // 10s
-
- public static final String CONNECTION_TIMEOUT =
- LDAP_CONFIG_PREFIX + ".connection.timeout.ms";
- public static final int CONNECTION_TIMEOUT_DEFAULT = 60 * 1000; // 60 seconds
- public static final String READ_TIMEOUT =
- LDAP_CONFIG_PREFIX + ".read.timeout.ms";
- public static final int READ_TIMEOUT_DEFAULT = 60 * 1000; // 60 seconds
-
- private static final Logger LOG =
- LoggerFactory.getLogger(LdapGroupsMapping.class);
-
- static final SearchControls SEARCH_CONTROLS = new SearchControls();
- static {
- SEARCH_CONTROLS.setSearchScope(SearchControls.SUBTREE_SCOPE);
- }
-
- private DirContext ctx;
- private Configuration conf;
-
- private String ldapUrl;
- private boolean useSsl;
- private String keystore;
- private String keystorePass;
- private String truststore;
- private String truststorePass;
- private String bindUser;
- private String bindPassword;
- private String userbaseDN;
- private String groupbaseDN;
- private String groupSearchFilter;
- private String userSearchFilter;
- private String memberOfAttr;
- private String groupMemberAttr;
- private String groupNameAttr;
- private int groupHierarchyLevels;
- private String posixUidAttr;
- private String posixGidAttr;
- private boolean isPosix;
- private boolean useOneQuery;
-
- public static final int RECONNECT_RETRY_COUNT = 3;
-
- /**
- * Returns list of groups for a user.
- *
- * The LdapCtx which underlies the DirContext object is not thread-safe, so
- * we need to block around this whole method. The caching infrastructure will
- * ensure that performance stays in an acceptable range.
- *
- * @param user get groups for this user
- * @return list of groups for a given user
- */
- @Override
- public synchronized List<String> getGroups(String user) {
- /*
- * Normal garbage collection takes care of removing Context instances when they are no longer in use.
- * Connections used by Context instances being garbage collected will be closed automatically.
- * So if the connection is closed and a CommunicationException is thrown, retry a few times with a new DirContext/connection.
- */
- for(int retry = 0; retry < RECONNECT_RETRY_COUNT; retry++) {
- try {
- return doGetGroups(user, groupHierarchyLevels);
- } catch (NamingException e) {
- LOG.warn("Failed to get groups for user " + user + " (retry=" + retry
- + ") by " + e);
- LOG.trace("TRACE", e);
- }
-
- //reset ctx so that new DirContext can be created with new connection
- this.ctx = null;
- }
-
- return Collections.emptyList();
- }
-
- /**
- * A helper method to get the Relative Distinguished Name (RDN) from
- * Distinguished name (DN). According to Active Directory documentation,
- * a group object's RDN is a CN.
- *
- * @param distinguishedName A string representing a distinguished name.
- * @throws NamingException if the DN is malformed.
- * @return a string which represents the RDN
- */
- private String getRelativeDistinguishedName(String distinguishedName)
- throws NamingException {
- LdapName ldn = new LdapName(distinguishedName);
- List<Rdn> rdns = ldn.getRdns();
- if (rdns.isEmpty()) {
- throw new NamingException("DN is empty");
- }
- Rdn rdn = rdns.get(rdns.size()-1);
- if (rdn.getType().equalsIgnoreCase(groupNameAttr)) {
- String groupName = (String)rdn.getValue();
- return groupName;
- }
- throw new NamingException("Unable to find RDN: The DN " +
- distinguishedName + " is malformed.");
- }
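For example, with groupNameAttr left at its default of "cn", the hypothetical DN "CN=HadoopAdmins,OU=Groups,DC=example,DC=com" resolves to the group name "HadoopAdmins": LdapName stores RDNs right-to-left, so the leaf component is the last element of getRdns(). A small standalone illustration:

import java.util.List;
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

// Illustration of leaf-RDN extraction on a hypothetical group DN.
final class RdnExample {
  public static void main(String[] args) throws InvalidNameException {
    LdapName ldn = new LdapName("CN=HadoopAdmins,OU=Groups,DC=example,DC=com");
    List<Rdn> rdns = ldn.getRdns();
    Rdn leaf = rdns.get(rdns.size() - 1);  // leftmost component: CN=HadoopAdmins
    System.out.println(leaf.getType());    // CN
    System.out.println(leaf.getValue());   // HadoopAdmins
  }
}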
-
- /**
- * Look up groups using posixGroups semantics. Use posix gid/uid to find
- * groups of the user.
- *
- * @param result the result object returned from the prior user lookup.
- * @param c the context object of the LDAP connection.
- * @return an object representing the search result.
- *
- * @throws NamingException if the server does not support posixGroups
- * semantics.
- */
- private NamingEnumeration<SearchResult> lookupPosixGroup(SearchResult result,
- DirContext c) throws NamingException {
- String gidNumber = null;
- String uidNumber = null;
- Attribute gidAttribute = result.getAttributes().get(posixGidAttr);
- Attribute uidAttribute = result.getAttributes().get(posixUidAttr);
- String reason = "";
- if (gidAttribute == null) {
- reason = "Can't find attribute '" + posixGidAttr + "'.";
- } else {
- gidNumber = gidAttribute.get().toString();
- }
- if (uidAttribute == null) {
- reason = "Can't find attribute '" + posixUidAttr + "'.";
- } else {
- uidNumber = uidAttribute.get().toString();
- }
- if (uidNumber != null && gidNumber != null) {
- return c.search(groupbaseDN,
- "(&"+ groupSearchFilter + "(|(" + posixGidAttr + "={0})" +
- "(" + groupMemberAttr + "={1})))",
- new Object[] {gidNumber, uidNumber},
- SEARCH_CONTROLS);
- }
- throw new NamingException("The server does not support posixGroups " +
- "semantics. Reason: " + reason +
- " Returned user object: " + result.toString());
- }
-
- /**
- * Perform the second query to get the groups of the user.
- *
- * If posixGroups is enabled, use posix gid/uid to find the groups.
- * Otherwise, use the general group member attribute to find it.
- *
- * @param result the result object returned from the prior user lookup.
- * @param c the context object of the LDAP connection.
- * @return a list of strings representing group names of the user.
- * @throws NamingException if unable to find group names
- */
- private List<String> lookupGroup(SearchResult result, DirContext c,
- int goUpHierarchy)
- throws NamingException {
- List<String> groups = new ArrayList<>();
- Set<String> groupDNs = new HashSet<>();
-
- NamingEnumeration<SearchResult> groupResults = null;
- // perform the second LDAP query
- if (isPosix) {
- groupResults = lookupPosixGroup(result, c);
- } else {
- String userDn = result.getNameInNamespace();
- groupResults =
- c.search(groupbaseDN,
- "(&" + groupSearchFilter + "(" + groupMemberAttr + "={0}))",
- new Object[]{userDn},
- SEARCH_CONTROLS);
- }
- // if the second query is successful, group objects of the user will be
- // returned. Get group names from the returned objects.
- if (groupResults != null) {
- while (groupResults.hasMoreElements()) {
- SearchResult groupResult = groupResults.nextElement();
- getGroupNames(groupResult, groups, groupDNs, goUpHierarchy > 0);
- }
- if (goUpHierarchy > 0 && !isPosix) {
- // convert groups to a set to ensure uniqueness
- Set<String> groupset = new HashSet<>(groups);
- goUpGroupHierarchy(groupDNs, goUpHierarchy, groupset);
- // convert set back to list for compatibility
- groups = new ArrayList<>(groupset);
- }
- }
- return groups;
- }
-
- /**
- * Perform LDAP queries to get group names of a user.
- *
- * Perform the first LDAP query to get the user object using the user's name.
- * If one-query is enabled, retrieve the group names from the user object.
- * If one-query is disabled, or if it failed, perform the second query to
- * get the groups.
- *
- * @param user user name
- * @return a list of group names for the user. If the user cannot be found,
- * an empty list is returned.
- * @throws NamingException if unable to get group names
- */
- List<String> doGetGroups(String user, int goUpHierarchy)
- throws NamingException {
- DirContext c = getDirContext();
-
- // Search for the user. We'll only ever need to look at the first result
- NamingEnumeration<SearchResult> results = c.search(userbaseDN,
- userSearchFilter, new Object[]{user}, SEARCH_CONTROLS);
- // return empty list if the user can not be found.
- if (!results.hasMoreElements()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("doGetGroups(" + user + ") returned no groups because the " +
- "user is not found.");
- }
- return new ArrayList();
- }
- SearchResult result = results.nextElement();
-
- List groups = null;
- if (useOneQuery) {
- try {
- /**
- * For Active Directory servers, the user object has an attribute
- * 'memberOf' that represents the DNs of group objects to which the
- * user belongs. So the second query may be skipped.
- */
- Attribute groupDNAttr = result.getAttributes().get(memberOfAttr);
- if (groupDNAttr == null) {
- throw new NamingException("The user object does not have '" +
- memberOfAttr + "' attribute. " +
- "Returned user object: " + result.toString());
- }
- groups = new ArrayList();
- NamingEnumeration groupEnumeration = groupDNAttr.getAll();
- while (groupEnumeration.hasMore()) {
- String groupDN = groupEnumeration.next().toString();
- groups.add(getRelativeDistinguishedName(groupDN));
- }
- } catch (NamingException e) {
- // If the first lookup failed, fall back to the typical scenario.
- LOG.info("Failed to get groups from the first lookup. Initiating " +
- "the second LDAP query using the user's DN.", e);
- }
- }
- if (groups == null || groups.isEmpty() || goUpHierarchy > 0) {
- groups = lookupGroup(result, c, goUpHierarchy);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("doGetGroups(" + user + ") returned " + groups);
- }
- return groups;
- }
-
- /* Helper function to get group name from search results.
- */
- void getGroupNames(SearchResult groupResult, Collection<String> groups,
- Collection<String> groupDNs, boolean doGetDNs)
- throws NamingException {
- Attribute groupName = groupResult.getAttributes().get(groupNameAttr);
- if (groupName == null) {
- throw new NamingException("The group object does not have " +
- "attribute '" + groupNameAttr + "'.");
- }
- groups.add(groupName.get().toString());
- if (doGetDNs) {
- groupDNs.add(groupResult.getNameInNamespace());
- }
- }
-
- /* Implementation for walking up the ldap hierarchy
- * This function will iteratively find the super-group membership of
- * groups listed in groupDNs and add them to
- * the groups set. It will walk up the hierarchy goUpHierarchy levels.
- * Note: This is an expensive operation and settings higher than 1
- * are NOT recommended as they will impact both the speed and
- * memory usage of all operations.
- * The maximum time for this function will be bounded by the ldap query
- * timeout and the number of ldap queries that it will make, which is
- * max(Recur Depth in LDAP, goUpHierarchy) * DIRECTORY_SEARCH_TIMEOUT
- *
- * @param ctx - The context for contacting the ldap server
- * @param groupDNs - the distinguished name of the groups whose parents we
- * want to look up
- * @param goUpHierarchy - the number of levels to go up,
- * @param groups - Output variable to store all groups that will be added
- */
- void goUpGroupHierarchy(Set<String> groupDNs,
- int goUpHierarchy,
- Set<String> groups)
- throws NamingException {
- if (goUpHierarchy <= 0 || groups.isEmpty()) {
- return;
- }
- DirContext context = getDirContext();
- Set<String> nextLevelGroups = new HashSet<>();
- StringBuilder filter = new StringBuilder();
- filter.append("(&").append(groupSearchFilter).append("(|");
- for (String dn : groupDNs) {
- filter.append("(").append(groupMemberAttr).append("=")
- .append(dn).append(")");
- }
- filter.append("))");
- LOG.debug("Ldap group query string: " + filter.toString());
- NamingEnumeration<SearchResult> groupResults =
- context.search(groupbaseDN,
- filter.toString(),
- SEARCH_CONTROLS);
- while (groupResults.hasMoreElements()) {
- SearchResult groupResult = groupResults.nextElement();
- getGroupNames(groupResult, groups, nextLevelGroups, true);
- }
- goUpGroupHierarchy(nextLevelGroups, goUpHierarchy - 1, groups);
- }
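As a worked example, with the default filters and two hypothetical group DNs the query built above is (&(objectClass=group)(|(member=CN=eng,OU=Groups,DC=example,DC=com)(member=CN=ops,OU=Groups,DC=example,DC=com))); each recursive call then repeats the search against the parents found at the previous level until goUpHierarchy reaches zero.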
-
- DirContext getDirContext() throws NamingException {
- if (ctx == null) {
- // Set up the initial environment for LDAP connectivity
- Hashtable env = new Hashtable();
- env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
- env.put(Context.PROVIDER_URL, ldapUrl);
- env.put(Context.SECURITY_AUTHENTICATION, "simple");
-
- // Set up SSL security, if necessary
- if (useSsl) {
- env.put(Context.SECURITY_PROTOCOL, "ssl");
- if (!keystore.isEmpty()) {
- System.setProperty("javax.net.ssl.keyStore", keystore);
- }
- if (!keystorePass.isEmpty()) {
- System.setProperty("javax.net.ssl.keyStorePassword", keystorePass);
- }
- if (!truststore.isEmpty()) {
- System.setProperty("javax.net.ssl.trustStore", truststore);
- }
- if (!truststorePass.isEmpty()) {
- System.setProperty("javax.net.ssl.trustStorePassword",
- truststorePass);
- }
- }
-
- env.put(Context.SECURITY_PRINCIPAL, bindUser);
- env.put(Context.SECURITY_CREDENTIALS, bindPassword);
-
- env.put("com.sun.jndi.ldap.connect.timeout", conf.get(CONNECTION_TIMEOUT,
- String.valueOf(CONNECTION_TIMEOUT_DEFAULT)));
- env.put("com.sun.jndi.ldap.read.timeout", conf.get(READ_TIMEOUT,
- String.valueOf(READ_TIMEOUT_DEFAULT)));
-
- ctx = new InitialDirContext(env);
- }
- return ctx;
- }
-
- /**
- * Caches groups, no need to do that for this provider
- */
- @Override
- public void cacheGroupsRefresh() throws IOException {
- // does nothing in this provider of user to groups mapping
- }
-
- /**
- * Adds groups to cache, no need to do that for this provider
- *
- * @param groups unused
- */
- @Override
- public void cacheGroupsAdd(List<String> groups) throws IOException {
- // does nothing in this provider of user to groups mapping
- }
-
- @Override
- public synchronized Configuration getConf() {
- return conf;
- }
-
- @Override
- public synchronized void setConf(Configuration conf) {
- ldapUrl = conf.get(LDAP_URL_KEY, LDAP_URL_DEFAULT);
- if (ldapUrl == null || ldapUrl.isEmpty()) {
- throw new RuntimeException("LDAP URL is not configured");
- }
-
- useSsl = conf.getBoolean(LDAP_USE_SSL_KEY, LDAP_USE_SSL_DEFAULT);
- if (useSsl) {
- loadSslConf(conf);
- }
-
- bindUser = conf.get(BIND_USER_KEY, BIND_USER_DEFAULT);
- bindPassword = getPassword(conf, BIND_PASSWORD_KEY, BIND_PASSWORD_DEFAULT);
- if (bindPassword.isEmpty()) {
- bindPassword = extractPassword(
- conf.get(BIND_PASSWORD_FILE_KEY, BIND_PASSWORD_FILE_DEFAULT));
- }
-
- String baseDN = conf.getTrimmed(BASE_DN_KEY, BASE_DN_DEFAULT);
-
- //User search base which defaults to base dn.
- userbaseDN = conf.getTrimmed(USER_BASE_DN_KEY, baseDN);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Usersearch baseDN: " + userbaseDN);
- }
-
- //Group search base which defaults to base dn.
- groupbaseDN = conf.getTrimmed(GROUP_BASE_DN_KEY, baseDN);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Groupsearch baseDN: " + groupbaseDN);
- }
-
- groupSearchFilter =
- conf.get(GROUP_SEARCH_FILTER_KEY, GROUP_SEARCH_FILTER_DEFAULT);
- userSearchFilter =
- conf.get(USER_SEARCH_FILTER_KEY, USER_SEARCH_FILTER_DEFAULT);
- isPosix = groupSearchFilter.contains(POSIX_GROUP) && userSearchFilter
- .contains(POSIX_ACCOUNT);
- memberOfAttr =
- conf.get(MEMBEROF_ATTR_KEY, MEMBEROF_ATTR_DEFAULT);
- // if memberOf attribute is set, resolve group names from the attribute
- // of user objects.
- useOneQuery = !memberOfAttr.isEmpty();
- groupMemberAttr =
- conf.get(GROUP_MEMBERSHIP_ATTR_KEY, GROUP_MEMBERSHIP_ATTR_DEFAULT);
- groupNameAttr =
- conf.get(GROUP_NAME_ATTR_KEY, GROUP_NAME_ATTR_DEFAULT);
- groupHierarchyLevels =
- conf.getInt(GROUP_HIERARCHY_LEVELS_KEY, GROUP_HIERARCHY_LEVELS_DEFAULT);
- posixUidAttr =
- conf.get(POSIX_UID_ATTR_KEY, POSIX_UID_ATTR_DEFAULT);
- posixGidAttr =
- conf.get(POSIX_GID_ATTR_KEY, POSIX_GID_ATTR_DEFAULT);
-
- int dirSearchTimeout = conf.getInt(DIRECTORY_SEARCH_TIMEOUT, DIRECTORY_SEARCH_TIMEOUT_DEFAULT);
- SEARCH_CONTROLS.setTimeLimit(dirSearchTimeout);
- // Limit the attributes returned to only those required to speed up the search.
- // See HADOOP-10626 and HADOOP-12001 for more details.
- String[] returningAttributes;
- if (useOneQuery) {
- returningAttributes = new String[] {
- groupNameAttr, posixUidAttr, posixGidAttr, memberOfAttr};
- } else {
- returningAttributes = new String[] {
- groupNameAttr, posixUidAttr, posixGidAttr};
- }
- SEARCH_CONTROLS.setReturningAttributes(returningAttributes);
-
- this.conf = conf;
- }
-
- private void loadSslConf(Configuration sslConf) {
- keystore = sslConf.get(LDAP_KEYSTORE_KEY, LDAP_KEYSTORE_DEFAULT);
- keystorePass = getPassword(sslConf, LDAP_KEYSTORE_PASSWORD_KEY,
- LDAP_KEYSTORE_PASSWORD_DEFAULT);
- if (keystorePass.isEmpty()) {
- keystorePass = extractPassword(sslConf.get(
- LDAP_KEYSTORE_PASSWORD_FILE_KEY,
- LDAP_KEYSTORE_PASSWORD_FILE_DEFAULT));
- }
-
- truststore = sslConf.get(LDAP_TRUSTSTORE_KEY, "");
- truststorePass = getPasswordFromCredentialProviders(
- sslConf, LDAP_TRUSTSTORE_PASSWORD_KEY, "");
- if (truststorePass.isEmpty()) {
- truststorePass = extractPassword(
- sslConf.get(LDAP_TRUSTSTORE_PASSWORD_FILE_KEY, ""));
- }
- }
-
- String getPasswordFromCredentialProviders(
- Configuration conf, String alias, String defaultPass) {
- String password = defaultPass;
- try {
- char[] passchars = conf.getPasswordFromCredentialProviders(alias);
- if (passchars != null) {
- password = new String(passchars);
- }
- } catch (IOException ioe) {
- LOG.warn("Exception while trying to get password for alias {}: {}",
- alias, ioe);
- }
- return password;
- }
-
- /**
- * Passwords should not be stored in configuration. Use
- * {@link #getPasswordFromCredentialProviders(
- * Configuration, String, String)}
- * to avoid reading passwords from a configuration file.
- */
- @Deprecated
- String getPassword(Configuration conf, String alias, String defaultPass) {
- String password = defaultPass;
- try {
- char[] passchars = conf.getPassword(alias);
- if (passchars != null) {
- password = new String(passchars);
- }
- } catch (IOException ioe) {
- LOG.warn("Exception while trying to get password for alias " + alias
- + ": ", ioe);
- }
- return password;
- }
-
- String extractPassword(String pwFile) {
- if (pwFile.isEmpty()) {
- // If there is no password file defined, we'll assume that we should do
- // an anonymous bind
- return "";
- }
-
- StringBuilder password = new StringBuilder();
- try (Reader reader = new InputStreamReader(
- new FileInputStream(pwFile), StandardCharsets.UTF_8)) {
- int c = reader.read();
- while (c > -1) {
- password.append((char)c);
- c = reader.read();
- }
- return password.toString().trim();
- } catch (IOException ioe) {
- throw new RuntimeException("Could not read password file: " + pwFile, ioe);
- }
- }
-}
diff --git a/src/main/java/org/apache/hadoop/security/authentication/util/KerberosUtil.java b/src/main/java/org/apache/hadoop/security/authentication/util/KerberosUtil.java
deleted file mode 100644
index 5500a75..0000000
--- a/src/main/java/org/apache/hadoop/security/authentication/util/KerberosUtil.java
+++ /dev/null
@@ -1,462 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.security.authentication.util;
-
-import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.charset.IllegalCharsetNameException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.kerby.kerberos.kerb.keytab.Keytab;
-import org.apache.kerby.kerberos.kerb.type.base.PrincipalName;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.Oid;
-
-import javax.security.auth.Subject;
-import javax.security.auth.kerberos.KerberosPrincipal;
-import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.kerberos.KeyTab;
-
-public class KerberosUtil {
-
- /* Return the Kerberos login module name */
- public static String getKrb5LoginModuleName() {
- return (IBM_JAVA)
- ? "com.ibm.security.auth.module.Krb5LoginModule"
- : "com.sun.security.auth.module.Krb5LoginModule";
- }
-
- public static final Oid GSS_SPNEGO_MECH_OID =
- getNumericOidInstance("1.3.6.1.5.5.2");
- public static final Oid GSS_KRB5_MECH_OID =
- getNumericOidInstance("1.2.840.113554.1.2.2");
- public static final Oid NT_GSS_KRB5_PRINCIPAL_OID =
- getNumericOidInstance("1.2.840.113554.1.2.2.1");
-
- // numeric oids will never generate a GSSException for a malformed oid.
- // use to initialize statics.
- private static Oid getNumericOidInstance(String oidName) {
- try {
- return new Oid(oidName);
- } catch (GSSException ex) {
- throw new IllegalArgumentException(ex);
- }
- }
-
- /**
- * Returns the Oid instance from string oidName.
- * Use {@link #GSS_SPNEGO_MECH_OID}, {@link #GSS_KRB5_MECH_OID},
- * or {@link #NT_GSS_KRB5_PRINCIPAL_OID} instead.
- *
- * @return Oid instance
- * @param oidName The oid Name
- * @throws NoSuchFieldException if the input is not supported.
- */
- @Deprecated
- public static Oid getOidInstance(String oidName)
- throws NoSuchFieldException {
- switch (oidName) {
- case "GSS_SPNEGO_MECH_OID":
- return GSS_SPNEGO_MECH_OID;
- case "GSS_KRB5_MECH_OID":
- return GSS_KRB5_MECH_OID;
- case "NT_GSS_KRB5_PRINCIPAL":
- return NT_GSS_KRB5_PRINCIPAL_OID;
- default:
- throw new NoSuchFieldException(
- "oidName: " + oidName + " is not supported.");
- }
- }
-
- /**
- * Return the default realm for this JVM.
- *
- * @return The default realm
- * @throws IllegalArgumentException If the default realm does not exist.
- * @throws ClassNotFoundException Not thrown. Exists for compatibility.
- * @throws NoSuchMethodException Not thrown. Exists for compatibility.
- * @throws IllegalAccessException Not thrown. Exists for compatibility.
- * @throws InvocationTargetException Not thrown. Exists for compatibility.
- */
- public static String getDefaultRealm()
- throws ClassNotFoundException, NoSuchMethodException,
- IllegalArgumentException, IllegalAccessException,
- InvocationTargetException {
- // Any name is okay.
- return new KerberosPrincipal("tmp", 1).getRealm();
- }
-
- /**
- * Return the default realm for this JVM.
- * If the default realm does not exist, this method returns null.
- *
- * @return The default realm
- */
- public static String getDefaultRealmProtected() {
- try {
- return getDefaultRealm();
- } catch (Exception e) {
- //silently catch everything
- return null;
- }
- }
-
- /*
- * For a Service Host Principal specification, map the host's domain
- * to kerberos realm, as specified by krb5.conf [domain_realm] mappings.
- * Unfortunately the mapping routines are private to the security.krb5
- * package, so have to construct a PrincipalName instance to derive the realm.
- *
- * Many things can go wrong with Kerberos configuration, and this is not
- * the place to be throwing exceptions to help debug them. Nor do we choose
- * to make potentially voluminous logs on every call to a communications API.
- * So we simply swallow all exceptions from the underlying libraries and
- * return null if we can't get a good value for the realmString.
- *
- * @param shortprinc A service principal name with host fqdn as instance, e.g.
- * "HTTP/myhost.mydomain"
- * @return String value of Kerberos realm, mapped from host fqdn
- * May be default realm, or may be null.
- */
- public static String getDomainRealm(String shortprinc) {
- Class<?> classRef;
- Object principalName; //of type sun.security.krb5.PrincipalName or IBM equiv
- String realmString = null;
- try {
- if (IBM_JAVA) {
- classRef = Class.forName("com.ibm.security.krb5.PrincipalName");
- } else {
- classRef = Class.forName("sun.security.krb5.PrincipalName");
- }
- int tKrbNtSrvHst = classRef.getField("KRB_NT_SRV_HST").getInt(null);
- principalName = classRef.getConstructor(String.class, int.class).
- newInstance(shortprinc, tKrbNtSrvHst);
- realmString = (String)classRef.getMethod("getRealmString", new Class[0]).
- invoke(principalName, new Object[0]);
- } catch (RuntimeException rte) {
- //silently catch everything
- } catch (Exception e) {
- //silently return default realm (which may itself be null)
- }
- if (null == realmString || realmString.equals("")) {
- return getDefaultRealmProtected();
- } else {
- return realmString;
- }
- }
-
- /* Return fqdn of the current host */
- static String getLocalHostName() throws UnknownHostException {
- return InetAddress.getLocalHost().getCanonicalHostName();
- }
-
- /**
- * Create Kerberos principal for a given service and hostname,
- * inferring realm from the fqdn of the hostname. It converts
- * hostname to lower case. If hostname is null or "0.0.0.0", it uses
- * dynamically looked-up fqdn of the current host instead.
- * If domain_realm mappings are inadequately specified, it will
- * use default_realm, per usual Kerberos behavior.
- * If default_realm also gives a null value, then a principal
- * without realm will be returned, which by Kerberos definitions is
- * just another way to specify default realm.
- *
- * @param service
- * Service for which you want to generate the principal.
- * @param hostname
- * Fully-qualified domain name.
- * @return Converted Kerberos principal name.
- * @throws UnknownHostException
- * If no IP address for the local host could be found.
- */
- public static final String getServicePrincipal(String service,
- String hostname)
- throws UnknownHostException {
- String fqdn = hostname;
- String shortprinc = null;
- String realmString = null;
- if (null == fqdn || fqdn.equals("") || fqdn.equals("0.0.0.0")) {
- fqdn = getLocalHostName();
- }
- // convert hostname to lowercase as kerberos does not work with hostnames
- // with uppercase characters.
- fqdn = fqdn.toLowerCase(Locale.US);
- shortprinc = service + "/" + fqdn;
- // Obtain the realm name inferred from the domain of the host
- realmString = getDomainRealm(shortprinc);
- if (null == realmString || realmString.equals("")) {
- return shortprinc;
- } else {
- return shortprinc + "@" + realmString;
- }
- }
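A hedged usage sketch of the method above; the realm in the result depends entirely on the local krb5 domain_realm mapping, so the output shown in the comments is an assumption.

import java.net.UnknownHostException;
import org.apache.hadoop.security.authentication.util.KerberosUtil;

// Usage sketch; the realm suffix depends on local krb5 configuration.
final class ServicePrincipalExample {
  public static void main(String[] args) throws UnknownHostException {
    String principal = KerberosUtil.getServicePrincipal("HTTP", "NameNode01.Example.COM");
    // The hostname is lower-cased; if example.com maps to realm EXAMPLE.COM this prints
    // HTTP/namenode01.example.com@EXAMPLE.COM, or just HTTP/namenode01.example.com
    // when no realm can be determined.
    System.out.println(principal);
  }
}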
-
- /**
- * Get all the unique principals present in the keytabfile.
- *
- * @param keytabFileName
- * Name of the keytab file to be read.
- * @return list of unique principals in the keytab.
- * @throws IOException
- * If keytab entries cannot be read from the file.
- */
- static final String[] getPrincipalNames(String keytabFileName) throws IOException {
- Keytab keytab = Keytab.loadKeytab(new File(keytabFileName));
- Set<String> principals = new HashSet<>();
- List<PrincipalName> entries = keytab.getPrincipals();
- for (PrincipalName entry : entries) {
- principals.add(entry.getName().replace("\\", "/"));
- }
- return principals.toArray(new String[0]);
- }
-
- /**
- * Get all the unique principals from keytabfile which matches a pattern.
- *
- * @param keytab Name of the keytab file to be read.
- * @param pattern pattern to be matched.
- * @return list of unique principals which matches the pattern.
- * @throws IOException if cannot get the principal name
- */
- public static final String[] getPrincipalNames(String keytab,
- Pattern pattern) throws IOException {
- String[] principals = getPrincipalNames(keytab);
- if (principals.length != 0) {
- List<String> matchingPrincipals = new ArrayList<>();
- for (String principal : principals) {
- if (pattern.matcher(principal).matches()) {
- matchingPrincipals.add(principal);
- }
- }
- principals = matchingPrincipals.toArray(new String[0]);
- }
- return principals;
- }
-
- /**
- * Check if the subject contains Kerberos keytab related objects.
- * The Kerberos keytab object attached in subject has been changed
- * from KerberosKey (JDK 7) to KeyTab (JDK 8)
- *
- *
- * @param subject subject to be checked
- * @return true if the subject contains Kerberos keytab
- */
- public static boolean hasKerberosKeyTab(Subject subject) {
- return !subject.getPrivateCredentials(KeyTab.class).isEmpty();
- }
-
- /**
- * Check if the subject contains Kerberos ticket.
- *
- *
- * @param subject subject to be checked
- * @return true if the subject contains Kerberos ticket
- */
- public static boolean hasKerberosTicket(Subject subject) {
- return !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
- }
-
- /**
- * Extract the TGS server principal from the given gssapi kerberos or spnego
- * wrapped token.
- * @param rawToken bytes of the gss token
- * @return String of server principal
- * @throws IllegalArgumentException if token is undecodable
- */
- public static String getTokenServerName(byte[] rawToken) {
- // subsequent comments include only relevant portions of the kerberos
- // DER encoding that will be extracted.
- DER token = new DER(rawToken);
- // InitialContextToken ::= [APPLICATION 0] IMPLICIT SEQUENCE {
- // mech OID
- // mech-token (NegotiationToken or InnerContextToken)
- // }
- DER oid = token.next();
- if (oid.equals(DER.SPNEGO_MECH_OID)) {
- // NegotiationToken ::= CHOICE {
- // neg-token-init[0] NegTokenInit
- // }
- // NegTokenInit ::= SEQUENCE {
- // mech-token[2] InitialContextToken
- // }
- token = token.next().get(0xa0, 0x30, 0xa2, 0x04).next();
- oid = token.next();
- }
- if (!oid.equals(DER.KRB5_MECH_OID)) {
- throw new IllegalArgumentException("Malformed gss token");
- }
- // InnerContextToken ::= {
- // token-id[1]
- // AP-REQ
- // }
- if (token.next().getTag() != 1) {
- throw new IllegalArgumentException("Not an AP-REQ token");
- }
- // AP-REQ ::= [APPLICATION 14] SEQUENCE {
- // ticket[3] Ticket
- // }
- DER ticket = token.next().get(0x6e, 0x30, 0xa3, 0x61, 0x30);
- // Ticket ::= [APPLICATION 1] SEQUENCE {
- // realm[1] String
- // sname[2] PrincipalName
- // }
- // PrincipalName ::= SEQUENCE {
- // name-string[1] SEQUENCE OF String
- // }
- String realm = ticket.get(0xa1, 0x1b).getAsString();
- DER names = ticket.get(0xa2, 0x30, 0xa1, 0x30);
- StringBuilder sb = new StringBuilder();
- while (names.hasNext()) {
- if (sb.length() > 0) {
- sb.append('/');
- }
- sb.append(names.next().getAsString());
- }
- return sb.append('@').append(realm).toString();
- }
-
- // basic ASN.1 DER decoder to traverse encoded byte arrays.
- private static class DER implements Iterator<DER> {
- static final DER SPNEGO_MECH_OID = getDER(GSS_SPNEGO_MECH_OID);
- static final DER KRB5_MECH_OID = getDER(GSS_KRB5_MECH_OID);
-
- private static DER getDER(Oid oid) {
- try {
- return new DER(oid.getDER());
- } catch (GSSException ex) {
- // won't happen. a proper OID is encodable.
- throw new IllegalArgumentException(ex);
- }
- }
-
- private final int tag;
- private final ByteBuffer bb;
-
- DER(byte[] buf) {
- this(ByteBuffer.wrap(buf));
- }
-
- DER(ByteBuffer srcbb) {
- tag = srcbb.get() & 0xff;
- int length = readLength(srcbb);
- bb = srcbb.slice();
- bb.limit(length);
- srcbb.position(srcbb.position() + length);
- }
-
- int getTag() {
- return tag;
- }
-
- // standard ASN.1 encoding.
- private static int readLength(ByteBuffer bb) {
- int length = bb.get();
- if ((length & (byte)0x80) != 0) {
- int varlength = length & 0x7f;
- length = 0;
- for (int i=0; i < varlength; i++) {
- length = (length << 8) | (bb.get() & 0xff);
- }
- }
- return length;
- }
-
- DER choose(int subtag) {
- while (hasNext()) {
- DER der = next();
- if (der.getTag() == subtag) {
- return der;
- }
- }
- return null;
- }
-
- DER get(int... tags) {
- DER der = this;
- for (int i=0; i < tags.length; i++) {
- int expectedTag = tags[i];
- // lookup for exact match, else scan if it's sequenced.
- if (der.getTag() != expectedTag) {
- der = der.hasNext() ? der.choose(expectedTag) : null;
- }
- if (der == null) {
- StringBuilder sb = new StringBuilder("Tag not found:");
- for (int ii=0; ii <= i; ii++) {
- sb.append(" 0x").append(Integer.toHexString(tags[ii]));
- }
- throw new IllegalStateException(sb.toString());
- }
- }
- return der;
- }
-
- String getAsString() {
- try {
- return new String(bb.array(), bb.arrayOffset() + bb.position(),
- bb.remaining(), "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalCharsetNameException("UTF-8"); // won't happen.
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * tag + bb.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- return (o instanceof DER) &&
- tag == ((DER)o).tag && bb.equals(((DER)o).bb);
- }
-
- @Override
- public boolean hasNext() {
- // it's a sequence or an embedded octet.
- return ((tag & 0x30) != 0 || tag == 0x04) && bb.hasRemaining();
- }
-
- @Override
- public DER next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return new DER(bb);
- }
-
- @Override
- public String toString() {
- return "[tag=0x"+Integer.toHexString(tag)+" bb="+bb+"]";
- }
- }
-}
diff --git a/src/main/java/org/apache/hadoop/util/LineReader.java b/src/main/java/org/apache/hadoop/util/LineReader.java
index 29f25f6..f34f61c 100644
--- a/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,15 +18,20 @@
package org.apache.hadoop.util;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
import io.trino.hadoop.TextLineLengthLimitExceededException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.Text;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
/**
* A class that provides a line reader from an input stream.
@@ -41,414 +46,395 @@
*/
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
-public class LineReader
- implements Closeable
-{
- // Limitation for array size is VM specific. Current HotSpot VM limitation
- // for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5).
- // Integer.MAX_VALUE - 8 should be safe enough.
- private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private static final byte CR = '\r';
- private static final byte LF = '\n';
- // The line delimiter
- private final byte[] recordDelimiterBytes;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
+public class LineReader implements Closeable, IOStatisticsSource {
+ // Limitation for array size is VM specific. Current HotSpot VM limitation
+ // for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5).
+ // Integer.MAX_VALUE - 8 should be safe enough.
+ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+ private InputStream in;
+ private byte[] buffer;
+ // the number of bytes of real data in the buffer
+ private int bufferLength = 0;
+ // the current position in the buffer
+ private int bufferPosn = 0;
- /**
- * Create a line reader that reads from the given stream using the
- * default buffer-size (64k).
- * @param in The input stream
- * @throws IOException
- */
- public LineReader(InputStream in)
- {
- this(in, DEFAULT_BUFFER_SIZE);
- }
+ private static final byte CR = '\r';
+ private static final byte LF = '\n';
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size.
- * @param in The input stream
- * @param bufferSize Size of the read buffer
- * @throws IOException
- */
- public LineReader(InputStream in, int bufferSize)
- {
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = null;
- }
+ // The line delimiter
+ private final byte[] recordDelimiterBytes;
- /**
- * Create a line reader that reads from the given stream using the
- * io.file.buffer.size specified in the given
- * Configuration.
- * @param in input stream
- * @param conf configuration
- * @throws IOException
- */
- public LineReader(InputStream in, Configuration conf)
- throws IOException
- {
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
- }
+ /**
+ * Create a line reader that reads from the given stream using the
+ * default buffer-size (64k).
+ * @param in The input stream
+ */
+ public LineReader(InputStream in) {
+ this(in, DEFAULT_BUFFER_SIZE);
+ }
- /**
- * Create a line reader that reads from the given stream using the
- * default buffer-size, and using a custom delimiter of array of
- * bytes.
- * @param in The input stream
- * @param recordDelimiterBytes The delimiter
- */
- public LineReader(InputStream in, byte[] recordDelimiterBytes)
- {
- this.in = in;
- this.bufferSize = DEFAULT_BUFFER_SIZE;
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = recordDelimiterBytes;
- }
+ /**
+ * Create a line reader that reads from the given stream using the
+ * given buffer-size.
+ * @param in The input stream
+ * @param bufferSize Size of the read buffer
+ */
+ public LineReader(InputStream in, int bufferSize) {
+ this.in = in;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[this.bufferSize];
+ this.recordDelimiterBytes = null;
+ }
- /**
- * Create a line reader that reads from the given stream using the
- * given buffer-size, and using a custom delimiter of array of
- * bytes.
- * @param in The input stream
- * @param bufferSize Size of the read buffer
- * @param recordDelimiterBytes The delimiter
- * @throws IOException
- */
- public LineReader(InputStream in, int bufferSize,
- byte[] recordDelimiterBytes)
- {
- this.in = in;
- this.bufferSize = bufferSize;
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = recordDelimiterBytes;
- }
+ /**
+ * Create a line reader that reads from the given stream using the
+ * io.file.buffer.size specified in the given
+ * Configuration.
+ * @param in input stream
+ * @param conf configuration
+ * @throws IOException raised on errors performing I/O.
+ */
+ public LineReader(InputStream in, Configuration conf) throws IOException {
+ this(in, conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE));
+ }
- /**
- * Create a line reader that reads from the given stream using the
- * io.file.buffer.size specified in the given
- * Configuration, and using a custom delimiter of array of
- * bytes.
- * @param in input stream
- * @param conf configuration
- * @param recordDelimiterBytes The delimiter
- * @throws IOException
- */
- public LineReader(InputStream in, Configuration conf,
- byte[] recordDelimiterBytes)
- throws IOException
- {
- this.in = in;
- this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
- this.buffer = new byte[this.bufferSize];
- this.recordDelimiterBytes = recordDelimiterBytes;
- }
+ /**
+ * Create a line reader that reads from the given stream using the
+ * default buffer-size, and using a custom delimiter supplied as an
+ * array of bytes.
+ * @param in The input stream
+ * @param recordDelimiterBytes The delimiter
+ */
+ public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+ this.in = in;
+ this.bufferSize = DEFAULT_BUFFER_SIZE;
+ this.buffer = new byte[this.bufferSize];
+ this.recordDelimiterBytes = recordDelimiterBytes;
+ }
- /**
- * Close the underlying stream.
- * @throws IOException
- */
- public void close()
- throws IOException
- {
- in.close();
- }
+ /**
+ * Create a line reader that reads from the given stream using the
+ * given buffer-size, and using a custom delimiter supplied as an
+ * array of bytes.
+ * @param in The input stream
+ * @param bufferSize Size of the read buffer
+ * @param recordDelimiterBytes The delimiter
+ */
+ public LineReader(InputStream in, int bufferSize,
+ byte[] recordDelimiterBytes) {
+ this.in = in;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[this.bufferSize];
+ this.recordDelimiterBytes = recordDelimiterBytes;
+ }
+
+ /**
+ * Create a line reader that reads from the given stream using the
+ * io.file.buffer.size specified in the given
+ * Configuration, and using a custom delimiter supplied as an
+ * array of bytes.
+ * @param in input stream
+ * @param conf configuration
+ * @param recordDelimiterBytes The delimiter
+ * @throws IOException raised on errors performing I/O.
+ */
+ public LineReader(InputStream in, Configuration conf,
+ byte[] recordDelimiterBytes) throws IOException {
+ this.in = in;
+ this.bufferSize = conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
+ this.buffer = new byte[this.bufferSize];
+ this.recordDelimiterBytes = recordDelimiterBytes;
+ }
- /**
- * Read one line from the InputStream into the given Text.
- *
- * @param str the object to store the given line (without newline)
- * @param maxLineLength the maximum number of bytes to store into str;
- * the rest of the line is silently discarded.
- * @param maxBytesToConsume the maximum number of bytes to consume
- * in this call. This is only a hint, because if the line cross
- * this threshold, we allow it to happen. It can overshoot
- * potentially by as much as one buffer length.
- *
- * @return the number of bytes read including the (longest) newline
- * found.
- *
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength,
- int maxBytesToConsume)
- throws IOException
- {
- maxLineLength = Math.min(maxLineLength, MAX_ARRAY_SIZE);
- maxBytesToConsume = Math.min(maxBytesToConsume, MAX_ARRAY_SIZE);
- if (this.recordDelimiterBytes != null) {
- return readCustomLine(str, maxLineLength, maxBytesToConsume);
- }
- else {
- return readDefaultLine(str, maxLineLength, maxBytesToConsume);
- }
- }
- protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
- throws IOException
- {
- return in.read(buffer);
+ /**
+ * Close the underlying stream.
+ * @throws IOException raised on errors performing I/O.
+ */
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /**
+ * Return any IOStatistics provided by the source.
+ * @return IO stats from the input stream.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(in);
+ }
+
+ /**
+ * Read one line from the InputStream into the given Text.
+ *
+ * @param str the object to store the given line (without newline)
+ * @param maxLineLength the maximum number of bytes to store into str;
+ * the rest of the line is silently discarded.
+ * @param maxBytesToConsume the maximum number of bytes to consume
+ * in this call. This is only a hint, because if the line crosses
+ * this threshold, we allow it to happen. It can overshoot
+ * potentially by as much as one buffer length.
+ *
+ * @return the number of bytes read including the (longest) newline
+ * found.
+ *
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength,
+ int maxBytesToConsume) throws IOException {
+ maxLineLength = Math.min(maxLineLength, MAX_ARRAY_SIZE);
+ maxBytesToConsume = Math.min(maxBytesToConsume, MAX_ARRAY_SIZE);
+ if (this.recordDelimiterBytes != null) {
+ return readCustomLine(str, maxLineLength, maxBytesToConsume);
+ } else {
+ return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
+ }
+
+ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+ throws IOException {
+ return in.read(buffer);
+ }
- /**
- * Read a line terminated by one of CR, LF, or CRLF.
+ /**
+ * Read a line terminated by one of CR, LF, or CRLF.
+ */
+ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ /* We're reading data from in, but the head of the stream may be
+ * already buffered in buffer, so we have several cases:
+ * 1. No newline characters are in the buffer, so we need to copy
+ * everything and read another buffer from the stream.
+ * 2. An unambiguously terminated line is in buffer, so we just
+ * copy to str.
+ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+ * in CR. In this case we copy everything up to CR to str, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need to consume LF as well, so the next call to readLine will read
+ * from after that.
+ * We use a flag prevCharCR to signal if previous character was CR
+ * and, if it happens to be at the end of the buffer, delay
+ * consuming it until we have a chance to look at the char that
+ * follows.
*/
- private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
- throws IOException
- {
- /* We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to str.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to str, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- str.clear();
- int txtLength = 0; //tracks str.getLength(), as an optimization
- int newlineLength = 0; //length of terminating newline
- boolean prevCharCR = false; //true of prev char was CR
- long bytesConsumed = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- if (prevCharCR) {
- ++bytesConsumed; //account for CR from previous read
- }
- bufferLength = fillBuffer(in, buffer, prevCharCR);
- if (bufferLength <= 0) {
- break; // EOF
- }
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
- }
- prevCharCR = (buffer[bufferPosn] == CR);
- }
- int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength; //CR at the end of the buffer
- }
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- if (appendLength > 0) {
- // We want to fail the read when the line length is over the limit.
- throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + maxLineLength);
- }
- }
- if (appendLength > 0) {
- int newTxtLength = txtLength + appendLength;
- if (str.getBytes().length < newTxtLength && Math.max(newTxtLength, txtLength << 1) > MAX_ARRAY_SIZE) {
- // If str need to be resized but the target capacity is over VM limit, it will trigger OOM.
- // In such case we will throw an IOException so the caller can deal with it.
- throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + newTxtLength);
- }
- str.append(buffer, startPosn, appendLength);
- txtLength = newTxtLength;
- }
+ str.clear();
+ int txtLength = 0; //tracks str.getLength(), as an optimization
+ int newlineLength = 0; //length of terminating newline
+ boolean prevCharCR = false; //true if prev char was CR
+ long bytesConsumed = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ if (prevCharCR) {
+ ++bytesConsumed; //account for CR from previous read
}
- while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
- if (newlineLength == 0 && bytesConsumed >= maxBytesToConsume) {
- // It is possible that bytesConsumed is over the maxBytesToConsume but we
- // didn't append anything to str.bytes. If we have consumed over maxBytesToConsume
- // bytes but still haven't seen a line terminator, we will fail the read.
- throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + bytesConsumed);
+ bufferLength = fillBuffer(in, buffer, prevCharCR);
+ if (bufferLength <= 0) {
+ break; // EOF
+ }
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+ if (buffer[bufferPosn] == LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ if (prevCharCR) { //CR + notLF, we are at notLF
+ newlineLength = 1;
+ break;
}
- return (int) bytesConsumed;
+ prevCharCR = (buffer[bufferPosn] == CR);
+ }
+ int readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0) {
+ --readLength; //CR at the end of the buffer
+ }
+ bytesConsumed += readLength;
+ int appendLength = readLength - newlineLength;
+ if (appendLength > maxLineLength - txtLength) {
+ appendLength = maxLineLength - txtLength;
+ if (appendLength > 0) {
+ // We want to fail the read when the line length is over the limit.
+ throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + maxLineLength);
+ }
+ }
+ if (appendLength > 0) {
+ int newTxtLength = txtLength + appendLength;
+ if (str.getBytes().length < newTxtLength && Math.max(newTxtLength, txtLength << 1) > MAX_ARRAY_SIZE) {
+ // If str needs to be resized but the target capacity is over the VM limit, it will trigger an OOM.
+ // In such a case we will throw an IOException so the caller can deal with it.
+ throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + newTxtLength);
+ }
+ str.append(buffer, startPosn, appendLength);
+ txtLength = newTxtLength;
+ }
+ } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+ if (newlineLength == 0 && bytesConsumed >= maxBytesToConsume) {
+ // It is possible that bytesConsumed is over the maxBytesToConsume but we
+ // didn't append anything to str.bytes. If we have consumed over maxBytesToConsume
+ // bytes but still haven't seen a line terminator, we will fail the read.
+ throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + bytesConsumed);
}
+ return (int) bytesConsumed;
+ }
- /**
- * Read a line terminated by a custom delimiter.
- */
- private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
- throws IOException
- {
- /* We're reading data from inputStream, but the head of the stream may be
- * already captured in the previous buffer, so we have several cases:
- *
- * 1. The buffer tail does not contain any character sequence which
- * matches with the head of delimiter. We count it as a
- * ambiguous byte count = 0
- *
- * 2. The buffer tail contains a X number of characters,
- * that forms a sequence, which matches with the
- * head of delimiter. We count ambiguous byte count = X
- *
- * // *** eg: A segment of input file is as follows
- *
- * " record 1792: I found this bug very interesting and
- * I have completely read about it. record 1793: This bug
- * can be solved easily record 1794: This ."
- *
- * delimiter = "record";
- *
- * supposing:- String at the end of buffer =
- * "I found this bug very interesting and I have completely re"
- * There for next buffer = "ad about it. record 179 ...."
- *
- * The matching characters in the input
- * buffer tail and delimiter head = "re"
- * Therefore, ambiguous byte count = 2 **** //
- *
- * 2.1 If the following bytes are the remaining characters of
- * the delimiter, then we have to capture only up to the starting
- * position of delimiter. That means, we need not include the
- * ambiguous characters in str.
- *
- * 2.2 If the following bytes are not the remaining characters of
- * the delimiter ( as mentioned in the example ),
- * then we have to include the ambiguous characters in str.
- */
- str.clear();
- int txtLength = 0; // tracks str.getLength(), as an optimization
- long bytesConsumed = 0;
- int delPosn = 0;
- int ambiguousByteCount = 0; // To capture the ambiguous characters count
- do {
- int startPosn = bufferPosn; // Start from previous end position
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
- if (bufferLength <= 0) {
- if (ambiguousByteCount > 0) {
- str.append(recordDelimiterBytes, 0, ambiguousByteCount);
- bytesConsumed += ambiguousByteCount;
- }
- break; // EOF
- }
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) {
- if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
- delPosn++;
- if (delPosn >= recordDelimiterBytes.length) {
- bufferPosn++;
- break;
- }
- }
- else if (delPosn != 0) {
- bufferPosn -= delPosn;
- if (bufferPosn < -1) {
- bufferPosn = -1;
- }
- delPosn = 0;
- }
- }
- int readLength = bufferPosn - startPosn;
- bytesConsumed += readLength;
- int appendLength = readLength - delPosn;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- if (appendLength > 0) {
- // We want to fail the read when the line length is over the limit.
- throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + maxLineLength);
- }
- }
+ /**
+ * Read a line terminated by a custom delimiter.
+ */
+ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+ throws IOException {
+ /* We're reading data from inputStream, but the head of the stream may be
+ * already captured in the previous buffer, so we have several cases:
+ *
+ * 1. The buffer tail does not contain any character sequence which
+ * matches the head of the delimiter. We count it as an
+ * ambiguous byte count = 0
+ *
+ * 2. The buffer tail contains X characters that form a
+ * sequence matching the head of the delimiter.
+ * We count ambiguous byte count = X
+ *
+ * // *** eg: A segment of input file is as follows
+ *
+ * " record 1792: I found this bug very interesting and
+ * I have completely read about it. record 1793: This bug
+ * can be solved easily record 1794: This ."
+ *
+ * delimiter = "record";
+ *
+ * supposing the String at the end of the buffer =
+ * "I found this bug very interesting and I have completely re"
+ * Therefore the next buffer = "ad about it. record 179 ...."
+ *
+ * The matching characters in the input
+ * buffer tail and delimiter head = "re"
+ * Therefore, ambiguous byte count = 2 **** //
+ *
+ * 2.1 If the following bytes are the remaining characters of
+ * the delimiter, then we have to capture only up to the starting
+ * position of the delimiter. That means we need not include the
+ * ambiguous characters in str.
+ *
+ * 2.2 If the following bytes are not the remaining characters of
+ * the delimiter (as mentioned in the example),
+ * then we have to include the ambiguous characters in str.
+ */
+ str.clear();
+ int txtLength = 0; // tracks str.getLength(), as an optimization
+ long bytesConsumed = 0;
+ int delPosn = 0;
+ int ambiguousByteCount = 0; // To capture the ambiguous characters count
+ do {
+ int startPosn = bufferPosn; // Start from previous end position
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
+ if (bufferLength <= 0) {
+ if (ambiguousByteCount > 0) {
+ str.append(recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += ambiguousByteCount;
- if (appendLength >= 0 && ambiguousByteCount > 0) {
- //appending the ambiguous characters (refer case 2.2)
- str.append(recordDelimiterBytes, 0, ambiguousByteCount);
- ambiguousByteCount = 0;
- // since it is now certain that the split did not split a delimiter we
- // should not read the next record: clear the flag otherwise duplicate
- // records could be generated
- unsetNeedAdditionalRecordAfterSplit();
- }
- if (appendLength > 0) {
- int newTxtLength = txtLength + appendLength;
- if (str.getBytes().length < newTxtLength && Math.max(newTxtLength, txtLength << 1) > MAX_ARRAY_SIZE) {
- // If str need to be resized but the target capacity is over VM limit, it will trigger OOM.
- // In such case we will throw an IOException so the caller can deal with it.
- throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + newTxtLength);
- }
- str.append(buffer, startPosn, appendLength);
- txtLength = newTxtLength;
- }
- if (bufferPosn >= bufferLength) {
- if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
- ambiguousByteCount = delPosn;
- bytesConsumed -= ambiguousByteCount; //to be consumed in next
- }
- }
+ }
+ break; // EOF
+ }
+ }
+ for (; bufferPosn < bufferLength; ++bufferPosn) {
+ if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
+ delPosn++;
+ if (delPosn >= recordDelimiterBytes.length) {
+ bufferPosn++;
+ break;
+ }
+ } else if (delPosn != 0) {
+ bufferPosn -= delPosn;
+ if (bufferPosn < -1) {
+ bufferPosn = -1;
+ }
+ delPosn = 0;
+ }
+ }
+ int readLength = bufferPosn - startPosn;
+ bytesConsumed += readLength;
+ int appendLength = readLength - delPosn;
+ if (appendLength > maxLineLength - txtLength) {
+ appendLength = maxLineLength - txtLength;
+ if (appendLength > 0) {
+ // We want to fail the read when the line length is over the limit.
+ throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + maxLineLength);
}
- while (delPosn < recordDelimiterBytes.length
- && bytesConsumed < maxBytesToConsume);
- if (delPosn < recordDelimiterBytes.length
- && bytesConsumed >= maxBytesToConsume) {
- // It is possible that bytesConsumed is over the maxBytesToConsume but we
- // didn't append anything to str.bytes. If we have consumed over maxBytesToConsume
- // bytes but still haven't seen a line terminator, we will fail the read.
- throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + bytesConsumed);
+ }
+ bytesConsumed += ambiguousByteCount;
+ if (appendLength >= 0 && ambiguousByteCount > 0) {
+ //appending the ambiguous characters (refer case 2.2)
+ str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+ ambiguousByteCount = 0;
+ // since it is now certain that the split did not split a delimiter we
+ // should not read the next record: clear the flag otherwise duplicate
+ // records could be generated
+ unsetNeedAdditionalRecordAfterSplit();
+ }
+ if (appendLength > 0) {
+ int newTxtLength = txtLength + appendLength;
+ if (str.getBytes().length < newTxtLength && Math.max(newTxtLength, txtLength << 1) > MAX_ARRAY_SIZE) {
+ // If str needs to be resized but the target capacity is over the VM limit, it will trigger an OOM.
+ // In such a case we will throw an IOException so the caller can deal with it.
+ throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + newTxtLength);
}
- return (int) bytesConsumed;
+ str.append(buffer, startPosn, appendLength);
+ txtLength = newTxtLength;
+ }
+ if (bufferPosn >= bufferLength) {
+ if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+ ambiguousByteCount = delPosn;
+ bytesConsumed -= ambiguousByteCount; //to be consumed in next
+ }
+ }
+ } while (delPosn < recordDelimiterBytes.length
+ && bytesConsumed < maxBytesToConsume);
+ if (delPosn < recordDelimiterBytes.length
+ && bytesConsumed >= maxBytesToConsume) {
+ // It is possible that bytesConsumed is over the maxBytesToConsume but we
+ // didn't append anything to str.bytes. If we have consumed over maxBytesToConsume
+ // bytes but still haven't seen a line terminator, we will fail the read.
+ throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + bytesConsumed);
}
+ return (int) bytesConsumed;
+ }
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @param maxLineLength the maximum number of bytes to store into str.
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength)
- throws IOException
- {
- return readLine(str, maxLineLength, Integer.MAX_VALUE);
- }
+ /**
+ * Read from the InputStream into the given Text.
+ * @param str the object to store the given line
+ * @param maxLineLength the maximum number of bytes to store into str.
+ * @return the number of bytes read including the newline
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength) throws IOException {
+ return readLine(str, maxLineLength, Integer.MAX_VALUE);
+ }
- /**
- * Read from the InputStream into the given Text.
- * @param str the object to store the given line
- * @return the number of bytes read including the newline
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str)
- throws IOException
- {
- return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
- }
+ /**
+ * Read from the InputStream into the given Text.
+ * @param str the object to store the given line
+ * @return the number of bytes read including the newline
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str) throws IOException {
+ return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
- protected int getBufferPosn()
- {
- return bufferPosn;
- }
+ protected int getBufferPosn() {
+ return bufferPosn;
+ }
- protected int getBufferSize()
- {
- return bufferSize;
- }
+ protected int getBufferSize() {
+ return bufferSize;
+ }
- protected void unsetNeedAdditionalRecordAfterSplit()
- {
- // needed for custom multi byte line delimiters only
- // see MAPREDUCE-6549 for details
- }
+ protected void unsetNeedAdditionalRecordAfterSplit() {
+ // needed for custom multi byte line delimiters only
+ // see MAPREDUCE-6549 for details
+ }
}
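A minimal usage sketch of the reformatted LineReader, assuming the upstream package org.apache.hadoop.util and the constructors shown above; the class name and input data here are hypothetical and illustrative only, not part of the patch:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class LineReaderSketch {
    public static void main(String[] args) throws IOException {
        byte[] data = "first\r\nsecond\nlast".getBytes(StandardCharsets.UTF_8);
        // Default constructor: 64k buffer, lines terminated by CR, LF, or CRLF.
        try (LineReader reader = new LineReader(new ByteArrayInputStream(data))) {
            Text line = new Text();
            int consumed;
            // readLine returns the number of bytes consumed, including the newline; 0 signals EOF.
            while ((consumed = reader.readLine(line)) > 0) {
                System.out.println(line + " (" + consumed + " bytes)");
            }
        }
    }
}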
diff --git a/src/main/java/org/wildfly/openssl/OpenSSLProvider.java b/src/main/java/org/wildfly/openssl/OpenSSLProvider.java
index e1e6403..569560e 100644
--- a/src/main/java/org/wildfly/openssl/OpenSSLProvider.java
+++ b/src/main/java/org/wildfly/openssl/OpenSSLProvider.java
@@ -13,23 +13,9 @@
*/
package org.wildfly.openssl;
-import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
-import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx.SSLChannelMode;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-
public final class OpenSSLProvider
{
private OpenSSLProvider() {}
- public static void register()
- {
- try {
- SSLSocketFactoryEx.initializeDefaultFactory(SSLChannelMode.Default_JSSE);
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
+ public static void register() {}
}
diff --git a/src/main/resources/nativelib/Linux-aarch64/libhadoop.so b/src/main/resources/nativelib/Linux-aarch64/libhadoop.so
index eac5006..42958bd 100755
Binary files a/src/main/resources/nativelib/Linux-aarch64/libhadoop.so and b/src/main/resources/nativelib/Linux-aarch64/libhadoop.so differ
diff --git a/src/main/resources/nativelib/Linux-aarch64/libsnappy.so b/src/main/resources/nativelib/Linux-aarch64/libsnappy.so
deleted file mode 100755
index f150792..0000000
Binary files a/src/main/resources/nativelib/Linux-aarch64/libsnappy.so and /dev/null differ
diff --git a/src/main/resources/nativelib/Linux-amd64/libhadoop.so b/src/main/resources/nativelib/Linux-amd64/libhadoop.so
index c81f405..0c86dd5 100755
Binary files a/src/main/resources/nativelib/Linux-amd64/libhadoop.so and b/src/main/resources/nativelib/Linux-amd64/libhadoop.so differ
diff --git a/src/main/resources/nativelib/Linux-amd64/libsnappy.so b/src/main/resources/nativelib/Linux-amd64/libsnappy.so
deleted file mode 100755
index 2aa7cad..0000000
Binary files a/src/main/resources/nativelib/Linux-amd64/libsnappy.so and /dev/null differ
diff --git a/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so b/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so
deleted file mode 100755
index 77b7cd5..0000000
Binary files a/src/main/resources/nativelib/Linux-ppc64le/libsnappy.so and /dev/null differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-aarch64/libsnappy.dylib b/src/main/resources/nativelib/Mac_OS_X-aarch64/libsnappy.dylib
deleted file mode 100644
index 36fe5c2..0000000
Binary files a/src/main/resources/nativelib/Mac_OS_X-aarch64/libsnappy.dylib and /dev/null differ
diff --git a/src/main/resources/nativelib/Mac_OS_X-x86_64/libsnappy.dylib b/src/main/resources/nativelib/Mac_OS_X-x86_64/libsnappy.dylib
deleted file mode 100644
index 7cdc9ac..0000000
Binary files a/src/main/resources/nativelib/Mac_OS_X-x86_64/libsnappy.dylib and /dev/null differ
diff --git a/src/test/java/io/trino/hadoop/TestHadoopNative.java b/src/test/java/io/trino/hadoop/TestHadoopNative.java
index bc062be..c94f53d 100644
--- a/src/test/java/io/trino/hadoop/TestHadoopNative.java
+++ b/src/test/java/io/trino/hadoop/TestHadoopNative.java
@@ -41,7 +41,6 @@ public void testNative()
HadoopNative.requireHadoopNative();
assertTrue(NativeCodeLoader.isNativeCodeLoaded());
- assertTrue(NativeCodeLoader.buildSupportsSnappy());
assertTrue(NativeCodeLoader.buildSupportsZstd());
assertTrue(ZlibFactory.isNativeZlibLoaded(new Configuration()));
assertTrue(Bzip2Factory.isNativeBzip2Loaded(new Configuration()));