Commits (29)
3a7559b
Support arrays by from_json
MaxGekk May 26, 2018
b601a93
Fix comments
MaxGekk May 26, 2018
02a97ac
Support array of struct unpacking for backward compatibility
MaxGekk May 26, 2018
f34bd88
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk May 27, 2018
f062dd2
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk May 31, 2018
86d2f20
Added case insensitive options for jsonToStruct
MaxGekk May 31, 2018
9d0230a
Making added values private
MaxGekk May 31, 2018
181dcae
Unnecessary check of input params is removed
MaxGekk Jun 11, 2018
6d54cf0
Added comment for the unpackArray config
MaxGekk Jun 11, 2018
e321e37
Added a test when schema is array type but json input is {}
MaxGekk Jun 11, 2018
1c657e8
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jun 14, 2018
b5b0d9c
Making imports shorter
MaxGekk Jun 14, 2018
ce9918b
SQL tests for arrays
MaxGekk Jun 14, 2018
fced8ec
Enable unpackArray by default to keep backward compatibility
MaxGekk Jun 29, 2018
0fd0fb9
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jul 7, 2018
e49ee9d
Updating of sql tests
MaxGekk Jul 7, 2018
2bca7e0
Fix python tests
MaxGekk Jul 7, 2018
f3efb1b
Merge branch 'from_json-array' of github.com:MaxGekk/spark-1 into fro…
MaxGekk Jul 7, 2018
82d4fd5
Merge branch 'master' into from_json-array
MaxGekk Jul 13, 2018
758d1df
Fix tests - removing unused parameter
MaxGekk Jul 13, 2018
8349ca8
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jul 21, 2018
2746d35
Removing unpackArray option
MaxGekk Jul 21, 2018
bc3a2dd
Removing unused val
MaxGekk Jul 21, 2018
39a0a4e
Reverting unrelated changes
MaxGekk Jul 24, 2018
9c2681a
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Jul 28, 2018
021350b
Addressing Liang-Chi Hsieh's review comments
MaxGekk Jul 28, 2018
89719c0
Merge remote-tracking branch 'origin/master' into from_json-array
MaxGekk Aug 12, 2018
bdfd8a1
Added an example
MaxGekk Aug 12, 2018
74a7799
A few negative SQL tests
MaxGekk Aug 12, 2018
7 changes: 6 additions & 1 deletion python/pyspark/sql/functions.py
@@ -2115,7 +2115,7 @@ def json_tuple(col, *fields):
def from_json(col, schema, options={}):
"""
Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
-as keys type, :class:`StructType` or :class:`ArrayType` of :class:`StructType`\\s with
+as keys type, :class:`StructType` or :class:`ArrayType` with
the specified schema. Returns `null`, in the case of an unparseable string.

:param col: string column in json format
@@ -2141,6 +2141,11 @@ def from_json(col, schema, options={}):
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=[Row(a=1)])]
+>>> data = [(1, '''[1, 2, 3]''')]
+>>> schema = ArrayType(IntegerType())
+>>> df = spark.createDataFrame(data, ("key", "value"))
+>>> df.select(from_json(df.value, schema).alias("json")).collect()
+[Row(json=[1, 2, 3])]
"""

sc = SparkContext._active_spark_context
@@ -523,6 +523,8 @@ case class JsonToStructs(
// can generate incorrect files if values are missing in columns declared as non-nullable.
val nullableSchema = if (forceNullableSchema) schema.asNullable else schema

+val unpackArray: Boolean = options.get("unpackArray").map(_.toBoolean).getOrElse(false)
Member:
private? (This is not related to this PR though; nullableSchema can also be private?)

Member:
Can you make the option unpackArray case-insensitive?

Member:
If we add this new option here, I feel we'd better document it somewhere (e.g., sql/functions.scala).
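Commit 86d2f20 ("Added case insensitive options for jsonToStruct") addresses the case-insensitivity request. A minimal sketch of the idea, assuming Spark's CaseInsensitiveMap utility (the actual change may differ):

    // Sketch: resolve the "unpackArray" option regardless of key case,
    // and keep the member private as suggested above.
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    private val unpackArray: Boolean =
      CaseInsensitiveMap(options).get("unpackArray").exists(_.toBoolean)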


override def nullable: Boolean = true

// Used in `FunctionRegistry`
@@ -548,7 +550,9 @@ case class JsonToStructs(
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
-case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
+case ArrayType(_: StructType, _) if unpackArray =>
Member:
Even if unpackArray is false, the next branch in line 558 still does super.checkInputDataTypes() for any ArrayType.

+  super.checkInputDataTypes()
+case _: StructType | _: ArrayType | _: MapType =>
super.checkInputDataTypes()
case _ => TypeCheckResult.TypeCheckFailure(
s"Input schema ${nullableSchema.simpleString} must be a struct or an array of structs.")
@@ -557,7 +561,8 @@
@transient
lazy val rowSchema = nullableSchema match {
case st: StructType => st
-case ArrayType(st: StructType, _) => st
+case ArrayType(st: StructType, _) if unpackArray => st
+case at: ArrayType => at
case mt: MapType => mt
}

@@ -566,8 +571,10 @@
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-case ArrayType(_: StructType, _) =>
+case ArrayType(_: StructType, _) if unpackArray =>
(rows: Seq[InternalRow]) => new GenericArrayData(rows)
+case _: ArrayType =>
+  (rows: Seq[InternalRow]) => rows.head.getArray(0)
case _: MapType =>
(rows: Seq[InternalRow]) => rows.head.getMap(0)
}
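To illustrate the two array branches above: when unpackArray is not set, a non-struct array schema keeps the whole parsed JSON array as a single value. A spark-shell sketch (illustrative; assumes the behaviour introduced by this PR):

    // from_json with a plain array schema: one row holding the whole array,
    // i.e. the `rows.head.getArray(0)` branch above.
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._

    val df = Seq("[1, 2, 3]").toDF("value")
    df.select(from_json($"value", ArrayType(IntegerType)).as("arr")).show()
    // Expected: a single row whose `arr` column is the array [1, 2, 3].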
@@ -61,6 +61,7 @@ class JacksonParser(
dt match {
case st: StructType => makeStructRootConverter(st)
case mt: MapType => makeMapRootConverter(mt)
+case at: ArrayType => makeArrayRootConverter(at)
Member:
This change accepts a JSON datasource form that the master can't parse? If so, I think we need tests in JsonSuite, too. cc: @HyukjinKwon

Member Author:
> This change accepts a JSON datasource form that the master can't parse?

Right, it accepts arrays of any type, compared to the master, which accepts arrays of structs only.

> If so, I think we need tests in JsonSuite ...

I added a few tests to JsonSuite, or do you mean some concrete test case?

Member:
You've already added tests in JsonSuite? It seems there are tests in JsonFunctionsSuite, JsonExpressionSuite, and SQLQueryTestSuite now?

Member Author:
Sorry, I didn't catch that you meant a specific class. Just in case, what is the reason for adding tests to JsonSuite? I changed the behavior of a function, so JsonFunctionsSuite is the perfect place for new tests, I believe. At the moment, you cannot specify a schema for JSON objects different from StructType in DataFrameReader, so ArrayType can come into JacksonParser only from the JSON functions (from_json). I could add a similar test to JsonSuite as in JsonFunctionsSuite, but it doesn't make any sense from my point of view.

Member:
Ah, ok. You touched the JacksonParser.scala file, so I thought there were some behaviour changes in the JSON datasource. But I see now that there are not, so the current tests are enough. Thanks.

}
}

@@ -101,6 +102,13 @@ class JacksonParser(
}
}

+private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
+  val elemConverter = makeConverter(at.elementType)
+  (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
+    case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
Member:
In line 87:

        val array = convertArray(parser, elementConverter)
        // Here, as we support reading top level JSON arrays and take every element
        // in such an array as a row, this case is possible.
        if (array.numElements() == 0) {
          Nil
        } else {
          array.toArray[InternalRow](schema).toSeq
        }

Should we also follow this?

Member Author:
The code in line 87 returns null for the JSON input [] if the schema is StructType(StructField("a", IntegerType) :: Nil). I would explain why we should return null in that case: we extract a struct from the array, and if the array is empty there is nothing to extract, so we return null.

In the case when the schema is ArrayType(...), I believe we should return an empty array for the empty JSON array [].
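A sketch of that distinction, as spark-shell expectations (illustrative only):

    // Empty JSON array under struct vs. array schemas.
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._

    val df = Seq("[]").toDF("value")

    // Struct schema: nothing to extract from [], so the result is null.
    df.select(from_json($"value", new StructType().add("a", IntegerType))).show()

    // Array schema: [] should map to an empty, non-null array.
    df.select(from_json($"value", ArrayType(IntegerType))).show()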

+  }
+}

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema.
@@ -423,7 +423,9 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val input = """{"a": 1}"""
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(1) :: Nil
-checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+checkEvaluation(
+  JsonToStructs(schema, Map("unpackArray" -> "true"), Literal(input), gmtId, true),
Member:
Add a case for unpackArray as false.

+  output)
Member:
Looks like an unrelated change.

}

test("from_json - input=empty array, schema=array, output=empty array") {
@@ -437,7 +439,9 @@
val input = "{ }"
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val output = InternalRow(null) :: Nil
-checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+checkEvaluation(
+  JsonToStructs(schema, Map("unpackArray" -> "true"), Literal(input), gmtId, true),
+  output)
}

test("from_json - input=array of single object, schema=struct, output=single row") {
10 changes: 5 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3243,7 +3243,7 @@ object functions {

/**
* (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
- * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3275,7 +3275,7 @@

/**
* (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
- * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3304,7 +3304,7 @@

/**
* Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
- * `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3318,7 +3318,7 @@

/**
* (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
- * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -3335,7 +3335,7 @@

/**
* (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
- * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * as keys type, `StructType` or `ArrayType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
@@ -136,12 +136,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
test("from_json invalid schema") {
viirya (Member), Jul 24, 2018:
Not an invalid schema now. It is more like a mismatched schema.

val df = Seq("""{"a" 1}""").toDS()
val schema = ArrayType(StringType)
-val message = intercept[AnalysisException] {
-  df.select(from_json($"value", schema))
-}.getMessage
-
-assert(message.contains(
-  "Input schema array<string> must be a struct or an array of structs."))
+checkAnswer(
+  df.select(from_json($"value", schema)),
+  Seq(Row(null))
+)
}

test("from_json array support") {
@@ -392,4 +391,72 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(Seq("""{"{"f": 1}": "a"}""").toDS().select(from_json($"value", schema)),
Row(null))
}

test("from_json - array of primitive types") {
val df = Seq("[1, 2, 3]").toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(from_json($"a", schema)), Seq(Row(Array(1, 2, 3))))
}

test("from_json - array of primitive types - malformed row") {
val df = Seq("[1, 2 3]").toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(from_json($"a", schema)), Seq(Row(null)))
}

test("from_json - array of arrays") {
val jsonDF = Seq("[[1], [2, 3], [4, 5, 6]]").toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(
sql("select json[0][0], json[1][1], json[2][2] from jsonTable"),
Seq(Row(1, 3, 6)))
}

test("from_json - array of arrays - malformed row") {
val jsonDF = Seq("[[1], [2, 3], 4, 5, 6]]").toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(sql("select json[0] from jsonTable"), Seq(Row(null)))
}

test("from_json - array of structs") {
val jsonDF = Seq("""[{"a":1}, {"a":2}, {"a":3}]""").toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(
sql("select json[0], json[1], json[2] from jsonTable"),
Seq(Row(Row(1), Row(2), Row(3))))
}

test("from_json - array of structs - malformed row") {
val jsonDF = Seq("""[{"a":1}, {"a:2}, {"a":3}]""").toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(sql("select json[0], json[1]from jsonTable"), Seq(Row(null, null)))
}

test("from_json - array of maps") {
val jsonDF = Seq("""[{"a":1}, {"b":2}]""").toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(
sql("""select json[0], json[1] from jsonTable"""),
Seq(Row(Map("a" -> 1), Map("b" -> 2))))
}

test("from_json - array of maps - malformed row") {
val jsonDF = Seq("""[{"a":1} "b":2}]""").toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")

checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
}
}
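The SQL tests added in commits ce9918b and 74a7799 exercise the same behaviour through SQL. A hedged sketch of such a query (assuming from_json accepts a DDL-formatted schema string, as the SQL function does; exact output formatting may vary):

    // from_json with an array schema given as a DDL string in SQL.
    spark.sql("SELECT from_json('[1, 2, 3]', 'array<int>') AS arr").show()
    // Expected: one row whose `arr` column is the array [1, 2, 3].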