diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e7eabd289699c..fa0bad50d7007 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -792,12 +792,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def textFile( path: String, - minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { + minPartitions: Int, + conf: Configuration): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minPartitions).map(pair => pair._2.toString).setName(path) + minPartitions, conf).map(pair => pair._2.toString).setName(path) } + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ + def textFile( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[String] = + textFile(path, minPartitions, hadoopConfiguration) + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a @@ -831,9 +841,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def wholeTextFiles( path: String, - minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope { + minPartitions: Int, + conf: Configuration): RDD[(String, String)] = withScope { assertNotStopped() - val job = NewHadoopJob.getInstance(hadoopConfiguration) + val job = NewHadoopJob.getInstance(conf) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) @@ -848,6 +859,40 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * Read a directory of text files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *
<p>
For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`, + * + *
<p>
then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def wholeTextFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = + wholeTextFiles(path, minPartitions, hadoopConfiguration) + + /** + * :: Experimental :: + * * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file * (useful for binary data) * @@ -880,9 +925,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def binaryFiles( path: String, - minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { + minPartitions: Int, + conf: Configuration): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() - val job = NewHadoopJob.getInstance(hadoopConfiguration) + val job = NewHadoopJob.getInstance(conf) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) @@ -897,6 +943,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * :: Experimental :: + * + * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file + * (useful for binary data) + * + * For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do + * `val rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, + * + * then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred; very large files may cause bad performance. + */ + def binaryFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = + binaryFiles(path, minPartitions, hadoopConfiguration) + + /** + * :: Experimental :: + * * Load data from a flat binary file, assuming the length of each record is constant. * * '''Note:''' We ensure that the byte array for each record in the resulting RDD @@ -973,10 +1055,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { + minPartitions: Int, + conf: Configuration): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. 
- val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) + val confBroadcast = broadcast(new SerializableConfiguration(conf)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, @@ -988,6 +1071,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli minPartitions).setName(path) } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ + def hadoopFile[K, V]( + path: String, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = + hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, hadoopConfiguration) + /** * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys, * values and the InputFormat so that users don't need to pass them directly. Instead, callers @@ -1108,14 +1207,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - def sequenceFile[K, V](path: String, + def sequenceFile[K, V]( + path: String, keyClass: Class[K], valueClass: Class[V], - minPartitions: Int + minPartitions: Int, + conf: Configuration ): RDD[(K, V)] = withScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] - hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) + hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, conf) } /** @@ -1130,10 +1231,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def sequenceFile[K, V]( path: String, keyClass: Class[K], - valueClass: Class[V]): RDD[(K, V)] = withScope { - assertNotStopped() - sequenceFile(path, keyClass, valueClass, defaultMinPartitions) - } + valueClass: Class[V], + minPartitions: Int + ): RDD[(K, V)] = + sequenceFile(path, keyClass, valueClass, minPartitions, hadoopConfiguration) + + /** + * Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ + def sequenceFile[K, V]( + path: String, + keyClass: Class[K], + valueClass: Class[V] + ): RDD[(K, V)] = + sequenceFile(path, keyClass, valueClass, defaultMinPartitions, hadoopConfiguration) /** * Version of sequenceFile() for types implicitly convertible to Writables through a @@ -1157,10 +1274,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. 
*/ - def sequenceFile[K, V] - (path: String, minPartitions: Int = defaultMinPartitions) - (implicit km: ClassTag[K], vm: ClassTag[V], - kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { + def sequenceFile[K, V]( + path: String, + minPartitions: Int, + conf: Configuration)(implicit km: ClassTag[K], vm: ClassTag[V], + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { withScope { assertNotStopped() val kc = clean(kcf)() @@ -1168,11 +1286,39 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val format = classOf[SequenceFileInputFormat[Writable, Writable]] val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]], - vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions) + vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions, conf) writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } } } + /** + * Version of sequenceFile() for types implicitly convertible to Writables through a + * WritableConverter. For example, to access a SequenceFile where the keys are Text and the + * values are IntWritable, you could simply write + * {{{ + * sparkContext.sequenceFile[String, Int](path, ...) + * }}} + * + * WritableConverters are provided in a somewhat strange way (by an implicit function) to support + * both subclasses of Writable and types for which we define a converter (e.g. Int to + * IntWritable). The most natural thing would've been to have implicit objects for the + * converters, but then we couldn't have an object for every subclass of Writable (you can't + * have a parameterized singleton object). We use functions instead to create a new converter + * for the appropriate type. In addition, we pass the converter a ClassTag of its type to + * allow it to figure out the Writable class to use in the subclass case. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ + def sequenceFile[K, V]( + path: String, + minPartitions: Int = defaultMinPartitions)(implicit km: ClassTag[K], vm: ClassTag[V], + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = + sequenceFile(path, minPartitions, hadoopConfiguration)(km, vm, kcf, vcf) + /** * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental @@ -1183,12 +1329,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def objectFile[T: ClassTag]( path: String, - minPartitions: Int = defaultMinPartitions): RDD[T] = withScope { + minPartitions: Int, + conf: Configuration): RDD[T] = withScope { assertNotStopped() - sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) + sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions, conf) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. 
This is still an experimental + * storage format and may not be supported exactly as is in future Spark releases. It will also + * be pretty slow if you use the default serializer (Java serialization), + * though the nice thing about it is that there's very little effort required to save arbitrary + * objects. + */ + def objectFile[T: ClassTag]( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[T] = + objectFile(path, minPartitions, hadoopConfiguration) + protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope { new ReliableCheckpointRDD[T](this, path) } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 6f3b8faf03b04..33e444844644f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.io.compress.CompressionCodec +import org.apache.hadoop.mapred.JobConf import org.apache.spark._ import org.apache.spark.annotation.Since @@ -492,6 +493,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def isEmpty(): Boolean = rdd.isEmpty() + /** + * Save this RDD as a text file, using string representations of elements. + */ + def saveAsTextFile(path: String, conf: JobConf): Unit = { + rdd.saveAsTextFile(path, conf) + } + /** * Save this RDD as a text file, using string representations of elements. */ @@ -499,7 +507,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { rdd.saveAsTextFile(path) } - /** * Save this RDD as a compressed text file, using string representations of elements. */ @@ -507,6 +514,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { rdd.saveAsTextFile(path, codec) } + /** + * Save this RDD as a SequenceFile of serialized objects. + */ + def saveAsObjectFile(path: String, conf: JobConf): Unit = { + rdd.saveAsObjectFile(path, conf) + } + /** * Save this RDD as a SequenceFile of serialized objects. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index dfd91ae338e89..09b85196e7f47 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -178,8 +178,51 @@ class JavaSparkContext(val sc: SparkContext) def textFile(path: String, minPartitions: Int): JavaRDD[String] = sc.textFile(path, minPartitions) + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ + def textFile(path: String, minPartitions: Int, conf: Configuration): JavaRDD[String] = + sc.textFile(path, minPartitions, conf) + + /** + * Read a directory of text files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *
<p>
For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do + * {{{ + * JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path") + * }}} + * + *
<p>
then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def wholeTextFiles( + path: String, + minPartitions: Int, + conf: Configuration): JavaPairRDD[String, String] = + new JavaPairRDD(sc.wholeTextFiles(path, minPartitions, conf)) + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a @@ -223,6 +266,41 @@ class JavaSparkContext(val sc: SparkContext) def wholeTextFiles(path: String): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path)) + /** + * Read a directory of binary files from HDFS, a local file system (available on all nodes), + * or any Hadoop-supported file system URI as a byte array. Each file is read as a single + * record and returned in a key-value pair, where the key is the path of each file, + * the value is the content of each file. + * + * For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do + * `JavaPairRDD rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, + * + * then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred; very large files but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def binaryFiles( + path: String, + minPartitions: Int, + conf: Configuration): JavaPairRDD[String, PortableDataStream] = + new JavaPairRDD(sc.binaryFiles(path, minPartitions, conf)) + /** * Read a directory of binary files from HDFS, a local file system (available on all nodes), * or any Hadoop-supported file system URI as a byte array. Each file is read as a single @@ -291,10 +369,41 @@ class JavaSparkContext(val sc: SparkContext) * @param path Directory to the input data files * @return An RDD of data with values, represented as byte arrays */ + def binaryRecords(path: String, recordLength: Int, conf: Configuration): JavaRDD[Array[Byte]] = { + new JavaRDD(sc.binaryRecords(path, recordLength, conf)) + } + + /** + * :: Experimental :: + * + * Load data from a flat binary file, assuming the length of each record is constant. + * + * @param path Directory to the input data files + * @return An RDD of data with values, represented as byte arrays + */ def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = { new JavaRDD(sc.binaryRecords(path, recordLength)) } + /** + * Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. 
+ */ + def sequenceFile[K, V](path: String, + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int, + conf: Configuration + ): JavaPairRDD[K, V] = { + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions, conf)) + } + /** * Get an RDD for a Hadoop SequenceFile with given key and value types. * @@ -328,6 +437,18 @@ class JavaSparkContext(val sc: SparkContext) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass)) } + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. This is still an experimental storage + * format and may not be supported exactly as is in future Spark releases. It will also be pretty + * slow if you use the default serializer (Java serialization), though the nice thing about it is + * that there's very little effort required to save arbitrary objects. + */ + def objectFile[T](path: String, minPartitions: Int, conf: Configuration): JavaRDD[T] = { + implicit val ctag: ClassTag[T] = fakeClassTag + sc.objectFile(path, minPartitions, conf)(ctag) + } + /** * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental storage @@ -413,6 +534,28 @@ class JavaSparkContext(val sc: SparkContext) new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) } + /** + * Get an RDD for a Hadoop file with an arbitrary InputFormat. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ + def hadoopFile[K, V, F <: InputFormat[K, V]]( + path: String, + inputFormatClass: Class[F], + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int, + conf: Configuration + ): JavaPairRDD[K, V] = { + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) + val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, conf) + new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) + } + /** * Get an RDD for a Hadoop file with an arbitrary InputFormat. * diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 499a8b9aa1a89..db860dfc5376d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -27,7 +27,7 @@ import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.TextOutputFormat +import org.apache.hadoop.mapred.{JobConf, TextOutputFormat} import org.apache.spark._ import org.apache.spark.Partitioner._ @@ -1387,7 +1387,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a text file, using string representations of elements. 
*/ - def saveAsTextFile(path: String): Unit = withScope { + def saveAsTextFile(path: String, conf: JobConf): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit @@ -1408,9 +1408,17 @@ abstract class RDD[T: ClassTag]( } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) - .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) + .saveAsHadoopFile( + path, classOf[NullWritable], classOf[Text], + classOf[TextOutputFormat[NullWritable, Text]], conf) } + /** + * Save this RDD as a text file, using string representations of elements. + */ + def saveAsTextFile(path: String): Unit = + saveAsTextFile(path, new JobConf(sc.hadoopConfiguration)) + /** * Save this RDD as a compressed text file, using string representations of elements. */ @@ -1432,12 +1440,18 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - def saveAsObjectFile(path: String): Unit = withScope { + def saveAsObjectFile(path: String, conf: JobConf): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) - .saveAsSequenceFile(path) + .saveAsSequenceFile(path, None, conf) } + /** + * Save this RDD as a SequenceFile of serialized objects. + */ + def saveAsObjectFile(path: String): Unit = + saveAsObjectFile(path, new JobConf(sc.hadoopConfiguration)) + /** * Creates tuples of the elements in this RDD by applying `f`. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 1311b481c7c71..55b94e05a4316 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -82,7 +82,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag */ def saveAsSequenceFile( path: String, - codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { + codec: Option[Class[_ <: CompressionCodec]], + conf: JobConf): Unit = self.withScope { def anyToWritable[U <% Writable](u: U): Writable = u // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and @@ -95,18 +96,29 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," + valueWritableClass.getSimpleName + ")" ) val format = classOf[SequenceFileOutputFormat[Writable, Writable]] - val jobConf = new JobConf(self.context.hadoopConfiguration) if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec) + self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, conf, codec) } else if (!convertKey && convertValue) { self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile( - path, keyWritableClass, valueWritableClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, conf, codec) } else if (convertKey && !convertValue) { self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile( - path, keyWritableClass, valueWritableClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, conf, codec) } else if (convertKey && convertValue) { self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile( - 
path, keyWritableClass, valueWritableClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, conf, codec) } } + + /** + * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key + * and value types. If the key or value are Writable, then we use their classes directly; + * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc, + * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported + * file system. + */ + def saveAsSequenceFile( + path: String, + codec: Option[Class[_ <: CompressionCodec]] = None): Unit = + saveAsSequenceFile(path, codec, new JobConf(self.context.hadoopConfiguration)) } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 841fd02ae8bb6..8c47b23026b26 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -25,11 +25,13 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.scalatest.Matchers._ +import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD} import org.apache.spark.util.Utils class SparkContextSuite extends SparkFunSuite with LocalSparkContext { @@ -275,6 +277,31 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } } + test("Passing configuration into methods that create (New)HadoopRDD (SPARK-8398)") { + try { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + val conf = new Configuration(sc.hadoopConfiguration) + val k = "test" + val v = "dummyForTest" + conf.set(k, v) + def sourceRDD(rdd: RDD[_]): RDD[_] = + if (!rdd.dependencies.isEmpty) rdd.dependencies.head.rdd else rdd + + assert(sourceRDD(sc.textFile("nonexistent", 1, conf)) + .asInstanceOf[HadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.wholeTextFiles("nonexistent", 1, conf)) + .asInstanceOf[NewHadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.binaryFiles("nonexistent", 1, conf)) + .asInstanceOf[NewHadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.sequenceFile[Int, Int]("nonexistent", 1, conf)) + .asInstanceOf[HadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.objectFile[Int]("nonexistent", 1, conf)) + .asInstanceOf[HadoopRDD[_, _]].getConf.get(k) == v) + } finally { + sc.stop() + } + } + test("Default path for file based RDDs is properly set (SPARK-12517)") { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 9b2a966aaf4db..31caa6dfcab1b 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -652,6 +652,12 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.shuffleWriteMetrics"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.shuffleReadMetrics"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.this") + ) ++ Seq( + // New methods introduced in SPARK-8398 that expose hadoop 
Configuration + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsTextFile"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsObjectFile") ) ++ Seq( // SPARK-13643: Move functionality from SQLContext to SparkSession ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.getSchema") @@ -718,7 +724,8 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.ml.regression.LeastSquaresAggregator.add"), ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.ml.regression.LeastSquaresCostFun.this"), + "org.apache.spark.ml.regression.LeastSquaresCostFun.this") + ) ++ Seq( ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.sql.SQLContext.clearLastInstantiatedContext"), ProblemFilters.exclude[MissingMethodProblem](
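
For reference, a minimal sketch of how the new read-side overloads introduced by this diff could be used from Scala. The object name, input paths, partition hint, and configuration key below are placeholders, not anything required by the change itself:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}

object PerReadConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-read-conf").setMaster("local[*]"))

    // Clone the context-wide Hadoop configuration and override it for this read only.
    // The key/value are placeholders; any Hadoop property could be set here.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("example.read.option", "overridden-for-this-read")

    // The new overloads thread `conf` down to the underlying (New)HadoopRDD
    // instead of always using sc.hadoopConfiguration.
    val lines = sc.textFile("/tmp/example-input", 4, conf)
    val pairs = sc.sequenceFile[String, Int]("/tmp/example-seq", 4, conf)

    println(lines.count() + pairs.count())
    sc.stop()
  }
}
```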
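Similarly, a sketch of the new write-side overloads, which accept a `JobConf` per call rather than always deriving one from `sc.hadoopConfiguration`; the output paths and property are again illustrative only:

```scala
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object PerWriteJobConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-write-jobconf").setMaster("local[*]"))

    // Start from the context-wide configuration, then override it for these writes only.
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.set("example.write.option", "overridden-for-this-write") // placeholder property

    val rdd = sc.parallelize(Seq("alpha", "beta", "gamma"))
    rdd.saveAsTextFile("/tmp/example-out-text", jobConf)     // overload added in RDD.scala above
    rdd.saveAsObjectFile("/tmp/example-out-object", jobConf) // likewise

    sc.stop()
  }
}
```

In both cases the context-wide `hadoopConfiguration` remains the default, since the no-`conf` overloads simply delegate to the new methods, so existing call sites are unaffected.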