From ab6025d5773a1570f24031b37136219fe361ec2f Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 15:00:47 -0700 Subject: [PATCH 01/49] compiling --- .../apache/spark/api/python/PythonRDD.scala | 10 +++++++ project/SparkBuild.scala | 3 ++- .../spark/sql/api/java/JavaSQLContext.scala | 27 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 32f1100406d7..934bfae31432 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import net.razorvine.pickle.Unpickler + import org.apache.spark._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast @@ -284,6 +286,14 @@ private[spark] object PythonRDD { file.close() } + def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + iter.map { row => + unpickle.loads(row) + } + } + } } private diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a6058bba3d21..b37b31bdac81 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -345,7 +345,8 @@ object SparkBuild extends Build { "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm), "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), - "com.clearspring.analytics" % "stream" % "2.5.1" + "com.clearspring.analytics" % "stream" % "2.5.1", + "net.razorvine" % "pyrolite_2.10" % "1.1" ), libraryDependencies ++= maybeAvro ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 573345e42c43..81816ef98e3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -82,6 +82,33 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) } + /** + * Applies a schema to an RDD of Array[Any] + */ + def applySchema(rdd: JavaRDD[Array[Any]]): JavaSchemaRDD = { + val fields = rdd.first.map(_.getClass) + val schema = fields.zipWithIndex.map { case (klass, index) => + val dataType = klass match { + case c: Class[_] if c == classOf[java.lang.String] => StringType + case c: Class[_] if c == java.lang.Short.TYPE => ShortType + case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + case c: Class[_] if c == java.lang.Long.TYPE => LongType + case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + case c: Class[_] if c == java.lang.Float.TYPE => FloatType + case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + } + + AttributeReference(index.toString, dataType, true)() + } + + val rowRdd = rdd.rdd.mapPartitions { iter => + iter.map { row => + new GenericRow(row): ScalaRow + } + } + new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + } /** * Loads a parquet file, returning 
the result as a [[JavaSchemaRDD]]. From bcc0f23deb3fd772b180c850511d5ffaf344b7aa Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 15:03:59 -0700 Subject: [PATCH 02/49] Java to python --- .../scala/org/apache/spark/api/python/PythonRDD.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 934bfae31432..032240176549 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import net.razorvine.pickle.Unpickler +import net.razorvine.pickle.{Pickler, Unpickler} import org.apache.spark._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} @@ -294,6 +294,15 @@ private[spark] object PythonRDD { } } } + + def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { + jRDD.rdd.mapPartitions { iter => + val unpickle = new Pickler + iter.map { row => + unpickle.dumps(row) + } + } + } } private From 67ba875f262c054765dfd856da27db4852d707ee Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 16:07:10 -0700 Subject: [PATCH 03/49] java to python, and python to java --- .../scala/org/apache/spark/api/python/PythonRDD.scala | 8 ++++++-- python/pyspark/context.py | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 032240176549..31f7cd319a0c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -289,8 +289,12 @@ private[spark] object PythonRDD { def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler - iter.map { row => - unpickle.loads(row) + iter.flatMap { row => + unpickle.loads(row) match { + case objs: java.util.ArrayList[Any] => objs + // Incase the partition doesn't have a collection + case obj => Seq(obj) + } } } } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index d8667e84fedf..7a5f9349649f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -174,6 +174,8 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile + SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava + SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: From b8b904b9cb775aca6dbf5a8c173ed4fe7a05250b Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 21:41:09 -0700 Subject: [PATCH 04/49] Added schema rdd class --- python/pyspark/java_gateway.py | 1 + python/pyspark/rdd.py | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 6a16756e0576..8b079f7215b4 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -64,5 +64,6 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.api.java.*") 
java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") + java_import(gateway.jvm, "org.apache.spark.sql.api.java.JavaSQLContext") java_import(gateway.jvm, "scala.Tuple2") return gateway diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 91fc7e637e2c..76ea4db846e8 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1387,6 +1387,14 @@ def _jrdd(self): def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) +class SchemaRDD: + + def __init__(self, pyRDD): + self._pyRDD = pyRDD + self.ctx = pyRDD.ctx + self.sql_ctx = self.ctx._jvm.JavaSQLContext(self.ctx._jsc) + self._jrdd = self.ctx._pythonToJava(pyRDD._jrdd) + def _test(): import doctest From 5496f9f62f8eb36204f2f2777f6ca78e813d001e Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 15:45:55 -0700 Subject: [PATCH 05/49] doesn't crash --- .../apache/spark/api/python/PythonRDD.scala | 3 +- .../spark/sql/api/java/JavaSQLContext.scala | 28 +++++++++++-------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 31f7cd319a0c..9eb16e1cec05 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -208,7 +208,7 @@ private object SpecialLengths { val TIMING_DATA = -3 } -private[spark] object PythonRDD { +object PythonRDD { val UTF8 = Charset.forName("UTF-8") def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): @@ -289,6 +289,7 @@ private[spark] object PythonRDD { def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler + // TODO: Figure out why flatMap is necessay for pyspark iter.flatMap { row => unpickle.loads(row) match { case objs: java.util.ArrayList[Any] => objs diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 81816ef98e3a..92bf5bf20c17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -85,26 +85,32 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { /** * Applies a schema to an RDD of Array[Any] */ - def applySchema(rdd: JavaRDD[Array[Any]]): JavaSchemaRDD = { - val fields = rdd.first.map(_.getClass) + def applySchema(rdd: JavaRDD[_]): JavaSchemaRDD = { + val fields = rdd.first match { + case row: java.util.ArrayList[_] => row.toArray.map(_.getClass) + case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}") + } + val schema = fields.zipWithIndex.map { case (klass, index) => val dataType = klass match { case c: Class[_] if c == classOf[java.lang.String] => StringType - case c: Class[_] if c == java.lang.Short.TYPE => ShortType - case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType - case c: Class[_] if c == java.lang.Long.TYPE => LongType - case c: Class[_] if c == java.lang.Double.TYPE => DoubleType - case c: Class[_] if c == java.lang.Byte.TYPE => ByteType - case c: Class[_] if c == java.lang.Float.TYPE => FloatType - case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType + // case c: Class[_] if c == java.lang.Short.TYPE => ShortType + 
// case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + // case c: Class[_] if c == java.lang.Long.TYPE => LongType + // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + // case c: Class[_] if c == java.lang.Float.TYPE => FloatType + // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType } AttributeReference(index.toString, dataType, true)() } val rowRdd = rdd.rdd.mapPartitions { iter => - iter.map { row => - new GenericRow(row): ScalaRow + iter.map { + case row: java.util.ArrayList[_] => new GenericRow(row.toArray.asInstanceOf[Array[Any]]): ScalaRow + case row => throw new Exception(s"Rows must be Lists 2 ${row.getClass}") } } new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) From 043ca85adcf4ca496abba0a3a214d5cfec8f46b7 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 16:09:22 -0700 Subject: [PATCH 06/49] working --- python/pyspark/rdd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 76ea4db846e8..93787e309c92 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1394,6 +1394,7 @@ def __init__(self, pyRDD): self.ctx = pyRDD.ctx self.sql_ctx = self.ctx._jvm.JavaSQLContext(self.ctx._jsc) self._jrdd = self.ctx._pythonToJava(pyRDD._jrdd) + self._srdd = self.sql_ctx.applySchema(self._jrdd) def _test(): From c0fb1c652fb26a4721c8ae182615d88fc8dd836a Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 16:47:36 -0700 Subject: [PATCH 07/49] more working --- python/pyspark/context.py | 19 ++++++++++++++++++- python/pyspark/rdd.py | 16 ++++++++++------ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 7a5f9349649f..751ed7d67472 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -32,7 +32,7 @@ PairDeserializer from pyspark.storagelevel import StorageLevel from pyspark import rdd -from pyspark.rdd import RDD +from pyspark.rdd import RDD, SchemaRDD from py4j.java_collections import ListConverter @@ -462,6 +462,23 @@ def sparkUser(self): """ return self._jsc.sc().sparkUser() +class SQLContext: + + def __init__(self, sparkContext): + self._sc = sparkContext + self._jsc = self._sc._jsc + self._jvm = self._sc._jvm + self._jsql_ctx = self._jvm.JavaSQLContext(self._jsc) + + def sql(self, sqlQuery): + return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self) + + def applySchema(self, rdd): + jrdd = self._sc._pythonToJava(rdd._jrdd) + srdd = self._jsql_ctx.applySchema(jrdd) + return SchemaRDD(srdd, self) + + def _test(): import atexit import doctest diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 93787e309c92..269864d358a6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1389,13 +1389,17 @@ def _is_pipelinable(self): class SchemaRDD: - def __init__(self, pyRDD): - self._pyRDD = pyRDD - self.ctx = pyRDD.ctx - self.sql_ctx = self.ctx._jvm.JavaSQLContext(self.ctx._jsc) - self._jrdd = self.ctx._pythonToJava(pyRDD._jrdd) - self._srdd = self.sql_ctx.applySchema(self._jrdd) + def __init__(self, jschema_rdd, sql_ctx): + self.sql_ctx = sql_ctx + self._sc = sql_ctx._sc + self._jschema_rdd = jschema_rdd + def registerAsTable(self, name): + self._jschema_rdd.registerAsTable(name) + + def toPython(self): + jrdd = self._sc._javaToPython(self._jschema_rdd) + return RDD(jrdd, self._sc, self._sc.serializer) def _test(): import doctest From 4886052b822b6835d901b86812f1ea87acd70646 Mon 
Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 17:16:57 -0700 Subject: [PATCH 08/49] even better --- python/pyspark/rdd.py | 3 ++- .../apache/spark/sql/api/java/JavaSchemaRDD.scala | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 269864d358a6..d5d5f570694c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1398,7 +1398,8 @@ def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) def toPython(self): - jrdd = self._sc._javaToPython(self._jschema_rdd) + jrdd = self._jschema_rdd.javaToPython() + #jrdd = self._sc._javaToPython(self._jschema_rdd) return RDD(jrdd, self._sc, self._sc.serializer) def _test(): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala index d43d672938f5..f068519cc0e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.api.java +import net.razorvine.pickle.{Pickler, Unpickler} + import org.apache.spark.api.java.{JavaRDDLike, JavaRDD} import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -45,4 +47,14 @@ class JavaSchemaRDD( override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd) val rdd = baseSchemaRDD.map(new Row(_)) + + def javaToPython: JavaRDD[Array[Byte]] = { + this.rdd.mapPartitions { iter => + val unpickle = new Pickler + iter.map { row => + val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row.get(i)).toArray + unpickle.dumps(fields) + } + } + } } From e948bd9b3af1fedff1603df236fed9a723e5d324 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 18:26:48 -0700 Subject: [PATCH 09/49] yippie --- project/SparkBuild.scala | 1 + python/pyspark/context.py | 5 +++-- .../org/apache/spark/sql/api/java/JavaSQLContext.scala | 8 +++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b37b31bdac81..0685ff76abcc 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -522,6 +522,7 @@ object SparkBuild extends Build { def extraAssemblySettings() = Seq( test in assembly := {}, + assemblyOption in assembly ~= { _.copy(cacheOutput = false) }, mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 751ed7d67472..22a98a7ec955 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -473,9 +473,10 @@ def __init__(self, sparkContext): def sql(self, sqlQuery): return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self) - def applySchema(self, rdd): + def applySchema(self, rdd, fieldNames): + fieldNames = ListConverter().convert(fieldNames, self._sc._gateway._gateway_client) jrdd = self._sc._pythonToJava(rdd._jrdd) - srdd = self._jsql_ctx.applySchema(jrdd) + srdd = self._jsql_ctx.applySchema(jrdd, fieldNames) return SchemaRDD(srdd, self) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 92bf5bf20c17..bd9fe7fbb009 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -85,13 +85,13 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { /** * Applies a schema to an RDD of Array[Any] */ - def applySchema(rdd: JavaRDD[_]): JavaSchemaRDD = { + def applySchema(rdd: JavaRDD[_], fieldNames: java.util.ArrayList[Any]): JavaSchemaRDD = { val fields = rdd.first match { case row: java.util.ArrayList[_] => row.toArray.map(_.getClass) case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}") } - val schema = fields.zipWithIndex.map { case (klass, index) => + val schema = fields.zip(fieldNames.toArray).map { case (klass, fieldName) => val dataType = klass match { case c: Class[_] if c == classOf[java.lang.String] => StringType case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType @@ -104,7 +104,9 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType } - AttributeReference(index.toString, dataType, true)() + println(fieldName.toString) + // TODO: No bueno, fieldName.toString used because I can't figure out the casting + AttributeReference(fieldName.toString, dataType, true)() } val rowRdd = rdd.rdd.mapPartitions { iter => From cd5f79fa67903080441b9c2cb91e3a171be827ca Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 21:25:14 -0700 Subject: [PATCH 10/49] Switched to using Scala SQLContext --- .../apache/spark/api/python/PythonRDD.scala | 14 +++++++ python/pyspark/context.py | 16 +++++--- python/pyspark/java_gateway.py | 2 +- .../org/apache/spark/sql/SQLContext.scala | 27 ++++++++++++++ .../org/apache/spark/sql/SchemaRDD.scala | 13 +++++++ .../spark/sql/api/java/JavaSQLContext.scala | 37 +------------------ .../spark/sql/api/java/JavaSchemaRDD.scala | 12 ------ 7 files changed, 66 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 9eb16e1cec05..11ab81f1498b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -300,6 +300,20 @@ object PythonRDD { } } + def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + // TODO: Figure out why flatMap is necessay for pyspark + iter.flatMap { row => + unpickle.loads(row) match { + case objs: java.util.ArrayList[JMap[String, _]] => objs.map(_.toMap) + // Incase the partition doesn't have a collection + case obj: JMap[String, _] => Seq(obj.toMap) + } + } + } + } + def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { jRDD.rdd.mapPartitions { iter => val unpickle = new Pickler diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 22a98a7ec955..b8ac6db97457 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -175,6 +175,7 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava + SparkContext._pythonToJavaMap = SparkContext._jvm.PythonRDD.pythonToJavaMap SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython if instance: @@ -468,15 +469,18 @@ def __init__(self, sparkContext): self._sc = sparkContext self._jsc 
= self._sc._jsc self._jvm = self._sc._jvm - self._jsql_ctx = self._jvm.JavaSQLContext(self._jsc) + self._ssql_ctx = self._jvm.SQLContext(self._jsc.sc()) def sql(self, sqlQuery): - return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self) + return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) - def applySchema(self, rdd, fieldNames): - fieldNames = ListConverter().convert(fieldNames, self._sc._gateway._gateway_client) - jrdd = self._sc._pythonToJava(rdd._jrdd) - srdd = self._jsql_ctx.applySchema(jrdd, fieldNames) + def applySchema(self, rdd): + first = rdd.first() + if (rdd.__class__ is SchemaRDD): + raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__) + + jrdd = self._sc._pythonToJavaMap(rdd._jrdd) + srdd = self._ssql_ctx.applySchema(jrdd.rdd()) return SchemaRDD(srdd, self) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 8b079f7215b4..d8dd2a65225e 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -64,6 +64,6 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") - java_import(gateway.jvm, "org.apache.spark.sql.api.java.JavaSQLContext") + java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "scala.Tuple2") return gateway diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d3d4c56bafe4..fdf28822c59d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -26,11 +26,13 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.columnar.InMemoryColumnarTableScan import org.apache.spark.sql.execution._ +import org.apache.spark.api.java.JavaRDD /** * :: AlphaComponent :: @@ -241,4 +243,29 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def debugExec() = DebugQuery(executedPlan).execute().collect() } + + def applySchema(rdd: RDD[Map[String, _]]): SchemaRDD = { + val schema = rdd.first.map { case (fieldName, obj) => + val dataType = obj.getClass match { + case c: Class[_] if c == classOf[java.lang.String] => StringType + case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType + // case c: Class[_] if c == java.lang.Short.TYPE => ShortType + // case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + // case c: Class[_] if c == java.lang.Long.TYPE => LongType + // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + // case c: Class[_] if c == java.lang.Float.TYPE => FloatType + // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + } + AttributeReference(fieldName, dataType, true)() + }.toSeq + + val rowRdd = rdd.mapPartitions { iter => + iter.map { map => + new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row + } + } + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 91500416eefa..f15fb113d375 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import net.razorvine.pickle.{Pickler, Unpickler} + import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} import org.apache.spark.annotation.{AlphaComponent, Experimental} import org.apache.spark.rdd.RDD @@ -25,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType +import org.apache.spark.api.java.JavaRDD /** * :: AlphaComponent :: @@ -308,4 +311,14 @@ class SchemaRDD( /** FOR INTERNAL USE ONLY */ def analyze = sqlContext.analyzer(logicalPlan) + + def javaToPython: JavaRDD[Array[Byte]] = { + this.mapPartitions { iter => + val unpickle = new Pickler + iter.map { row => + val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row(i)).toArray + unpickle.dumps(fields) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index bd9fe7fbb009..4ca4505fbfc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.api.java import java.beans.{Introspector, PropertyDescriptor} +import java.util.{Map => JMap} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.SQLContext @@ -82,42 +83,6 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) } - /** - * Applies a schema to an RDD of Array[Any] - */ - def applySchema(rdd: JavaRDD[_], fieldNames: java.util.ArrayList[Any]): JavaSchemaRDD = { - val fields = rdd.first match { - case row: java.util.ArrayList[_] => row.toArray.map(_.getClass) - case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}") - } - - val schema = fields.zip(fieldNames.toArray).map { case (klass, fieldName) => - val dataType = klass match { - case c: Class[_] if c == classOf[java.lang.String] => StringType - case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType - // case c: Class[_] if c == java.lang.Short.TYPE => ShortType - // case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType - // case c: Class[_] if c == java.lang.Long.TYPE => LongType - // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType - // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType - // case c: Class[_] if c == java.lang.Float.TYPE => FloatType - // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType - } - - println(fieldName.toString) - // TODO: No bueno, fieldName.toString used because I can't figure out the casting - AttributeReference(fieldName.toString, dataType, true)() - } - - val rowRdd = rdd.rdd.mapPartitions { iter => - iter.map { - case row: java.util.ArrayList[_] => new GenericRow(row.toArray.asInstanceOf[Array[Any]]): ScalaRow - case row => throw new Exception(s"Rows must be Lists 2 ${row.getClass}") - } - } - new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) - } - /** * Loads a parquet file, returning the result as a 
[[JavaSchemaRDD]]. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala index f068519cc0e5..d43d672938f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.api.java -import net.razorvine.pickle.{Pickler, Unpickler} - import org.apache.spark.api.java.{JavaRDDLike, JavaRDD} import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -47,14 +45,4 @@ class JavaSchemaRDD( override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd) val rdd = baseSchemaRDD.map(new Row(_)) - - def javaToPython: JavaRDD[Array[Byte]] = { - this.rdd.mapPartitions { iter => - val unpickle = new Pickler - iter.map { row => - val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row.get(i)).toArray - unpickle.dumps(fields) - } - } - } } From be079de9d4175c66ab794856b9a9b0fe1625e47b Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:11:23 -0700 Subject: [PATCH 11/49] returning dictionaries works --- .../scala/org/apache/spark/sql/SchemaRDD.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index f15fb113d375..6aa4c7675674 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType import org.apache.spark.api.java.JavaRDD +import java.util.{Map => JMap} /** * :: AlphaComponent :: @@ -313,11 +314,18 @@ class SchemaRDD( def analyze = sqlContext.analyzer(logicalPlan) def javaToPython: JavaRDD[Array[Byte]] = { + //val fieldNames: Seq[String] = logicalPlan.references.map(_.name) this.mapPartitions { iter => - val unpickle = new Pickler + val pickle = new Pickler iter.map { row => - val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row(i)).toArray - unpickle.dumps(fields) + val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY") //TODO: Temporary + val map: JMap[String, Any] = new java.util.HashMap + val arr: java.util.ArrayList[Any] = new java.util.ArrayList + row.zip(fieldNames).foreach { case (obj, name) => + map.put(name, obj) + } + arr.add(map) + pickle.dumps(arr) } } } From 4fe1319db025189e310c23a409434f3bcb14f06c Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:17:48 -0700 Subject: [PATCH 12/49] output dictionaries correctly --- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 6aa4c7675674..c7fac0403453 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -314,11 +314,10 @@ class SchemaRDD( def analyze = sqlContext.analyzer(logicalPlan) def javaToPython: JavaRDD[Array[Byte]] = { - //val fieldNames: Seq[String] = logicalPlan.references.map(_.name) + val fieldNames: Seq[String] = 
this.queryExecution.analyzed.output.map(_.name) this.mapPartitions { iter => val pickle = new Pickler iter.map { row => - val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY") //TODO: Temporary val map: JMap[String, Any] = new java.util.HashMap val arr: java.util.ArrayList[Any] = new java.util.ArrayList row.zip(fieldNames).foreach { case (obj, name) => From 55d1c764d4bb73838c50425633e13a79531526f7 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:25:26 -0700 Subject: [PATCH 13/49] return row objects --- python/pyspark/rdd.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d5d5f570694c..542cc73b57da 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1387,6 +1387,11 @@ def _jrdd(self): def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) +class Row: + + def __init__(self, d): + self.__dict__ = dict(self.__dict__.items() + d.items()) + class SchemaRDD: def __init__(self, jschema_rdd, sql_ctx): @@ -1400,7 +1405,7 @@ def registerAsTable(self, name): def toPython(self): jrdd = self._jschema_rdd.javaToPython() #jrdd = self._sc._javaToPython(self._jschema_rdd) - return RDD(jrdd, self._sc, self._sc.serializer) + return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) def _test(): import doctest From 725c91e757b1bcb84736b4841a6df160663da328 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:42:19 -0700 Subject: [PATCH 14/49] awesome row objects --- python/pyspark/rdd.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 542cc73b57da..256f9d0d26d9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1387,10 +1387,12 @@ def _jrdd(self): def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) -class Row: +class Row(dict): def __init__(self, d): - self.__dict__ = dict(self.__dict__.items() + d.items()) + d.update(self.__dict__) + self.__dict__ = d + dict.__init__(self, d) class SchemaRDD: From c608947cafa9a97be3e6e5f97d6309d4d461451d Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:13:01 -0700 Subject: [PATCH 15/49] SchemaRDD now has all RDD operations --- python/pyspark/rdd.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 256f9d0d26d9..0076b54fea28 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1401,14 +1401,26 @@ def __init__(self, jschema_rdd, sql_ctx): self._sc = sql_ctx._sc self._jschema_rdd = jschema_rdd + self._jrdd = self.toPython()._jrdd + self.is_cached = False + self.is_checkpointed = False + self.ctx = self.sql_ctx._sc + self._jrdd_deserializer = self.ctx.serializer + # TODO: Figure out how to make this lazy + #self._id = self._jrdd.id() + def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) def toPython(self): jrdd = self._jschema_rdd.javaToPython() - #jrdd = self._sc._javaToPython(self._jschema_rdd) return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) +customRDDDict = dict(RDD.__dict__) +del customRDDDict["__init__"] + +SchemaRDD.__dict__.update(customRDDDict) + def _test(): import doctest from pyspark.context import SparkContext From 09b9980e9fecd37db5a9cf6ff9cf07bdc43092e9 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:19:05 -0700 Subject: [PATCH 16/49] made jrdd explicitly lazy --- python/pyspark/rdd.py | 18 
+++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0076b54fea28..16c7a3ba4922 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1394,20 +1394,25 @@ def __init__(self, d): self.__dict__ = d dict.__init__(self, d) -class SchemaRDD: +class SchemaRDD(RDD): def __init__(self, jschema_rdd, sql_ctx): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc self._jschema_rdd = jschema_rdd - self._jrdd = self.toPython()._jrdd self.is_cached = False self.is_checkpointed = False self.ctx = self.sql_ctx._sc self._jrdd_deserializer = self.ctx.serializer - # TODO: Figure out how to make this lazy - #self._id = self._jrdd.id() + + @property + def _jrdd(self): + return self.toPython()._jrdd + + @property + def _id(self): + return self._jrdd.id() def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) @@ -1416,11 +1421,6 @@ def toPython(self): jrdd = self._jschema_rdd.javaToPython() return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) -customRDDDict = dict(RDD.__dict__) -del customRDDDict["__init__"] - -SchemaRDD.__dict__.update(customRDDDict) - def _test(): import doctest from pyspark.context import SparkContext From 251f99d3d0237a29b7eca2a0d9df15ce58289ea6 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:33:52 -0700 Subject: [PATCH 17/49] for now only allow dictionaries as input --- python/pyspark/context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index b8ac6db97457..f30ebb9c8e7d 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -475,9 +475,10 @@ def sql(self, sqlQuery): return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def applySchema(self, rdd): - first = rdd.first() if (rdd.__class__ is SchemaRDD): raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__) + elif type(rdd.first()) is not dict: + raise Exception("Only RDDs with dictionaries can be converted to %s" % SchemaRDD.__name__) jrdd = self._sc._pythonToJavaMap(rdd._jrdd) srdd = self._ssql_ctx.applySchema(jrdd.rdd()) From 906d1800517c8231138577cd285c8f2df6372ad3 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:36:53 -0700 Subject: [PATCH 18/49] added todo explaining cost of creating Row object in python --- python/pyspark/rdd.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 16c7a3ba4922..c090f970be40 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1419,6 +1419,9 @@ def registerAsTable(self, name): def toPython(self): jrdd = self._jschema_rdd.javaToPython() + # TODO: This is inefficient, we should construct the Python Row object + # in Java land in the javaToPython function. May require a custom + # pickle serializer in Pyrolite return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) def _test(): From e9f5b8d3925b76b37485b42d1242065d89d856c4 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 12:01:06 -0700 Subject: [PATCH 19/49] adding tests --- python/pyspark/context.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index f30ebb9c8e7d..6949a98f9bfd 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -464,20 +464,49 @@ def sparkUser(self): return self._jsc.sc().sparkUser() class SQLContext: + """ + Main entry point for SparkSQL functionality. 
A SQLContext can be used create L{SchemaRDD}s, + register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files. + """ def __init__(self, sparkContext): + """ + Create a new SQLContext. + + @param sparkContext: The SparkContext to wrap. + + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + """ self._sc = sparkContext self._jsc = self._sc._jsc self._jvm = self._sc._jvm self._ssql_ctx = self._jvm.SQLContext(self._jsc.sc()) + def parquetFile(path): + jschema_rdd = self._ssql_ctx.parquetFile(path) + return SchemaRDD(jschema_rdd, self) + + def registerRDDAsTable(rdd, tableName): + jschema_rdd = rdd._jschema_rdd + self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName) + def sql(self, sqlQuery): return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) + def table(tableName): + return SchemaRDD(self._ssql_ctx.table(tableName), self) + + def cacheTable(tableName): + self._ssql_ctx.cacheTable(tableName) + + def uncacheTable(tableName): + self._ssql_ctx.uncacheTable(tableName) + def applySchema(self, rdd): if (rdd.__class__ is SchemaRDD): raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__) - elif type(rdd.first()) is not dict: + elif isinstance(rdd.first(), dict) is not dict: raise Exception("Only RDDs with dictionaries can be converted to %s" % SchemaRDD.__name__) jrdd = self._sc._pythonToJavaMap(rdd._jrdd) From d26ec5e8eab83efdc0a73e4ef3fceaa0e2d347a1 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 12:32:12 -0700 Subject: [PATCH 20/49] added test --- python/pyspark/context.py | 67 ++++++++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 6949a98f9bfd..aff14344ae37 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -475,23 +475,71 @@ def __init__(self, sparkContext): @param sparkContext: The SparkContext to wrap. + # SQLContext >>> from pyspark.context import SQLContext >>> sqlCtx = SQLContext(sc) + + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + + # applySchema + >>> srdd = sqlCtx.applySchema(rdd) + + >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}] + True + + # registerRDDAsTable + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + + # sql + >>> srdd2 = sqlCtx.sql("select field1 as f1, field2 as f2 from table1") + >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}] + True + """ self._sc = sparkContext self._jsc = self._sc._jsc self._jvm = self._sc._jvm self._ssql_ctx = self._jvm.SQLContext(self._jsc.sc()) + def applySchema(self, rdd): + """ + Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to + determine the fields names and types, and then use that to extract all the dictionaries. + + # >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL + # Traceback (most recent call last): + # ... + # ValueError:... 
+ """ + if (rdd.__class__ is SchemaRDD): + raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__) + elif not isinstance(rdd.first(), dict): + raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" % + (SchemaRDD.__name__, rdd.first().__class__.__name)) + + jrdd = self._sc._pythonToJavaMap(rdd._jrdd) + srdd = self._ssql_ctx.applySchema(jrdd.rdd()) + return SchemaRDD(srdd, self) + + def registerRDDAsTable(self, rdd, tableName): + """ + + """ + if (rdd.__class__ is SchemaRDD): + jschema_rdd = rdd._jschema_rdd + self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName) + else: + raise ValueError("Can only register SchemaRDD as table") + def parquetFile(path): jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) - def registerRDDAsTable(rdd, tableName): - jschema_rdd = rdd._jschema_rdd - self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName) - def sql(self, sqlQuery): + """ + Run a sql query over a registered table, and return a L{SchemaRDD} with the results. + """ return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(tableName): @@ -503,17 +551,6 @@ def cacheTable(tableName): def uncacheTable(tableName): self._ssql_ctx.uncacheTable(tableName) - def applySchema(self, rdd): - if (rdd.__class__ is SchemaRDD): - raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__) - elif isinstance(rdd.first(), dict) is not dict: - raise Exception("Only RDDs with dictionaries can be converted to %s" % SchemaRDD.__name__) - - jrdd = self._sc._pythonToJavaMap(rdd._jrdd) - srdd = self._ssql_ctx.applySchema(jrdd.rdd()) - return SchemaRDD(srdd, self) - - def _test(): import atexit import doctest From 7515ba0a3af8023e3f50344e8acd264fbedc4782 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 12:52:40 -0700 Subject: [PATCH 21/49] added more tests :) --- python/pyspark/context.py | 45 ++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index aff14344ae37..b79600416b61 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -488,14 +488,31 @@ def __init__(self, sparkContext): >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}] True + >>> sqlCtx.applySchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + ValueError:... + + >>> bad_rdd = sc.parallelize([1,2,3]) + >>> sqlCtx.applySchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + ValueError:... + # registerRDDAsTable >>> sqlCtx.registerRDDAsTable(srdd, "table1") # sql - >>> srdd2 = sqlCtx.sql("select field1 as f1, field2 as f2 from table1") + >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}] True + # table + #>>> sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + #>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (1, "one")') + #>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (2, "two")') + #>>> srdd3 = sqlCtx.table("src") + #>>> srdd3.collect() == [{"key" : 1, "value" : "one"}, {"key" : 2, "value": "two"}] """ self._sc = sparkContext self._jsc = self._sc._jsc @@ -506,17 +523,12 @@ def applySchema(self, rdd): """ Infer and apply a schema to an RDD of L{dict}s. 
We peek at the first row of the RDD to determine the fields names and types, and then use that to extract all the dictionaries. - - # >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL - # Traceback (most recent call last): - # ... - # ValueError:... """ if (rdd.__class__ is SchemaRDD): raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__) elif not isinstance(rdd.first(), dict): raise ValueError("Only RDDs with dictionaries can be converted to %s: %s" % - (SchemaRDD.__name__, rdd.first().__class__.__name)) + (SchemaRDD.__name__, rdd.first())) jrdd = self._sc._pythonToJavaMap(rdd._jrdd) srdd = self._ssql_ctx.applySchema(jrdd.rdd()) @@ -524,7 +536,8 @@ def applySchema(self, rdd): def registerRDDAsTable(self, rdd, tableName): """ - + Registers the given RDD as a temporary table in the catalog. Temporary tables exist only + during the lifetime of this instance of SQLContext. """ if (rdd.__class__ is SchemaRDD): jschema_rdd = rdd._jschema_rdd @@ -533,22 +546,34 @@ def registerRDDAsTable(self, rdd, tableName): raise ValueError("Can only register SchemaRDD as table") def parquetFile(path): + """ + Loads a Parquet file, returning the result as a L{SchemaRDD}. + """ jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) def sql(self, sqlQuery): """ - Run a sql query over a registered table, and return a L{SchemaRDD} with the results. + Executes a SQL query using Spark, returning the result as a L{SchemaRDD}. """ return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) - def table(tableName): + def table(self, tableName): + """ + Returns the specified table as a L{SchemaRDD}. + """ return SchemaRDD(self._ssql_ctx.table(tableName), self) def cacheTable(tableName): + """ + Caches the specified table in-memory. + """ self._ssql_ctx.cacheTable(tableName) def uncacheTable(tableName): + """ + Removes the specified table from the in-memory cache. + """ self._ssql_ctx.uncacheTable(tableName) def _test(): From 79f739d11bfbae866ce3420f0ea05e63b5ba809e Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 13:20:08 -0700 Subject: [PATCH 22/49] added more tests --- python/pyspark/context.py | 66 ++++++++++++++++++++++++++------------- python/pyspark/rdd.py | 31 ++++++++++++++++++ 2 files changed, 75 insertions(+), 22 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index b79600416b61..092364a8f7fe 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -475,19 +475,13 @@ def __init__(self, sparkContext): @param sparkContext: The SparkContext to wrap. - # SQLContext >>> from pyspark.context import SQLContext >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - # applySchema >>> srdd = sqlCtx.applySchema(rdd) - - >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}] - True - >>> sqlCtx.applySchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... @@ -498,21 +492,6 @@ def __init__(self, sparkContext): Traceback (most recent call last): ... ValueError:... 
- - # registerRDDAsTable - >>> sqlCtx.registerRDDAsTable(srdd, "table1") - - # sql - >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") - >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}] - True - - # table - #>>> sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") - #>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (1, "one")') - #>>> sqlCtx.sql('INSERT INTO src (key, value) VALUES (2, "two")') - #>>> srdd3 = sqlCtx.table("src") - #>>> srdd3.collect() == [{"key" : 1, "value" : "one"}, {"key" : 2, "value": "two"}] """ self._sc = sparkContext self._jsc = self._sc._jsc @@ -523,6 +502,14 @@ def applySchema(self, rdd): """ Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to determine the fields names and types, and then use that to extract all the dictionaries. + + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}] + True """ if (rdd.__class__ is SchemaRDD): raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__) @@ -538,6 +525,12 @@ def registerRDDAsTable(self, rdd, tableName): """ Registers the given RDD as a temporary table in the catalog. Temporary tables exist only during the lifetime of this instance of SQLContext. + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") """ if (rdd.__class__ is SchemaRDD): jschema_rdd = rdd._jschema_rdd @@ -545,9 +538,19 @@ def registerRDDAsTable(self, rdd, tableName): else: raise ValueError("Can only register SchemaRDD as table") - def parquetFile(path): + def parquetFile(self, path): """ Loads a Parquet file, returning the result as a L{SchemaRDD}. + + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd.saveAsParquetFile("/tmp/tmp.parquet") + >>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet") + >>> srdd.collect() == srdd2.collect() + True """ jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) @@ -555,12 +558,31 @@ def parquetFile(path): def sql(self, sqlQuery): """ Executes a SQL query using Spark, returning the result as a L{SchemaRDD}. + + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") + >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}] + True """ return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def table(self, tableName): """ Returns the specified table as a L{SchemaRDD}. 
+ >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + >>> srdd2 = sqlCtx.table("table1") + >>> srdd.collect() == srdd2.collect() + True """ return SchemaRDD(self._ssql_ctx.table(tableName), self) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c090f970be40..140007ade498 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1414,7 +1414,38 @@ def _jrdd(self): def _id(self): return self._jrdd.id() + def saveAsParquetFile(self, path): + """ + Saves the contents of this L{SchemaRDD} as a parquet file, preserving the schema. Files + that are written out using this method can be read back in as a SchemaRDD using the + L{SQLContext.parquetFile} method. + + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd.saveAsParquetFile("/tmp/test.parquet") + >>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet") + >>> srdd2.collect() == srdd.collect() + True + """ + self._jschema_rdd.saveAsParquetFile(path) + def registerAsTable(self, name): + """ + Registers this RDD as a temporary table using the given name. The lifetime of this temporary + table is tied to the L{SQLContext} that was used to create this SchemaRDD. + >>> from pyspark.context import SQLContext + >>> sqlCtx = SQLContext(sc) + >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, + ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd.registerAsTable("test") + >>> srdd2 = sqlCtx.sql("select * from test") + >>> srdd.collect() == srdd2.collect() + True + """ self._jschema_rdd.registerAsTable(name) def toPython(self): From e4d21b44d743bffb07236d42ce649aed4f2be98f Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 15:29:55 -0700 Subject: [PATCH 23/49] Added pyrolite dependency --- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 +- project/SparkBuild.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 11ab81f1498b..54864f656cf2 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -208,7 +208,7 @@ private object SpecialLengths { val TIMING_DATA = -3 } -object PythonRDD { +private[spark] object PythonRDD { val UTF8 = Charset.forName("UTF-8") def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 0685ff76abcc..b2c3941e58f3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -346,7 +346,7 @@ object SparkBuild extends Build { "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), "com.clearspring.analytics" % "stream" % "2.5.1", - "net.razorvine" % "pyrolite_2.10" % "1.1" + "org.spark-project" % "pyrolite" % "2.0" ), libraryDependencies ++= maybeAvro ) From 
20936a57a2d7225ada7466f04af9618e81ad0e88 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 15:43:32 -0700 Subject: [PATCH 24/49] Added tests and documentation --- project/SparkBuild.scala | 1 - python/pyspark/rdd.py | 21 +++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b2c3941e58f3..01d113b56e80 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -522,7 +522,6 @@ object SparkBuild extends Build { def extraAssemblySettings() = Seq( test in assembly := {}, - assemblyOption in assembly ~= { _.copy(cacheOutput = false) }, mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 140007ade498..f863c4f19264 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1388,6 +1388,15 @@ def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) class Row(dict): + """ + An extended L{dict} that takes a L{dict} in its constructor, and exposes those items as fields. + + >>> r = Row({"hello" : "world", "foo" : "bar"}) + >>> r.hello + 'world' + >>> r.foo + 'bar' + """ def __init__(self, d): d.update(self.__dict__) @@ -1395,6 +1404,14 @@ def __init__(self, d): dict.__init__(self, d) class SchemaRDD(RDD): + """ + An RDD of Row objects that has an associated schema. The underlying JVM object is a SchemaRDD, + not a PythonRDD, so we can utilize the relational query api exposed by SparkSQL. + + For normal L{RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on directly, as + it's underlying implementation is a RDD composed of Java objects. Instead it is converted to a + PythonRDD in the JVM, on which Python operations can be done. + """ def __init__(self, jschema_rdd, sql_ctx): self.sql_ctx = sql_ctx @@ -1408,6 +1425,10 @@ def __init__(self, jschema_rdd, sql_ctx): @property def _jrdd(self): + """ + Lazy evaluation of PythonRDD object. Only done when a user calls methods defined by the + L{RDD} super class (map, count, etc.). + """ return self.toPython()._jrdd @property From b406ba0bdd29f4704ab75d009956348cf35cd9a5 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 15:48:30 -0700 Subject: [PATCH 25/49] doctest formatting --- python/pyspark/context.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 092364a8f7fe..3c0181ac23cb 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -508,7 +508,8 @@ def applySchema(self, rdd): >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) >>> srdd = sqlCtx.applySchema(rdd) - >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}] + >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, + ... 
{"field1" : 3, "field2": "row3"}] True """ if (rdd.__class__ is SchemaRDD): @@ -566,7 +567,8 @@ def sql(self, sqlQuery): >>> srdd = sqlCtx.applySchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") - >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}] + >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, + ... {"f1" : 3, "f2": "row3"}] True """ return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) From 79621cfd7870c52c2ad3341b4f87228f91a610d3 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 15:53:04 -0700 Subject: [PATCH 26/49] cleaning up cruft --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 1 - .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index fdf28822c59d..475a126f2dbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.columnar.InMemoryColumnarTableScan import org.apache.spark.sql.execution._ -import org.apache.spark.api.java.JavaRDD /** * :: AlphaComponent :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 4ca4505fbfc5..573345e42c43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.api.java import java.beans.{Introspector, PropertyDescriptor} -import java.util.{Map => JMap} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.SQLContext @@ -83,6 +82,7 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) } + /** * Loads a parquet file, returning the result as a [[JavaSchemaRDD]]. */ From f98a42287b9a7b0bfbe0a54f34f328f0458de205 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 18:01:31 -0700 Subject: [PATCH 27/49] HiveContexts --- python/pyspark/context.py | 41 +++++++++++++++++++++++++++++++++- python/pyspark/java_gateway.py | 3 +++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 3c0181ac23cb..714a817255e5 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -496,7 +496,16 @@ def __init__(self, sparkContext): self._sc = sparkContext self._jsc = self._sc._jsc self._jvm = self._sc._jvm - self._ssql_ctx = self._jvm.SQLContext(self._jsc.sc()) + + @property + def _ssql_ctx(self): + """ + Accessor for the JVM SparkSQL context. Subclasses can overrite this property to provide + their own JVM Contexts. 
+ """ + if not hasattr(self, '_scala_SQLContext'): + self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc()) + return self._scala_SQLContext def applySchema(self, rdd): """ @@ -600,6 +609,36 @@ def uncacheTable(tableName): """ self._ssql_ctx.uncacheTable(tableName) +class HiveContext(SQLContext): + + @property + def _ssql_ctx(self): + if not hasattr(self, '_scala_HiveContext'): + self._scala_HiveContext = self._jvm.HiveContext(self._jsc.sc()) + return self._scala_HiveContext + + def hiveql(self, hqlQuery): + return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery), self) + + def hql(self, hqlQuery): + return self.hiveql(hqlQuery) + +class LocalHiveContext(HiveContext): + + @property + def _ssql_ctx(self): + if not hasattr(self, '_scala_LocalHiveContext'): + self._scala_LocalHiveContext = self._jvm.LocalHiveContext(self._jsc.sc()) + return self._scala_LocalHiveContext + +class TestHiveContext(HiveContext): + + @property + def _ssql_ctx(self): + if not hasattr(self, '_scala_TestHiveContext'): + self._scala_TestHiveContext = self._jvm.TestHiveContext(self._jsc.sc()) + return self._scala_TestHiveContext + def _test(): import atexit import doctest diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index d8dd2a65225e..6bb6c877c942 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -65,5 +65,8 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") + java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") + java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") + java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") java_import(gateway.jvm, "scala.Tuple2") return gateway From b0192d35dffbd6eb4da9b4ba6fa219e70a9b3172 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Tue, 8 Apr 2014 19:08:36 -0700 Subject: [PATCH 28/49] Added Long, Double and Boolean as usable types + unit test --- python/pyspark/context.py | 7 +++++++ .../scala/org/apache/spark/sql/SQLContext.scala | 13 ++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 714a817255e5..ff52f1b9f84b 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -492,6 +492,13 @@ def __init__(self, sparkContext): Traceback (most recent call last): ... ValueError:... + + >>> allTypes = sc.parallelize([{"int" : 1, "string" : "string", "double" : 1.0, "long": 1L, + ... "boolean" : True}]) + >>> srdd = sqlCtx.applySchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long, + ... x.boolean)) + >>> srdd.collect()[0] + (1, u'string', 1.0, 1, True) """ self._sc = sparkContext self._jsc = self._sc._jsc diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 475a126f2dbe..8fa24c0c5259 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -243,18 +243,17 @@ class SQLContext(@transient val sparkContext: SparkContext) def debugExec() = DebugQuery(executedPlan).execute().collect() } + // TODO: We only support primitive types, add support for nested types. 
Difficult because java + // objects don't have classTags def applySchema(rdd: RDD[Map[String, _]]): SchemaRDD = { val schema = rdd.first.map { case (fieldName, obj) => val dataType = obj.getClass match { case c: Class[_] if c == classOf[java.lang.String] => StringType case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType - // case c: Class[_] if c == java.lang.Short.TYPE => ShortType - // case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType - // case c: Class[_] if c == java.lang.Long.TYPE => LongType - // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType - // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType - // case c: Class[_] if c == java.lang.Float.TYPE => FloatType - // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + case c: Class[_] if c == classOf[java.lang.Long] => LongType + case c: Class[_] if c == classOf[java.lang.Double] => DoubleType + case c: Class[_] if c == classOf[java.lang.Boolean] => BooleanType + case c => throw new Exception(s"Object of type $c cannot be used") } AttributeReference(fieldName, dataType, true)() }.toSeq From e00980f77e2f280d39b769e82a653ee61fa0a447 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 9 Apr 2014 00:26:42 -0700 Subject: [PATCH 29/49] First draft of python sql programming guide. --- docs/sql-programming-guide.md | 96 +++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 4 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index a59393e1424d..12ff7fd993ca 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -20,7 +20,7 @@ a schema that describes the data types of each column in the row. A SchemaRDD i in a traditional relational database. A SchemaRDD can be created from an existing RDD, parquet file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). -**All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell.** +**All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.** @@ -33,6 +33,19 @@ a schema that describes the data types of each column in the row. A JavaSchemaR in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, parquet file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). + +
+
+Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using
+Spark. At the core of this component is a new type of RDD,
+[SchemaRDD](). SchemaRDDs are composed of
+[Row]() objects along with
+a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table
+in a traditional relational database. A SchemaRDD can be created from an existing RDD, parquet
+file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+
+**All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.**
+
*************************************************************************************************** @@ -44,7 +57,7 @@ file, or by running HiveQL against data stored in [Apache Hive](http://hive.apac The entry point into all relational functionality in Spark is the [SQLContext](api/sql/core/index.html#org.apache.spark.sql.SQLContext) class, or one of its -decendents. To create a basic SQLContext, all you need is a SparkContext. +descendants. To create a basic SQLContext, all you need is a SparkContext. {% highlight scala %} val sc: SparkContext // An existing SparkContext. @@ -60,7 +73,7 @@ import sqlContext._ The entry point into all relational functionality in Spark is the [JavaSQLContext](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one -of its decendents. To create a basic JavaSQLContext, all you need is a JavaSparkContext. +of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext. {% highlight java %} JavaSparkContext ctx = ...; // An existing JavaSparkContext. @@ -69,6 +82,19 @@ JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx); +
+
+The entry point into all relational functionality in Spark is the
+[SQLContext]() class, or one
+of its descendants. To create a basic SQLContext, all you need is a SparkContext.
+
+{% highlight python %}
+from pyspark.context import SQLContext
+sqlCtx = SQLContext(sc)
+{% endhighlight %}
+
+
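The snippet above assumes the `sc` variable that the `pyspark` shell predefines. As a rough sketch (not taken from this patch), a standalone script would first construct its own SparkContext before wrapping it; the master URL and application name below are placeholders.

{% highlight python %}
# Sketch only: building the contexts outside the pyspark shell.
from pyspark import SparkContext
from pyspark.context import SQLContext

sc = SparkContext("local", "SQLExample")  # placeholder master and app name
sqlCtx = SQLContext(sc)
{% endhighlight %}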
+ ## Running SQL on RDDs @@ -81,7 +107,7 @@ One type of table that is supported by Spark SQL is an RDD of Scala case classes defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be -registered as a table. Tables can used in subsequent SQL statements. +registered as a table. Tables can be used in subsequent SQL statements. {% highlight scala %} val sqlContext = new org.apache.spark.sql.SQLContext(sc) @@ -176,6 +202,27 @@ List teenagerNames = teenagers.map(new Function() { +
+
+One type of table that is supported by Spark SQL is an RDD of dictionaries. The keys of the
+dictionary define the column names of the table, and the types are inferred by looking at the first
+row. Any RDD of dictionaries can be converted to a SchemaRDD and then registered as a table. Tables
+can be used in subsequent SQL statements.
+
+{% highlight python %}
+lines = sc.textFile("examples/src/main/resources/people.txt")
+parts = lines.map(lambda l: l.split(","))
+people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
+
+peopleTable = sqlCtx.inferSchema(people)
+peopleTable.registerAsTable("people")
+
+teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+teenNames = teenagers.map(lambda p: "Name: " + p.name)
+{% endhighlight %}
+
+
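For illustration only (this snippet is not part of the patch): the rows returned by `sql` are `Row` objects, the `dict` subclass added to `pyspark/rdd.py` earlier in this series, so a column can be read either by key or as an attribute. The sketch assumes the query above matched at least one row.

{% highlight python %}
# Sketch: each collected element is a Row (a dict subclass), so both access styles work.
first = teenagers.collect()[0]   # assumes a non-empty result
print first["name"]              # dictionary-style access
print first.name                 # attribute-style access provided by Row
{% endhighlight %}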
+ **Note that Spark SQL currently uses a very basic SQL parser.** @@ -231,6 +278,27 @@ parquetFile.registerAsTable("parquetFile"); JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); +{% endhighlight %} + + + +
+ +{% highlight python %} + +peopleTable # The SchemaRDD from the previous example. + +# JavaSchemaRDDs can be saved as parquet files, maintaining the schema information. +peopleTable.saveAsParquetFile("people.parquet") + +// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. +// The result of loading a parquet file is also a JavaSchemaRDD. +parquetFile = sqlCtx.parquetFile("people.parquet") + +//Parquet files can also be registered as tables and then used in SQL statements. +parquetFile.registerAsTable("parquetFile"); +teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") + {% endhighlight %}
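As a further illustrative sketch (not taken from the patch), the Parquet round trip preserves both the data and the schema, mirroring the doctest added to `SchemaRDD.saveAsParquetFile` in this series; the path `people2.parquet` is a made-up example.

{% highlight python %}
# Sketch: writing and re-reading a Parquet file should give back equal contents.
peopleTable.saveAsParquetFile("people2.parquet")   # hypothetical output path
people2 = sqlCtx.parquetFile("people2.parquet")
print people2.collect() == peopleTable.collect()   # expected: True
{% endhighlight %}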
@@ -318,4 +386,24 @@ Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect(); +
+
+When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
+adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to
+the `sql` method a `HiveContext` also provides an `hql` method, which allows queries to be
+expressed in HiveQL.
+
+{% highlight python %}
+
+from pyspark.context import HiveContext
+hiveCtx = HiveContext(sqlCtx)
+
+hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+
+// Queries are expressed in HiveQL.
+results = hiveCtx.hql("FROM src SELECT key, value").collect()
+
+{% endhighlight %}
+
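As a short illustrative sketch (not part of the patch), each element of `results` above is a `Row` over the `key, value` projection, so the columns can be read by name; the printed values depend entirely on the contents of `kv1.txt`.

{% highlight python %}
# Sketch: rows returned by hql() expose their columns as keys and as attributes.
for row in results[:5]:          # look at the first few rows only
    print row.key, row["value"]  # column names come from the SELECT clause
{% endhighlight %}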
From 183694468eb5f956f0de51ffd1b2203f33518ad5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 9 Apr 2014 00:46:31 -0700 Subject: [PATCH 30/49] Fix comments. --- docs/sql-programming-guide.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 12ff7fd993ca..7b920167227d 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -210,14 +210,19 @@ row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as can be used in subsequent SQL statements. {% highlight python %} +# Load a text file and convert each line to a dictionary. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: {"name": p[0], "age": int(p[1])}) +# Infer the schema, and register the SchemaRDD as a table. peopleTable = sqlCtx.inferSchema(people) peopleTable.registerAsTable("people") +# SQL can be run over SchemaRDDs that have been registered as a table. teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +# The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) {% endhighlight %} @@ -291,11 +296,11 @@ peopleTable # The SchemaRDD from the previous example. # JavaSchemaRDDs can be saved as parquet files, maintaining the schema information. peopleTable.saveAsParquetFile("people.parquet") -// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. -// The result of loading a parquet file is also a JavaSchemaRDD. +# Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. +# The result of loading a parquet file is also a JavaSchemaRDD. parquetFile = sqlCtx.parquetFile("people.parquet") -//Parquet files can also be registered as tables and then used in SQL statements. +# Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") @@ -401,7 +406,7 @@ hiveCtx = HiveContext(sqlCtx) hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") -// Queries are expressed in HiveQL. +# Queries can be expressed in HiveQL. 
results = hiveCtx.hql("FROM src SELECT key, value").collect() {% endhighlight %} From 40491c9b85d5236bd97207a219495329524fee67 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Wed, 9 Apr 2014 10:44:19 -0700 Subject: [PATCH 31/49] PR Changes + Method Visibility --- .../apache/spark/api/python/PythonRDD.scala | 30 ++++++++----------- python/pyspark/context.py | 23 +++++++------- python/pyspark/rdd.py | 4 +-- .../org/apache/spark/sql/SQLContext.scala | 8 +++-- .../org/apache/spark/sql/SchemaRDD.scala | 7 +++-- 5 files changed, 35 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 54864f656cf2..f9d86fed34d0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -286,39 +286,33 @@ private[spark] object PythonRDD { file.close() } - def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { - pyRDD.rdd.mapPartitions { iter => - val unpickle = new Unpickler - // TODO: Figure out why flatMap is necessay for pyspark - iter.flatMap { row => - unpickle.loads(row) match { - case objs: java.util.ArrayList[Any] => objs - // Incase the partition doesn't have a collection - case obj => Seq(obj) - } - } - } - } - + /** + * Convert an RDD of serialized Python dictionaries to Scala Maps + * TODO: Support more Python types. + */ def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler // TODO: Figure out why flatMap is necessay for pyspark iter.flatMap { row => unpickle.loads(row) match { - case objs: java.util.ArrayList[JMap[String, _]] => objs.map(_.toMap) + case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap) // Incase the partition doesn't have a collection - case obj: JMap[String, _] => Seq(obj.toMap) + case obj: JMap[String @unchecked, _] => Seq(obj.toMap) } } } } + /** + * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by + * PySpark. + */ def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { jRDD.rdd.mapPartitions { iter => - val unpickle = new Pickler + val pickle = new Pickler iter.map { row => - unpickle.dumps(row) + pickle.dumps(row) } } } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ff52f1b9f84b..6428fe5b63c7 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -174,7 +174,6 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile - SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava SparkContext._pythonToJavaMap = SparkContext._jvm.PythonRDD.pythonToJavaMap SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython @@ -481,21 +480,21 @@ def __init__(self, sparkContext): >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) - >>> sqlCtx.applySchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL + >>> srdd = sqlCtx.inferSchema(rdd) + >>> sqlCtx.inferSchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... ValueError:... 
>>> bad_rdd = sc.parallelize([1,2,3]) - >>> sqlCtx.applySchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL + >>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... ValueError:... >>> allTypes = sc.parallelize([{"int" : 1, "string" : "string", "double" : 1.0, "long": 1L, ... "boolean" : True}]) - >>> srdd = sqlCtx.applySchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long, + >>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long, ... x.boolean)) >>> srdd.collect()[0] (1, u'string', 1.0, 1, True) @@ -514,7 +513,7 @@ def _ssql_ctx(self): self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc()) return self._scala_SQLContext - def applySchema(self, rdd): + def inferSchema(self, rdd): """ Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to determine the fields names and types, and then use that to extract all the dictionaries. @@ -523,7 +522,7 @@ def applySchema(self, rdd): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, ... {"field1" : 3, "field2": "row3"}] True @@ -535,7 +534,7 @@ def applySchema(self, rdd): (SchemaRDD.__name__, rdd.first())) jrdd = self._sc._pythonToJavaMap(rdd._jrdd) - srdd = self._ssql_ctx.applySchema(jrdd.rdd()) + srdd = self._ssql_ctx.inferSchema(jrdd.rdd()) return SchemaRDD(srdd, self) def registerRDDAsTable(self, rdd, tableName): @@ -546,7 +545,7 @@ def registerRDDAsTable(self, rdd, tableName): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") """ if (rdd.__class__ is SchemaRDD): @@ -563,7 +562,7 @@ def parquetFile(self, path): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.saveAsParquetFile("/tmp/tmp.parquet") >>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet") >>> srdd.collect() == srdd2.collect() @@ -580,7 +579,7 @@ def sql(self, sqlQuery): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, @@ -596,7 +595,7 @@ def table(self, tableName): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... 
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") >>> srdd2 = sqlCtx.table("table1") >>> srdd.collect() == srdd2.collect() diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f863c4f19264..84963eb9d478 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1445,7 +1445,7 @@ def saveAsParquetFile(self, path): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.saveAsParquetFile("/tmp/test.parquet") >>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet") >>> srdd2.collect() == srdd.collect() @@ -1461,7 +1461,7 @@ def registerAsTable(self, name): >>> sqlCtx = SQLContext(sc) >>> rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, ... {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) - >>> srdd = sqlCtx.applySchema(rdd) + >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.registerAsTable("test") >>> srdd2 = sqlCtx.sql("select * from test") >>> srdd.collect() == srdd2.collect() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 8fa24c0c5259..24d60ea07429 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -243,9 +243,11 @@ class SQLContext(@transient val sparkContext: SparkContext) def debugExec() = DebugQuery(executedPlan).execute().collect() } - // TODO: We only support primitive types, add support for nested types. Difficult because java - // objects don't have classTags - def applySchema(rdd: RDD[Map[String, _]]): SchemaRDD = { + /** + * Peek at the first row of the RDD and infer its schema. + * TODO: We only support primitive types, add support for nested types. + */ + private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = { val schema = rdd.first.map { case (fieldName, obj) => val dataType = obj.getClass match { case c: Class[_] if c == classOf[java.lang.String] => StringType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index c7fac0403453..a771147f9067 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import net.razorvine.pickle.{Pickler, Unpickler} +import net.razorvine.pickle.Pickler import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} import org.apache.spark.annotation.{AlphaComponent, Experimental} @@ -313,12 +313,15 @@ class SchemaRDD( /** FOR INTERNAL USE ONLY */ def analyze = sqlContext.analyzer(logicalPlan) - def javaToPython: JavaRDD[Array[Byte]] = { + private[sql] def javaToPython: JavaRDD[Array[Byte]] = { val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name) this.mapPartitions { iter => val pickle = new Pickler iter.map { row => val map: JMap[String, Any] = new java.util.HashMap + // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict]. + // Ideally we should be able to pickle an object directly into a Python collection so we + // don't have to create an ArrayList every time. 
+        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
         row.zip(fieldNames).foreach { case (obj, name) =>
           map.put(name, obj)

From 337b20170c10bf986eba5252eb7b0bdf3f0cbf3a Mon Sep 17 00:00:00 2001
From: Ahir Reddy
Date: Wed, 9 Apr 2014 17:31:45 -0700
Subject: [PATCH 32/49] Changed com.clearspring.analytics stream version from 2.4.0 to 2.5.1 to match SBT build, and added pyrolite to maven build

---
 pom.xml | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 5f66cbe76859..c84829dcf23b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,7 +262,12 @@
     <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
-      <version>2.4.0</version>
+      <version>2.5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>pyrolite</artifactId>
+      <version>2.0</version>
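The Pyrolite dependency added here (and to the SBT build earlier in the series) is what lets the JVM unpickle the dictionaries that PySpark ships over in `pythonToJavaMap`, and pickle rows back for `javaToPython`. The following rough sketch of that contract uses only the standard `pickle` module for illustration; the real code path goes through PySpark's serializers and Pyrolite on the JVM side, and the pickle protocol shown is an assumption.

{% highlight python %}
# Sketch only: the shape of the data exchanged with the JVM.
import pickle

rows = [{"field1": 1, "field2": "row1"}, {"field1": 2, "field2": "row2"}]

# pythonToJavaMap expects each element to unpickle to a list of dicts (or a
# single dict), which Pyrolite exposes to Scala as java.util Maps.
payload = pickle.dumps(rows, protocol=2)  # protocol 2 is assumed here

# javaToPython does the reverse: the JVM pickles a list containing a map per
# row, and the Python side reads the dictionaries back (wrapping them as Rows).
assert pickle.loads(payload) == rows
{% endhighlight %}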