[SPARK-24391][SQL] Support arrays of any types by from_json #21439
Changes from 23 commits
@@ -544,34 +544,27 @@ case class JsonToStructs(
      timeZoneId = None)

  override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
-    case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
+    case _: StructType | _: ArrayType | _: MapType =>
      super.checkInputDataTypes()
    case _ => TypeCheckResult.TypeCheckFailure(
      s"Input schema ${nullableSchema.catalogString} must be a struct or an array of structs.")
  }

-  @transient
-  lazy val rowSchema = nullableSchema match {
-    case st: StructType => st
-    case ArrayType(st: StructType, _) => st
-    case mt: MapType => mt
-  }
-
  // This converts parsed rows to the desired output by the given schema.
  @transient
  lazy val converter = nullableSchema match {
    case _: StructType =>
      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-    case ArrayType(_: StructType, _) =>
-      (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+    case _: ArrayType =>
+      (rows: Seq[InternalRow]) => rows.head.getArray(0)
    case _: MapType =>
      (rows: Seq[InternalRow]) => rows.head.getMap(0)
  }

  @transient
  lazy val parser =
    new JacksonParser(
-      rowSchema,
+      nullableSchema,
      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

  override def dataType: DataType = nullableSchema
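In user-facing terms, the converter change above is what lets `from_json` return arrays of arbitrary element types instead of only arrays of structs. A minimal usage sketch, assuming a SparkSession with `spark.implicits._` in scope (the DataFrame and column names are illustrative, not from the patch):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// Parse a top-level JSON array of ints; before this PR such a schema was rejected.
val df = Seq("[1, 2, 3]").toDF("json")
val parsed = df.select(from_json($"json", ArrayType(IntegerType)) as "arr")
parsed.collect()
// Expected, per the tests added in this PR: a single Row holding Seq(1, 2, 3).
```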
@@ -61,6 +61,7 @@ class JacksonParser(
    dt match {
      case st: StructType => makeStructRootConverter(st)
      case mt: MapType => makeMapRootConverter(mt)
+      case at: ArrayType => makeArrayRootConverter(at)
|
Member:
This change accepts a JSON datasource form that master can't parse? If so, I think we need tests in

Member (Author):
Right, it accepts arrays of any types, compared to master, which accepts arrays of structs only.
I added a few tests to

Member:
You've already added tests in

Member (Author):
Sorry, I didn't catch that you meant a specific class. Just in case, what is the reason for adding tests to

Member:
Ah, ok. You touched the
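For context on "master accepts arrays of structs only", a hedged before/after contrast; the old error text is the one removed from `JsonFunctionsSuite` further down, and the expected result is an assumption based on the tests added in this PR:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, StringType}

// On master, an ArrayType with a non-struct element type fails analysis with:
//   "Input schema array<string> must be a struct or an array of structs."
// With this change, the same schema is accepted and the array is parsed.
val df = Seq("""["a", "b"]""").toDF("value")
df.select(from_json($"value", ArrayType(StringType))).collect()
// Expected after this PR: one Row holding Seq("a", "b").
```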
    }
  }

@@ -101,6 +102,17 @@ class JacksonParser(
    }
  }

+  private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
+    val elemConverter = makeConverter(at.elementType)
+    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
+      case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
|
Member:
In line 87: should we also follow this?

Member (Author):
The code in line 87 returns
In the case when the schema is
+      case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
|
Contributor:
Why do we need this? This means that if the user asks for an array of data and the data doesn't contain an array, we return an array with a single element. This seems wrong to me. I'd rather return null or throw an exception.

Member (Author):
This is for backward compatibility with the behavior introduced by this PR: #16929, most likely by this: https://github.com/apache/spark/pull/16929/files#diff-6626026091295ad8c0dfb66ecbcd04b1R506 . Even a special test was added: https://github.com/apache/spark/pull/16929/files#diff-88230f171af0b7a40791a867f9dd3a36R382 . Please ask the author @HyukjinKwon about the reasons for the changes.

Member:
If an array of data is empty, and the schema is an array, it should return an empty array: https://github.com/apache/spark/pull/16929/files#diff-88230f171af0b7a40791a867f9dd3a36R389 . Returning an empty object or array was not even introduced by me in the JSON datasource anyway.

Member:
It is a super weird case... Can we put this special handling code for backward compatibility in

Contributor:
I think what was done in that PR is pretty different; it is actually the opposite of what we are doing here. There, we return an array of structs when a struct is specified as the schema and the JSON contains an array. Here, we return an array with one struct when the schema is an array of structs and there is a struct instead of an array. Although I don't really like the behavior introduced in the PR you mentioned, I can understand it, as it was a way to support arrays of structs (the only option at the moment), and I don't think we can change it before 3.0, at least for backward compatibility. But since here we are introducing a new behavior, if an array is required and a struct is found, I think returning an array with one element is wrong/unexpected behavior, and returning null would be what I'd expect as a user.

Member (Author):
I can only say that this
@maropu Initially I put the behavior under a flag in

Member:
Shall we add a comment on top of this

Member (Author):
I am adding a comment for this.
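To make the disputed case concrete, here is a hedged sketch of the behavior being preserved; it mirrors the existing `JsonExpressionsSuite` test shown further down (an object input parsed with an array-of-structs schema), with illustrative DataFrame names:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

// The schema asks for an array of structs, but the JSON document is a single object.
val schema = ArrayType(new StructType().add("a", IntegerType))
val df = Seq("""{"a": 1}""").toDF("value")
df.select(from_json($"value", schema)).collect()
// Behavior kept for compatibility with apache/spark#16929: an array containing the
// single struct {"a": 1}, rather than null or an exception, as debated above.
```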
+        val st = at.elementType.asInstanceOf[StructType]
+        val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
+        Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
+    }
+  }

  /**
   * Create a converter which converts the JSON documents held by the `JsonParser`
   * to a value according to a desired schema.
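A small illustration of how `makeArrayRootConverter` and the `ArrayType` branch of `JsonToStructs.converter` fit together; this is not Spark source, just a sketch under the assumption that the parser wraps the parsed top-level array in a single `InternalRow`:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.GenericArrayData

// makeArrayRootConverter returns Seq(InternalRow(<ArrayData>)), so the converter
// only has to take the head row and read its single field as an array.
val rows: Seq[InternalRow] = Seq(InternalRow(new GenericArrayData(Array(1, 2, 3))))
val result = rows.head.getArray(0) // ArrayData holding 1, 2, 3
assert(result.numElements() == 3)
```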
@@ -423,7 +423,9 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
    val input = """{"a": 1}"""
    val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
    val output = InternalRow(1) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
+    checkEvaluation(
+      JsonToStructs(schema, Map.empty, Literal(input), gmtId),
+      output)
  }

  test("from_json - input=empty array, schema=array, output=empty array") {
@@ -39,3 +39,8 @@ select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
 -- infer schema of json literal
 select schema_of_json('{"c1":0, "c2":[1]}');
 select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));
+
+-- from_json - array type
+select from_json('[1, 2, 3]', 'array<int>');
Member:
Add more cases?

Member (Author):
Added.
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>');
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
@@ -136,12 +136,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
  test("from_json invalid schema") {
    val df = Seq("""{"a" 1}""").toDS()
    val schema = ArrayType(StringType)
-    val message = intercept[AnalysisException] {
-      df.select(from_json($"value", schema))
-    }.getMessage
-    assert(message.contains(
-      "Input schema array<string> must be a struct or an array of structs."))
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Seq(Row(null))
+    )
  }

  test("from_json array support") {

@@ -405,4 +404,72 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
    assert(out.schema == expected)
  }

+  test("from_json - array of primitive types") {
+    val df = Seq("[1, 2, 3]").toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(from_json($"a", schema)), Seq(Row(Array(1, 2, 3))))
+  }
+
+  test("from_json - array of primitive types - malformed row") {
+    val df = Seq("[1, 2 3]").toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(from_json($"a", schema)), Seq(Row(null)))
+  }
+
+  test("from_json - array of arrays") {
+    val jsonDF = Seq("[[1], [2, 3], [4, 5, 6]]").toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("select json[0][0], json[1][1], json[2][2] from jsonTable"),
+      Seq(Row(1, 3, 6)))
+  }
+
+  test("from_json - array of arrays - malformed row") {
+    val jsonDF = Seq("[[1], [2, 3], 4, 5, 6]]").toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("select json[0] from jsonTable"), Seq(Row(null)))
+  }
+
+  test("from_json - array of structs") {
+    val jsonDF = Seq("""[{"a":1}, {"a":2}, {"a":3}]""").toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("select json[0], json[1], json[2] from jsonTable"),
+      Seq(Row(Row(1), Row(2), Row(3))))
+  }
+
+  test("from_json - array of structs - malformed row") {
+    val jsonDF = Seq("""[{"a":1}, {"a:2}, {"a":3}]""").toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("select json[0], json[1] from jsonTable"), Seq(Row(null, null)))
+  }
+
+  test("from_json - array of maps") {
+    val jsonDF = Seq("""[{"a":1}, {"b":2}]""").toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("""select json[0], json[1] from jsonTable"""),
+      Seq(Row(Map("a" -> 1), Map("b" -> 2))))
+  }
+
+  test("from_json - array of maps - malformed row") {
+    val jsonDF = Seq("""[{"a":1} "b":2}]""").toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
+  }
 }
Please also update the comment of JsonToStructs: "Converts an json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s."
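A possible updated wording for that scaladoc, offered only as a sketch (the exact text committed by the author may differ):

```scala
/**
 * Converts a JSON input string to a [[StructType]], [[ArrayType]] or [[MapType]]
 * with the specified schema.
 */
```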