Commit 818a1e6

Add SequenceFile and Hadoop InputFormat support to PythonRDD
1 parent 4e7c9e3 commit 818a1e6

1 file changed: 34 additions & 35 deletions

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -19,31 +19,21 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import org.apache.spark.broadcast.Broadcast
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat
-import org.apache.spark.api.java.function.PairFunction
-import scala.util.{Success, Failure, Try}
-import org.msgpack
-import org.msgpack.ScalaMessagePack
-import org.apache.hadoop.mapred.InputFormat
 
 private[spark] class PythonRDD[T: ClassTag](
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import java.util
-
-private[spark] class PythonRDD[T: ClassManifest](
     parent: RDD[T],
     command: Array[Byte],
     envVars: JMap[String, String],
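The first hunk rebuilds the import block in alphabetical order and removes the stray imports and duplicate `PythonRDD[T: ClassManifest]` declaration that had leaked below the `ClassTag` signature, completing the move to Scala 2.10 reflection. A minimal sketch of the API shift (illustrative, not part of the commit):

```scala
import scala.reflect.ClassTag

// Scala 2.10 ClassTag replaces the deprecated ClassManifest used on the removed lines:
val tag: ClassTag[String] = ClassTag(classOf[String]) // was ClassManifest.fromClass(classOf[String])
val cls: Class[_] = tag.runtimeClass                  // was tag.erasure
```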
@@ -218,21 +208,20 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
-  // PySpark / Hadoop InputFormat stuff
+  // PySpark / Hadoop InputFormat//
 
   /** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
-  def sequenceFile[K ,V](sc: JavaSparkContext,
+  def sequenceFile[K, V](sc: JavaSparkContext,
       path: String,
       keyClass: String,
       valueClass: String,
       keyWrapper: String,
       valueWrapper: String,
       minSplits: Int) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClass)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClass)).asInstanceOf[ClassManifest[V]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val converted = SerDeUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
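Inside `sequenceFile`, the key and value classes arrive from Python as strings and are resolved reflectively. The same three-step pattern (load the class, wrap it in a `ClassTag`, recover the `Class` object) recurs in every method this commit touches; a sketch of the pattern in isolation, using a hypothetical helper name:

```scala
import scala.reflect.ClassTag

// Hypothetical helper (not in the commit): a class name shipped from Python
// becomes a ClassTag, and the ClassTag yields the Class object that the
// SparkContext read APIs require.
def tagFor[T](className: String): ClassTag[T] =
  ClassTag(Class.forName(className)).asInstanceOf[ClassTag[T]]

val keyTag = tagFor[org.apache.hadoop.io.Text]("org.apache.hadoop.io.Text")
val keyClass: Class[_] = keyTag.runtimeClass
```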
@@ -285,12 +274,12 @@ private[spark] object PythonRDD extends Logging {
       keyClazz: String,
       valueClazz: String,
       conf: Configuration) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
-    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
     val rdd = if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
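When a path is supplied, the resolved classes feed straight into the new-API `SparkContext.newAPIHadoopFile`. A hedged sketch of the equivalent direct Scala call (assuming a live `SparkContext` named `sc`; the path and format are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// What the reflective wrapper ultimately invokes, written out directly.
val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs:///tmp/example.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  new Configuration())
```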
@@ -299,6 +288,10 @@ private[spark] object PythonRDD extends Logging {
     rdd
   }
 
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class
+   */
   def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
       path: String,
       inputFormatClazz: String,
@@ -317,6 +310,10 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
   }
 
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
+   * using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], key and value class
+   */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
       inputFormatClazz: String,
       keyClazz: String,
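As the new Scaladoc says, the `hadoopRDD` variant takes no path at all: everything, including the input location, comes from a `Configuration` that the Python side passes over as a map. A sketch of that conversion idea (hypothetical helper; the commit itself performs the map-to-Configuration conversion elsewhere):

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical illustration: copy a Python-supplied map of string pairs
// into a fresh Hadoop Configuration.
def confFromMap(m: java.util.Map[String, String]): Configuration = {
  val conf = new Configuration()
  val it = m.entrySet().iterator()
  while (it.hasNext) {
    val e = it.next()
    conf.set(e.getKey, e.getValue)
  }
  conf
}
```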
@@ -338,12 +335,12 @@ private[spark] object PythonRDD extends Logging {
       keyClazz: String,
       valueClazz: String,
       conf: Configuration) = {
-    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
-    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
-    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
-    val kc = kcm.erasure.asInstanceOf[Class[K]]
-    val vc = vcm.erasure.asInstanceOf[Class[V]]
-    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
     val rdd = if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
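This branch goes through the old `org.apache.hadoop.mapred` API, which is why the imports keep both `InputFormat` and the `mapreduce` version under the `NewInputFormat` alias. The old-API counterpart of the earlier sketch (again assuming a live `SparkContext` named `sc`; path and format are illustrative):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Old-API twin of the newAPIHadoopFile sketch above.
val lines = sc.hadoopFile(
  "hdfs:///tmp/example.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
```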
@@ -352,6 +349,8 @@ private[spark] object PythonRDD extends Logging {
     rdd
   }
 
+  // **** //
+
   def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
       case bytes: Array[Byte] =>
