Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2281,10 +2281,22 @@ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return createRemoteIterator(f, recursive, false);
}

public RemoteIterator<LocatedFileStatus> listFilesAndDirs(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return createRemoteIterator(f, recursive, true);
}

private RemoteIterator<LocatedFileStatus> createRemoteIterator(
final Path f, final boolean recursive, boolean includeDir
) throws FileNotFoundException, IOException {
return new RemoteIterator<LocatedFileStatus>() {
private Stack<RemoteIterator<LocatedFileStatus>> itors = new Stack<>();
private RemoteIterator<LocatedFileStatus> curItor =
listLocatedStatus(f);
listLocatedStatus(f);
private LocatedFileStatus curFile;

@Override
Expand Down Expand Up @@ -2313,6 +2325,10 @@ private void handleFileStat(LocatedFileStatus stat) throws IOException {
if (stat.isFile()) { // file
curFile = stat;
} else if (recursive) { // directory
if (includeDir) {
curFile = stat;
}

itors.push(curItor);
curItor = listLocatedStatus(stat.getPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@
import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.amazonaws.event.ProgressListener;

import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -3783,70 +3785,75 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
// innerCopyFromLocalFile(delSrc, overwrite, src, dst);
super.copyFromLocalFile(delSrc, overwrite, src, dst);
return null;
});
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
() -> new CopyFromLocalOperation(
createStoreContext(),
src,
dst,
delSrc,
overwrite,
createCopyFromLocalCallbacks()).execute());
}

protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
return new CopyFromLocalCallbacksImpl(local);
}

/**
* The src file is on the local disk. Add it to FS at
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
*
* <i>HADOOP-15932:</i> this method has been unwired from
* {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until
* it is extended to list and copy whole directories.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src Source path: must be on local filesystem
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false, or if the destination is a directory.
* @throws FileNotFoundException if the source file does not exit
* @throws AmazonClientException failure in the AWS SDK
* @throws IllegalArgumentException if the source path is not on the local FS
*/
@Retries.RetryTranslated
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
LOG.debug("Copying local file from {} to {}", src, dst);
protected class CopyFromLocalCallbacksImpl implements
CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
private final LocalFileSystem local;

// Since we have a local file, we don't need to stream into a temporary file
LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src);
if (!srcfile.exists()) {
throw new FileNotFoundException("No file: " + src);
private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
this.local = local;
}
if (!srcfile.isFile()) {
throw new FileNotFoundException("Not a file: " + src);

@Override
public RemoteIterator<LocatedFileStatus> listStatusIterator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer listLocalStatusIterator()

final Path path,
final boolean recursive) throws IOException {
return local.listFilesAndDirs(path, true);
}

try {
FileStatus status = innerGetFileStatus(dst, false, StatusProbeEnum.ALL);
if (!status.isFile()) {
throw new FileAlreadyExistsException(dst + " exists and is not a file");
}
if (!overwrite) {
throw new FileAlreadyExistsException(dst + " already exists");
}
} catch (FileNotFoundException e) {
// no destination, all is well
}
final String key = pathToKey(dst);
final ObjectMetadata om = newObjectMetadata(srcfile.length());
Progressable progress = null;
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true,
() -> executePut(putObjectRequest, progress));
if (delSrc) {
local.delete(src, false);
@Override
public File pathToFile(Path path) {
return local.pathToFile(path);
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
return local.delete(path, recursive);
}

@Override
public void copyFileFromTo(File file, Path from, Path to) throws IOException {
trackDurationAndSpan(
OBJECT_PUT_REQUESTS,
to,
() -> {

final String key = pathToKey(to);
final ObjectMetadata om = newObjectMetadata(file.length());
Progressable progress = null;
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file);
S3AFileSystem.this.invoker.retry(
"putObject(" + "" + ")", to.toString(),
true,
() -> executePut(putObjectRequest, progress));

return null;
});
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
return S3AFileSystem.this.getFileStatus(f);
}

@Override
public boolean createEmptyDir(Path path) throws IOException {
return S3AFileSystem.this.mkdirs(path);
}
}

Expand Down
Loading