Commit 7027634

Java API.
1 parent cff84cc commit 7027634

2 files changed: 63 additions & 1 deletion


sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 17 additions & 1 deletion
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.sql.json.JsonRDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
 import org.apache.spark.sql.catalyst.types._
@@ -100,13 +101,28 @@ class JavaSQLContext(val sqlContext: SQLContext) {
     new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
   }
 
-
   /**
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
     new JavaSchemaRDD(sqlContext, ParquetRelation(path))
 
+  /**
+   * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
+   *
+   * @group userf
+   */
+  def jsonFile(path: String): JavaSchemaRDD =
+    jsonRDD(sqlContext.sparkContext.textFile(path))
+
+  /**
+   * Loads a RDD[String] storing JSON objects (one object per record), returning the result as a
+   * [[JavaSchemaRDD]].
+   *
+   * @group userf
+   */
+  def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
+    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
 
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
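For context, the two new methods let Java users load JSON data through JavaSQLContext, with the schema inferred from the records by JsonRDD.inferSchema. A minimal usage sketch (in Scala, calling the Java wrapper the same way the test below does) follows; the names jsc, javaSqlCtx, and "people.json" plus the sample records are illustrative assumptions, not part of this commit.

// Sketch only: exercises the jsonFile/jsonRDD methods added in this commit.
// jsc, javaSqlCtx, "people.json", and the sample records are assumed for illustration.
import java.util.Arrays

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.api.java.JavaSQLContext

val jsc = new JavaSparkContext("local", "json-example")
val javaSqlCtx = new JavaSQLContext(new SQLContext(jsc.sc))

// One JSON object per line; the schema is inferred from the data.
val fromFile = javaSqlCtx.jsonFile("people.json")

// Or build a JavaSchemaRDD from an existing JavaRDD[String] of JSON records.
val fromRdd = javaSqlCtx.jsonRDD(jsc.parallelize(Arrays.asList(
  """{"name":"alice","age":30}""",
  """{"name":"bob","age":25}""")))

// Register a temporary table and query it with SQL, as the JavaSQLSuite test below does.
fromFile.registerAsTable("people")
val adults = javaSqlCtx.sql("SELECT name FROM people WHERE age > 26")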

sql/core/src/test/scala/org/apache/spark/sql/api/java/JavaSQLSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.api.java
 
+
 import scala.beans.BeanProperty
 
 import org.scalatest.FunSuite
 
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.TestSQLContext
 
 // Implicits
@@ -111,4 +113,48 @@ class JavaSQLSuite extends FunSuite {
       """.stripMargin).collect.head.row ===
       Seq.fill(8)(null))
   }
+
+  test("loads JSON datasets") {
+    val jsonString =
+      """{"string":"this is a simple string.",
+          "integer":10,
+          "long":21474836470,
+          "bigInteger":92233720368547758070,
+          "double":1.7976931348623157E308,
+          "boolean":true,
+          "null":null
+      }""".replaceAll("\n", " ")
+    val rdd = javaCtx.parallelize(jsonString :: Nil)
+
+    var schemaRDD = javaSqlCtx.jsonRDD(rdd)
+
+    schemaRDD.registerAsTable("jsonTable1")
+
+    assert(
+      javaSqlCtx.sql("select * from jsonTable1").collect.head.row ===
+        Seq(BigDecimal("92233720368547758070"),
+        true,
+        1.7976931348623157E308,
+        10,
+        21474836470L,
+        null,
+        "this is a simple string."))
+
+    val file = getTempFilePath("json")
+    val path = file.toString
+    rdd.saveAsTextFile(path)
+    schemaRDD = javaSqlCtx.jsonFile(path)
+
+    schemaRDD.registerAsTable("jsonTable2")
+
+    assert(
+      javaSqlCtx.sql("select * from jsonTable2").collect.head.row ===
+        Seq(BigDecimal("92233720368547758070"),
+        true,
+        1.7976931348623157E308,
+        10,
+        21474836470L,
+        null,
+        "this is a simple string."))
+  }
 }
