28 changes: 16 additions & 12 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -624,7 +624,8 @@ private[spark] object Utils extends Logging {
case _ =>
val fs = getHadoopFileSystem(uri, hadoopConf)
val path = new Path(uri)
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
filename = Some(filename))
}
}

@@ -639,19 +640,22 @@ private[spark] object Utils extends Logging {
fs: FileSystem,
conf: SparkConf,
hadoopConf: Configuration,
fileOverwrite: Boolean): Unit = {
if (!targetDir.mkdir()) {
fileOverwrite: Boolean,
filename: Option[String] = None): Unit = {
Contributor

can we add a comment that says this is only expected to be set if the path describes a file, not a directory?

Contributor Author

doFetchFile actually passes filename regardless of whether it's a file or a directory, and the code here treats both equally as far as filename is concerned, so I think that comment would be misleading.

Contributor

You're right... though that's pretty unintuitive. We should clean this signature up separately after the release.

Contributor Author

Yeah. The renaming is used for the "share files among executors" feature, which creates files with a different name before moving them around. I'm not sure if that applies to directories too.

if (!targetDir.exists() && !targetDir.mkdir()) {
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}
fs.listStatus(path).foreach { fileStatus =>
val innerPath = fileStatus.getPath
if (fileStatus.isDir) {
fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
fileOverwrite)
} else {
val in = fs.open(innerPath)
val targetFile = new File(targetDir, innerPath.getName)
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
val dest = new File(targetDir, filename.getOrElse(path.getName))
if (fs.isFile(path)) {
val in = fs.open(path)
try {
downloadFile(path.toString, in, dest, fileOverwrite)
} finally {
in.close()
}
} else {
fs.listStatus(path).foreach { fileStatus =>
fetchHcfsFile(fileStatus.getPath(), dest, fs, conf, hadoopConf, fileOverwrite)
}
}
}
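
To make the review discussion above concrete, here is a minimal usage sketch of the updated signature. The local paths and the "renamed" values are hypothetical; the parameter order and the filename.getOrElse(path.getName) behavior come from the diff above (note that fetchHcfsFile is private[spark], so real callers live inside Spark itself):

package org.apache.spark.util  // needed because fetchHcfsFile is private[spark]

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf

object FetchHcfsFileSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val fs = Utils.getHadoopFileSystem("file:///tmp/src", hadoopConf)
    val targetDir = new File("/tmp/dest")

    // A single file with an explicit name lands at /tmp/dest/renamed.txt.
    Utils.fetchHcfsFile(new Path("file:///tmp/src/data.txt"), targetDir, fs,
      new SparkConf(), hadoopConf, fileOverwrite = false, filename = Some("renamed.txt"))

    // A directory with the same override is copied to a tree rooted at
    // /tmp/dest/renamed: both branches use filename.getOrElse(path.getName).
    Utils.fetchHcfsFile(new Path("file:///tmp/src/subdir"), targetDir, fs,
      new SparkConf(), hadoopConf, fileOverwrite = false, filename = Some("renamed"))
  }
}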
85 changes: 47 additions & 38 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -208,18 +208,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
child1.setLastModified(System.currentTimeMillis() - (1000 * 30))

// although child1 is old, child2 is still new so return true
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

child2.setLastModified(System.currentTimeMillis - (1000 * 30))
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

parent.setLastModified(System.currentTimeMillis - (1000 * 30))
// although parent and its immediate children are new, child3 is still old
// we expect a full recursive search for new files.
assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))

child3.setLastModified(System.currentTimeMillis - (1000 * 30))
assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
}

test("resolveURI") {
@@ -339,21 +339,21 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(!tempDir1.exists())

val tempDir2 = Utils.createTempDir()
val tempFile1 = new File(tempDir2, "foo.txt")
Files.touch(tempFile1)
assert(tempFile1.exists())
Utils.deleteRecursively(tempFile1)
assert(!tempFile1.exists())
val sourceFile1 = new File(tempDir2, "foo.txt")
Files.touch(sourceFile1)
assert(sourceFile1.exists())
Utils.deleteRecursively(sourceFile1)
assert(!sourceFile1.exists())

val tempDir3 = new File(tempDir2, "subdir")
assert(tempDir3.mkdir())
val tempFile2 = new File(tempDir3, "bar.txt")
Files.touch(tempFile2)
assert(tempFile2.exists())
val sourceFile2 = new File(tempDir3, "bar.txt")
Files.touch(sourceFile2)
assert(sourceFile2.exists())
Utils.deleteRecursively(tempDir2)
assert(!tempDir2.exists())
assert(!tempDir3.exists())
assert(!tempFile2.exists())
assert(!sourceFile2.exists())
}

test("loading properties from file") {
@@ -386,30 +386,39 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
}

test("fetch hcfs dir") {
val tempDir = Utils.createTempDir()
val innerTempDir = Utils.createTempDir(tempDir.getPath)
val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
val targetDir = new File("target-dir")
Files.write("some text", tempFile, UTF_8)

try {
val path = new Path("file://" + tempDir.getAbsolutePath)
val conf = new Configuration()
val fs = Utils.getHadoopFileSystem(path.toString, conf)
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
assert(targetDir.exists())
assert(targetDir.isDirectory())
val newInnerDir = new File(targetDir, innerTempDir.getName)
println("inner temp dir: " + innerTempDir.getName)
targetDir.listFiles().map(_.getName).foreach(println)
assert(newInnerDir.exists())
assert(newInnerDir.isDirectory())
val newInnerFile = new File(newInnerDir, tempFile.getName)
assert(newInnerFile.exists())
assert(newInnerFile.isFile())
} finally {
Utils.deleteRecursively(tempDir)
Utils.deleteRecursively(targetDir)
}
val sourceDir = Utils.createTempDir()
val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
val targetDir = new File(Utils.createTempDir(), "target-dir")
Files.write("some text", sourceFile, UTF_8)

val path = new Path("file://" + sourceDir.getAbsolutePath)
val conf = new Configuration()
val fs = Utils.getHadoopFileSystem(path.toString, conf)

assert(!targetDir.isDirectory())
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
assert(targetDir.isDirectory())

// Copy again to make sure it doesn't error if the dir already exists.
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)

val destDir = new File(targetDir, sourceDir.getName())
assert(destDir.isDirectory())

val destInnerDir = new File(destDir, innerSourceDir.getName)
assert(destInnerDir.isDirectory())

val destInnerFile = new File(destInnerDir, sourceFile.getName)
assert(destInnerFile.isFile())

val filePath = new Path("file://" + sourceFile.getAbsolutePath)
val testFileDir = new File("test-filename")
val testFileName = "testFName"
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
conf, false, Some(testFileName))
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.isFile())
}
}