69 changes: 57 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,29 +24,37 @@ import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID.randomUUID

import scala.collection.{Map, Set}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}

import akka.actor.Props

Contributor: nit: remove this line

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

Contributor: nit: remove this line

import org.apache.mesos.MesosNativeLibrary
import akka.actor.Props

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
@@ -996,12 +1004,49 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*/
def addFile(path: String) {
def addFile(path: String): Unit = {
addFile(path, false)
}

/**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*
* A directory can be given if the recursive option is set to true. Currently directories are only
* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val isLocalMode = conf.get("spark.master").startsWith("local")
Contributor: I think that we already have a SparkContext.isLocal variable for this, so I'd use that instead of inspecting SparkConf here.

val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
case "local" => "file:" + uri.getPath
case _ => path
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => "file:" + uri.getPath
case _ => path
}

val hadoopPath = new Path(schemeCorrectedPath)
val scheme = new URI(schemeCorrectedPath).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
if (!fs.exists(hadoopPath)) {
throw new SparkException(s"Added file $hadoopPath does not exist.")
Contributor: I suppose we could throw FileNotFoundException if you think it's helpful to throw more specific exception classes.

}
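A minimal sketch of the more specific exception suggested above; requireExists is a hypothetical helper for illustration, not the PR's actual code:

import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileSystem, Path}

def requireExists(fs: FileSystem, hadoopPath: Path): Unit = {
  if (!fs.exists(hadoopPath)) {
    // FileNotFoundException extends IOException, so callers can still catch broadly.
    throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
  }
}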
val isDir = fs.isDirectory(hadoopPath)
Contributor: I think we use fs.isDir() in other places because of Hadoop 1.x.

Contributor: Yep, we have to use isDir here due to Hadoop 1.x; we've run into this same issue before (such as in #3298) and it breaks the Hadoop 1 Maven builds.

Contributor (Author): I removed the uses of FileStatus#isDirectory, but FileSystem#isDirectory exists in all versions.
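A small sketch of the compatibility point in the thread above; isDirCompat is a hypothetical name, and the comments repeat only what the thread itself claims about Hadoop versions:

import org.apache.hadoop.fs.{FileSystem, Path}

// FileSystem#isDirectory(Path) is available in Hadoop 1.x and 2.x alike,
// whereas FileStatus#isDirectory is 2.x-only (FileStatus#isDir is the 1.x form).
def isDirCompat(fs: FileSystem, path: Path): Boolean = fs.isDirectory(path)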

if (!isLocalMode && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
}
if (!recursive && isDir) {
throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
"turned on.")
}
}

val key = if (!isLocalMode && scheme == "file") {
env.httpFileServer.addFile(new File(uri.getPath))
} else {
schemeCorrectedPath
}
val timestamp = System.currentTimeMillis
addedFiles(key) = timestamp
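A usage sketch of the new overload; the path and file names are illustrative. Per the tests added below, shipped files resolve on executors via SparkFiles.get("<dir name>/<file name>"):

// Ship a whole directory from a Hadoop-compatible filesystem to every node:
sc.addFile("hdfs://namenode:8020/user/alice/config-dir", recursive = true)
// Executors can then read e.g. SparkFiles.get("config-dir/app.conf")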
@@ -1549,8 +1594,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val environmentDetails =
SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
86 changes: 71 additions & 15 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -359,8 +359,10 @@ private[spark] object Utils extends Logging {
}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* If `useCache` is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application. `useCache` is used mainly for
@@ -429,17 +431,18 @@
*
* @param url URL that `sourceFile` originated from, for logging purposes.
* @param in InputStream to download.
* @param tempFile File path to download `in` to.
* @param destFile File path to move `tempFile` to.
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
* `sourceFile`
*/
private def downloadFile(
url: String,
in: InputStream,
tempFile: File,
destFile: File,
fileOverwrite: Boolean): Unit = {
val tempFile = File.createTempFile("fetchFileTemp", null,
new File(destFile.getParentFile.getAbsolutePath))
logInfo("Fetching " + url + " to " + tempFile)
Contributor: You could use string interpolation here.
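A self-contained illustration of the interpolation suggestion; the values are made up for the example:

object LogInterpolationExample extends App {
  val url = "hdfs://example/file.txt"
  val tempFile = new java.io.File("/tmp/fetchFileTemp123")
  // Equivalent to "Fetching " + url + " to " + tempFile, but easier to read:
  println(s"Fetching $url to $tempFile")
}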


try {
val out = new FileOutputStream(tempFile)
@@ -478,7 +481,7 @@ private[spark] object Utils extends Logging {
removeSourceFile: Boolean = false): Unit = {

if (destFile.exists) {
if (!Files.equal(sourceFile, destFile)) {
if (!filesEqualRecursive(sourceFile, destFile)) {
if (fileOverwrite) {
logInfo(
s"File $destFile exists and does not match contents of $url, replacing it with $url"
@@ -513,13 +516,44 @@ private[spark] object Utils extends Logging {
Files.move(sourceFile, destFile)
Contributor: It's not clear from the docs, but does this work for directories? Could you add a unit test? (It doesn't seem like this is covered for directories by the current ones, after a quick look.)

Contributor (Author): Tested this in isolation and it does work for directories. A new test that I added should exercise this path.

Contributor: Hmmm...

scala> val from = new File("/tmp/foo")
from: java.io.File = /tmp/foo

scala> val to = new File("/home/vanzin/tmp/foo")
to: java.io.File = /home/vanzin/tmp/foo

scala> Files.move(from, to)
java.io.FileNotFoundException: /tmp/foo (Is a directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:120)
        at com.google.common.io.Files$FileByteSource.openStream(Files.java:124)
        at com.google.common.io.Files$FileByteSource.openStream(Files.java:114)
        at com.google.common.io.ByteSource.copyTo(ByteSource.java:202)
        at com.google.common.io.Files.copy(Files.java:436)
        at com.google.common.io.Files.move(Files.java:651)

I think it doesn't work if from and to are on different file systems (such as in my case above).

Contributor: We talked about this offline; it seems that, in Spark's case, both source and destination are on the same filesystem, so this should work.

Guava does something akin to if (!from.renameTo(to)) copy(from, to) internally; that works on Unix for directories if both source and destination are on the same filesystem. On Windows, though, it would fail: renameTo fails on Windows if the destination already exists, and the Guava code would revert back to copy(), which only works on files. (That also means move is unnecessarily expensive on Windows when the destination exists, but that's a Guava issue.)

So if we care about Windows here we should probably do a deleteRecursively before trying the rename. It ceases being an "atomic" operation, but Java doesn't expose a Windows version of rename that follows the Unix semantics.

} else {
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
Files.copy(sourceFile, destFile)
copyRecursive(sourceFile, destFile)
}
}
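A sketch of the "delete, then rename" idea from the Files.move discussion above, assuming source and destination are on the same filesystem; moveReplacing and its error messages are hypothetical, not the PR's implementation:

import java.io.{File, IOException}

def moveReplacing(from: File, to: File): Unit = {
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) f.listFiles().foreach(deleteRecursively)
    if (!f.delete()) throw new IOException(s"Failed to delete ${f.getPath}")
  }
  // Dropping the existing destination first lets renameTo succeed on Windows,
  // at the cost of the operation no longer being atomic.
  if (to.exists()) deleteRecursively(to)
  if (!from.renameTo(to)) {
    throw new IOException(s"Failed to rename $from to $to")
  }
}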

private def filesEqualRecursive(file1: File, file2: File): Boolean = {
if (file1.isDirectory && file2.isDirectory) {
val subfiles1 = file1.listFiles()
val subfiles2 = file2.listFiles()
if (subfiles1.size != subfiles2.size) {
return false
}
subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).dropWhile {
Contributor: Instead of dropWhile { ... }.isEmpty, I think you can do forall:

scala> Seq(1, 2, 3).forall { _ >= 3 }
res1: Boolean = false

scala> Seq(1, 2, 3).forall { _ >= 0 }
res2: Boolean = true

case (f1, f2) => filesEqualRecursive(f1, f2)
}.isEmpty
} else if (file1.isFile && file2.isFile) {
Files.equal(file1, file2)
} else {
false
}
}
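For reference, a sketch of filesEqualRecursive rewritten with forall as suggested in the thread above; same behavior, assuming the Guava Files import already present in this file:

private def filesEqualRecursive(file1: File, file2: File): Boolean = {
  if (file1.isDirectory && file2.isDirectory) {
    val subfiles1 = file1.listFiles()
    val subfiles2 = file2.listFiles()
    // forall short-circuits on the first mismatch, like the dropWhile version,
    // but reads as a direct statement that every pair of children is equal.
    subfiles1.length == subfiles2.length &&
      subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall {
        case (f1, f2) => filesEqualRecursive(f1, f2)
      }
  } else if (file1.isFile && file2.isFile) {
    Files.equal(file1, file2)
  } else {
    false
  }
}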

private def copyRecursive(source: File, dest: File): Unit = {
if (source.isDirectory) {
if (!dest.mkdir()) {
throw new IOException(s"Failed to create directory ${dest.getPath}")
}
val subfiles = source.listFiles()
subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName)))
} else {
Files.copy(source, dest)
}
}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
* filesystems.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
@@ -531,14 +565,11 @@
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration) {
val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
Option(uri.getScheme).getOrElse("file") match {
case "http" | "https" | "ftp" =>
logInfo("Fetching " + url + " to " + tempFile)

var uc: URLConnection = null
if (securityMgr.isAuthenticationEnabled()) {
logDebug("fetchFile with security enabled")
@@ -555,17 +586,42 @@
uc.setReadTimeout(timeout)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
downloadFile(url, in, targetFile, fileOverwrite)
case "file" =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
copyFile(url, sourceFile, targetFile, fileOverwrite)
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
val path = new Path(uri)
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
}
}

/**
* Fetch a file or directory from a Hadoop-compatible filesystem.
*
* Visible for testing
*/
private[spark] def fetchHcfsFile(
path: Path,
targetDir: File,
fs: FileSystem,
conf: SparkConf,
hadoopConf: Configuration,
fileOverwrite: Boolean): Unit = {
targetDir.mkdir()
Contributor: Should we check the return value here so that we get a more informative message if an error occurs?

fs.listStatus(path).foreach { fileStatus =>
val innerPath = fileStatus.getPath
if (fileStatus.isDir) {
fetchHcfsFile(innerPath, new File(targetDir, innerPath.getName), fs, conf, hadoopConf,
fileOverwrite)
} else {
val in = fs.open(innerPath)
val targetFile = new File(targetDir, innerPath.getName)
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
}
}
}
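A sketch of the mkdir return-value check asked about above, as a hypothetical helper; note that mkdir also returns false when the directory already exists, hence the extra isDirectory test:

import java.io.{File, IOException}

def createDirOrFail(targetDir: File): Unit = {
  if (!targetDir.mkdir() && !targetDir.isDirectory) {
    throw new IOException(s"Failed to create directory ${targetDir.getPath}")
  }
}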

86 changes: 86 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -17,6 +17,11 @@

package org.apache.spark

import java.io.File

import com.google.common.base.Charsets._
import com.google.common.io.Files

import org.scalatest.FunSuite

import org.apache.hadoop.io.BytesWritable
@@ -72,4 +77,85 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}

test("addFile works") {
val file = new File("somefile")
Contributor: I think we should use one of the createTempFile methods to create this file, rather than using the CWD.
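A sketch of the createTempFile suggestion; the prefix and suffix are illustrative:

// Creates the fixture in the system temp directory instead of the CWD;
// File.createTempFile also guarantees a unique, previously non-existent file.
val file = java.io.File.createTempFile("somefile", ".txt")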

val absolutePath = file.getAbsolutePath
try {
Files.write("somewords", file, UTF_8)
val length = file.length()
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(file.getAbsolutePath)
sc.parallelize(Array(1), 1).map(x => {
val gotten = new File(SparkFiles.get(file.getName))
Contributor: I guess using local doesn't incur serialization? Because this would fail to serialize file in a real cluster.

Contributor: AFAIK you can serialize File instances:

scala> import java.io.File
import java.io.File

scala> import org.apache.commons.lang3.SerializationUtils
import org.apache.commons.lang3.SerializationUtils

scala> SerializationUtils.serialize(new File("/usr/share/dict/words"))
res1: Array[Byte] = Array(-84, -19, 0, 5, 115, 114, 0, 12, 106, 97, 118, 97, 46, 105, 111, 46, 70, 105, 108, 101, 4, 45, -92, 69, 14, 13, -28, -1, 3, 0, 1, 76, 0, 4, 112, 97, 116, 104, 116, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 120, 112, 116, 0, 21, 47, 117, 115, 114, 47, 115, 104, 97, 114, 101, 47, 100, 105, 99, 116, 47, 119, 111, 114, 100, 115, 119, 2, 0, 47, 120)

if (!gotten.exists()) {
throw new SparkException("file doesn't exist")
}
if (length != gotten.length()) {
throw new SparkException(
s"file has different length $length than added file ${gotten.length()}")
}
if (absolutePath == gotten.getAbsolutePath) {
throw new SparkException("file should have been copied")
}
x
}).count()
} finally {
sc.stop()
file.delete()
}
}

test("addFile recursive works") {
val pluto = new File("pluto")
Contributor: Similarly, I suppose you could create these under a temporary directory and use that to manage the cleanup. That might be better since it would handle some corner-cases with deleting non-empty directories.
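A sketch of the temporary-directory layout suggested above, assuming Guava's Files.createTempDir (already imported in this suite):

// Root the whole fixture tree under a temp directory so that cleanup is a
// single recursive delete of tempDir in the finally block:
val tempDir = Files.createTempDir()
val pluto = new File(tempDir, "pluto")
val neptune = new File(pluto, "neptune")
// ...build saturn, alien1 and alien2 under neptune/saturn exactly as below.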

val neptune = new File(pluto, "neptune")
val saturn = new File(neptune, "saturn")
val alien1 = new File(neptune, "alien1")
val alien2 = new File(saturn, "alien2")

try {
assert(neptune.mkdirs())
assert(saturn.mkdir())
assert(alien1.createNewFile())
assert(alien2.createNewFile())

sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(neptune.getAbsolutePath, true)
sc.parallelize(Array(1), 1).map(x => {
val sep = File.separator
if (!new File(SparkFiles.get("neptune" + sep + "alien1")).exists()) {
throw new SparkException("can't access file under root added directory")
}
if (!new File(SparkFiles.get("neptune" + sep + "saturn" + sep + "alien2")).exists()) {
throw new SparkException("can't access file in nested directory")
}
if (new File(SparkFiles.get("pluto" + sep + "neptune" + sep + "alien1")).exists()) {
throw new SparkException("file exists that shouldn't")
}
x
}).count()
} finally {
sc.stop()
alien2.delete()
saturn.delete()
alien1.delete()
neptune.delete()
pluto.delete()
}
}

test("addFile recursive can't add directories by default") {
val dir = new File("dir")

try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addFile(dir.getAbsolutePath)
assert(false, "should have thrown exception")
Contributor: ScalaTest's intercept is the idiomatic way to test that expected exceptions are thrown.

} catch {
case _: SparkException =>
} finally {
sc.stop()
dir.delete()
}
}
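For comparison, a sketch of the same test written with ScalaTest's intercept, as suggested above:

test("addFile recursive can't add directories by default (intercept form)") {
  val dir = new File("dir")
  try {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    // intercept fails the test if no SparkException is thrown.
    intercept[SparkException] {
      sc.addFile(dir.getAbsolutePath)
    }
  } finally {
    sc.stop()
    dir.delete()
  }
}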
}