1717
1818package org .apache .spark .api .python
1919
20+ import org .apache .spark .broadcast .Broadcast
2021import org .apache .spark .rdd .RDD
21- import org .apache .spark .Logging
22+ import org .apache .spark .{ Logging , SerializableWritable , SparkException }
2223import org .apache .hadoop .conf .Configuration
2324import org .apache .hadoop .io ._
2425import scala .util .{Failure , Success , Try }
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
3132 * transformation code by overriding the convert method.
3233 */
3334@ Experimental
34- trait Converter [T , U ] extends Serializable {
35+ trait Converter [T , + U ] extends Serializable {
3536 def convert (obj : T ): U
3637}
3738
3839private [python] object Converter extends Logging {
3940
40- def getInstance (converterClass : Option [String ]): Converter [Any , Any ] = {
41+ def getInstance (converterClass : Option [String ],
42+ defaultConverter : Converter [Any , Any ]): Converter [Any , Any ] = {
4143 converterClass.map { cc =>
4244 Try {
4345 val c = Class .forName(cc).newInstance().asInstanceOf [Converter [Any , Any ]]
@@ -49,15 +51,17 @@ private[python] object Converter extends Logging {
4951 logError(s " Failed to load converter: $cc" )
5052 throw err
5153 }
52- }.getOrElse { new DefaultConverter }
54+ }.getOrElse { defaultConverter }
5355 }
5456}
5557
5658/**
5759 * A converter that handles conversion of common [[org.apache.hadoop.io.Writable ]] objects.
5860 * Other objects are passed through without conversion.
5961 */
60- private [python] class DefaultConverter extends Converter [Any , Any ] {
62+ private [python] class WritableToJavaConverter (
63+ conf : Broadcast [SerializableWritable [Configuration ]],
64+ batchSize : Int ) extends Converter [Any , Any ] {
6165
6266 /**
6367 * Converts a [[org.apache.hadoop.io.Writable ]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
7276 case fw : FloatWritable => fw.get()
7377 case t : Text => t.toString
7478 case bw : BooleanWritable => bw.get()
75- case byw : BytesWritable => byw.getBytes
79+ case byw : BytesWritable =>
80+ val bytes = new Array [Byte ](byw.getLength)
81+ System .arraycopy(byw.getBytes(), 0 , bytes, 0 , byw.getLength)
82+ bytes
7683 case n : NullWritable => null
77- case aw : ArrayWritable => aw.get().map(convertWritable(_))
78- case mw : MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
79- (convertWritable(k), convertWritable(v))
80- }.toMap)
84+ case aw : ArrayWritable =>
85+ // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
86+ // Since we can't determine element types for empty arrays, we will not attempt to
87+ // convert to primitive arrays (which get pickled to Python arrays). Users may want
88+ // write custom converters for arrays if they know the element types a priori.
89+ aw.get().map(convertWritable(_))
90+ case mw : MapWritable =>
91+ val map = new java.util.HashMap [Any , Any ]()
92+ mw.foreach { case (k, v) =>
93+ map.put(convertWritable(k), convertWritable(v))
94+ }
95+ map
96+ case w : Writable =>
97+ if (batchSize > 1 ) WritableUtils .clone(w, conf.value.value) else w
8198 case other => other
8299 }
83100 }
84101
85- def convert (obj : Any ): Any = {
102+ override def convert (obj : Any ): Any = {
86103 obj match {
87104 case writable : Writable =>
88105 convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
92109 }
93110}
94111
112+ /**
113+ * A converter that converts common types to [[org.apache.hadoop.io.Writable ]]. Note that array
114+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable ]]
115+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable ]] and
116+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter ]] for an example. They are used in
117+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
118+ */
119+ private [python] class JavaToWritableConverter extends Converter [Any , Writable ] {
120+
121+ /**
122+ * Converts common data types to [[org.apache.hadoop.io.Writable ]]. Note that array types are not
123+ * supported out-of-the-box.
124+ */
125+ private def convertToWritable (obj : Any ): Writable = {
126+ import collection .JavaConversions ._
127+ obj match {
128+ case i : java.lang.Integer => new IntWritable (i)
129+ case d : java.lang.Double => new DoubleWritable (d)
130+ case l : java.lang.Long => new LongWritable (l)
131+ case f : java.lang.Float => new FloatWritable (f)
132+ case s : java.lang.String => new Text (s)
133+ case b : java.lang.Boolean => new BooleanWritable (b)
134+ case aob : Array [Byte ] => new BytesWritable (aob)
135+ case null => NullWritable .get()
136+ case map : java.util.Map [_, _] =>
137+ val mapWritable = new MapWritable ()
138+ map.foreach { case (k, v) =>
139+ mapWritable.put(convertToWritable(k), convertToWritable(v))
140+ }
141+ mapWritable
142+ case other => throw new SparkException (
143+ s " Data of type ${other.getClass.getName} cannot be used " )
144+ }
145+ }
146+
147+ override def convert (obj : Any ): Writable = obj match {
148+ case writable : Writable => writable
149+ case other => convertToWritable(other)
150+ }
151+ }
152+
95153/** Utilities for working with Python objects <-> Hadoop-related objects */
96154private [python] object PythonHadoopUtil {
97155
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
118176
119177 /**
120178 * Converts an RDD of key-value pairs, where key and/or value could be instances of
121- * [[org.apache.hadoop.io.Writable ]], into an RDD[(K, V)]
179+ * [[org.apache.hadoop.io.Writable ]], into an RDD of base types, or vice versa.
122180 */
123181 def convertRDD [K , V ](rdd : RDD [(K , V )],
124182 keyConverter : Converter [Any , Any ],
0 commit comments