|
15 | 15 | # limitations under the License. |
16 | 16 | # |
17 | 17 |
|
18 | | -from pyspark.rdd import RDD |
| 18 | +from pyspark.rdd import RDD, PipelinedRDD |
19 | 19 |
|
20 | 20 | from py4j.protocol import Py4JError |
21 | 21 |
|
@@ -123,6 +123,49 @@ def parquetFile(self, path): |
123 | 123 | jschema_rdd = self._ssql_ctx.parquetFile(path) |
124 | 124 | return SchemaRDD(jschema_rdd, self) |
125 | 125 |
|
| 126 | + |
def jsonFile(self, path):
    """Load a text file containing one JSON object per line and
    return the result as a L{SchemaRDD}.

    >>> import tempfile, shutil
    >>> jsonFile = tempfile.mkdtemp()
    >>> shutil.rmtree(jsonFile)
    >>> ofn = open(jsonFile, 'w')
    >>> for json in jsonStrings:
    ...   print>>ofn, json
    >>> ofn.close()
    >>> srdd = sqlCtx.jsonFile(jsonFile)
    >>> sqlCtx.registerRDDAsTable(srdd, "table1")
    >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
    >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"},
    ... {"f1" : 3, "f2": "row3"}]
    True
    """
    # Delegate schema inference and parsing to the JVM-side SQLContext,
    # then wrap the returned Java SchemaRDD for use from Python.
    java_schema_rdd = self._ssql_ctx.jsonFile(path)
    return SchemaRDD(java_schema_rdd, self)
| 147 | + |
def jsonRDD(self, rdd):
    """Load an RDD storing one JSON object per string and return the
    result as a L{SchemaRDD}.

    >>> srdd = sqlCtx.jsonRDD(json)
    >>> sqlCtx.registerRDDAsTable(srdd, "table1")
    >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
    >>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"},
    ... {"f1" : 3, "f2": "row3"}]
    True
    """
    # Coerce every element to a UTF-8 encoded byte string so the JVM side
    # receives plain strings regardless of the RDD's element type.
    def to_utf8(split, iterator):
        for record in iterator:
            if not isinstance(record, basestring):
                record = unicode(record)
            yield record.encode("utf-8")

    piped = PipelinedRDD(rdd, to_utf8)
    # The elements are already raw byte strings; skip Python-side
    # (de)serialization when shipping them to the JVM.
    piped._bypass_serializer = True
    java_string_rdd = piped._jrdd.map(self._jvm.BytesToString())
    java_schema_rdd = self._ssql_ctx.jsonRDD(java_string_rdd.rdd())
    return SchemaRDD(java_schema_rdd, self)
| 168 | + |
126 | 169 | def sql(self, sqlQuery): |
127 | 170 | """Return a L{SchemaRDD} representing the result of the given query. |
128 | 171 |
|
@@ -323,6 +366,14 @@ def saveAsTable(self, tableName): |
323 | 366 | """Creates a new table with the contents of this SchemaRDD.""" |
324 | 367 | self._jschema_rdd.saveAsTable(tableName) |
325 | 368 |
|
def getSchemaTreeString(self):
    """Return the output schema of this SchemaRDD in the tree format.

    Delegates to the underlying Java SchemaRDD. The original code called
    the Java method but dropped its result, so this method always returned
    None (and printSchema printed "None"); the missing ``return`` is the fix.
    """
    return self._jschema_rdd.getSchemaTreeString()
| 372 | + |
def printSchema(self):
    """Print out the schema of this SchemaRDD in the tree format."""
    # Parenthesized form behaves identically for a single argument and
    # simply writes the tree string to stdout.
    print(self.getSchemaTreeString())
| 376 | + |
326 | 377 | def count(self): |
327 | 378 | """Return the number of elements in this RDD. |
328 | 379 |
|
@@ -420,6 +471,10 @@ def _test(): |
420 | 471 | globs['sqlCtx'] = SQLContext(sc) |
421 | 472 | globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"}, |
422 | 473 | {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) |
| 474 | + jsonStrings = ['{"field1": 1, "field2": "row1"}', |
| 475 | + '{"field1" : 2, "field2": "row2"}', '{"field1" : 3, "field2": "row3"}'] |
| 476 | + globs['jsonStrings'] = jsonStrings |
| 477 | + globs['json'] = sc.parallelize(jsonStrings) |
423 | 478 | (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) |
424 | 479 | globs['sc'].stop() |
425 | 480 | if failure_count: |
|
0 commit comments