From d83141494e1986fa1da50642db449a4f1a8e16c0 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 11 Nov 2019 16:56:47 -0800
Subject: [PATCH 1/9] Add new "array of struct" test case to ParquetIOSuite

---
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 39590b063f0a..4c540eff88d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -84,6 +84,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
   }
 
+  testStandardAndLegacyModes("array of struct") {
+    val data = (1 to 4).map(i => (i, Seq(Tuple1(i), Tuple1(i + 1))))
+    val expected = (1 to 4).map(i => Row(i, Seq(Row(i), Row(i + 1))))
+    withParquetDataFrame(data)(r => checkAnswer(r, expected))
+  }
+
+
   test("basic data types (without binary)") {
     val data = (1 to 4).map { i =>
       (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)

From ed894b43c0e7e105ecfe4ece9c41841d8c9ebb30 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 11 Nov 2019 17:45:02 -0800
Subject: [PATCH 2/9] Avoid unnecessary copy of nested internal rows.

---
 .../datasources/parquet/ParquetRowConverter.scala | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ff5c724375c3..a69e5dfb3818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -318,10 +318,18 @@ private[parquet] class ParquetRowConverter(
         new ParquetMapConverter(parquetType.asGroupType(), t, updater)
 
       case t: StructType =>
+        val wrappedUpdater = {
+          if (updater.isInstanceOf[RowUpdater]) {
+            updater
+          } else {
+            new ParentContainerUpdater {
+              override def set(value: Any): Unit =
+                updater.set(value.asInstanceOf[InternalRow].copy())
+            }
+          }
+        }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater {
-            override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-          })
+          schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
 
       case t =>
         throw new RuntimeException(
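Context for patch 2: ParquetRowConverter reuses a single mutable row buffer per struct converter, so a parent container that merely stored the reference would end up holding N aliases of one object. The hazard is easy to reproduce without Spark; in this self-contained sketch, MutableRow is a hypothetical stand-in for the reused buffer, not Spark's InternalRow:

object AliasingHazardSketch {
  // Hypothetical stand-in for a mutable row that a converter reuses per element.
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
    override def toString: String = s"Row($value)"
  }

  def main(args: Array[String]): Unit = {
    val reused = new MutableRow(0)
    val noCopy = scala.collection.mutable.ArrayBuffer.empty[MutableRow]
    val withCopy = scala.collection.mutable.ArrayBuffer.empty[MutableRow]
    for (i <- 1 to 3) {
      reused.value = i          // the converter overwrites its shared buffer
      noCopy += reused          // storing the reference: all slots alias one object
      withCopy += reused.copy() // defensive copy: each slot is independent
    }
    println(noCopy)   // ArrayBuffer(Row(3), Row(3), Row(3)) -- silently corrupted
    println(withCopy) // ArrayBuffer(Row(1), Row(2), Row(3)) -- correct
  }
}

Struct-in-struct nesting is safe without the copy because the parent consumes the reference exactly once per record, before the buffer is overwritten; the patch encodes exactly that distinction.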
From 2ed8ea96d0ba4e03f83c813327fbd6b86e72dc4b Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 23 Dec 2019 15:07:49 -0800
Subject: [PATCH 3/9] Add comments.

---
 .../datasources/parquet/ParquetRowConverter.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index a69e5dfb3818..e5dd2c5ae9a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -320,8 +320,21 @@ private[parquet] class ParquetRowConverter(
       case t: StructType =>
         val wrappedUpdater = {
           if (updater.isInstanceOf[RowUpdater]) {
+            // `updater` is a RowUpdater, implying that the parent container is a struct.
+            // We do NOT need to perform defensive copying here because either:
+            //
+            // 1. The path from the schema root to this field consists only of nested
+            //    structs, so this converter will only be invoked once per record and
+            //    we don't need to copy because copying will be done in the final
+            //    UnsafeProjection, or
+            // 2. The path from the schema root to this field contains a map or array,
+            //    in which case we will perform a recursive defensive copy via the
+            //    `else` branch below.
             updater
           } else {
+            // `updater` is NOT a RowUpdater, implying that the parent container is not a struct.
+            // Therefore, the parent container must be a map or array. We need to copy the row
+            // because this converter might be invoked multiple times per Parquet input record.
             new ParentContainerUpdater {
               override def set(value: Any): Unit =
                 updater.set(value.asInstanceOf[InternalRow].copy())

From 3fb3391edcee6c467b0189de2dcdaf4dac610083 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 23 Dec 2019 19:05:15 -0800
Subject: [PATCH 4/9] More specific cast; add braces.

---
 .../execution/datasources/parquet/ParquetRowConverter.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index e5dd2c5ae9a3..60819f5be205 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -336,8 +336,9 @@ private[parquet] class ParquetRowConverter(
             // Therefore, the parent container must be a map or array. We need to copy the row
             // because this converter might be invoked multiple times per Parquet input record.
             new ParentContainerUpdater {
-              override def set(value: Any): Unit =
-                updater.set(value.asInstanceOf[InternalRow].copy())
+              override def set(value: Any): Unit = {
+                updater.set(value.asInstanceOf[SpecificInternalRow].copy())
+              }
             }
           }
         }
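Patches 2-4 together implement a small dispatch: reuse the parent's updater when the parent is a struct, and otherwise wrap it so that every row published upward is deep-copied first. A simplified, self-contained model of that decision (all names here are stand-ins, not Spark's classes):

object WrappedUpdaterSketch {
  trait ParentUpdater { def set(value: Any): Unit }

  // Stand-in for RowUpdater: writes a field of an in-progress struct row.
  final class RowFieldUpdater extends ParentUpdater {
    var slot: Any = _
    override def set(value: Any): Unit = slot = value
  }

  // Stand-in for the reused mutable row (SpecificInternalRow in the patch).
  final class MutableRec(var x: Int) {
    def deepCopy(): MutableRec = new MutableRec(x)
  }

  def wrap(updater: ParentUpdater): ParentUpdater = updater match {
    case r: RowFieldUpdater =>
      r // parent is a struct: consumed once per record, copied later downstream
    case other =>
      new ParentUpdater { // parent is a map or array: copy defensively
        override def set(value: Any): Unit =
          other.set(value.asInstanceOf[MutableRec].deepCopy())
      }
  }
}

The narrower SpecificInternalRow cast from patch 4 also doubles as an assertion that the nested converter publishes only its own SpecificInternalRow buffer, so any other type would fail fast.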
From e67327abce8affc3ed3ab6f31edae16d0e82658d Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 27 Dec 2019 15:49:19 -0800
Subject: [PATCH 5/9] Update tests to match existing style; strengthen map test

---
 .../datasources/parquet/ParquetIOSuite.scala | 34 ++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4c540eff88d7..954b4f53dfbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -84,13 +84,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
   }
 
-  testStandardAndLegacyModes("array of struct") {
-    val data = (1 to 4).map(i => (i, Seq(Tuple1(i), Tuple1(i + 1))))
-    val expected = (1 to 4).map(i => Row(i, Seq(Row(i), Row(i + 1))))
-    withParquetDataFrame(data)(r => checkAnswer(r, expected))
-  }
-
-
   test("basic data types (without binary)") {
     val data = (1 to 4).map { i =>
       (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
@@ -211,6 +204,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  testStandardAndLegacyModes("array of struct") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(s"1st_val_$i"),
+          Tuple1(s"2nd_val_$i")
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
@@ -222,8 +232,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   testStandardAndLegacyModes("nested map with struct as value type") {
-    val data = (1 to 4).map(i => Tuple1(Map(i -> ((i, s"val_$i")))))
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          s"kA_$i" -> ((i, s"vA_$i")),
+          s"kB_$i" -> ((i, s"vB_$i"))
+        )
+      )
+    }
     withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
         Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
       })

From fffe72b435c8f9f2d4cd0a0742fd7bfd247eabf7 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 27 Dec 2019 15:52:55 -0800
Subject: [PATCH 6/9] Add test for struct as map key type

---
 .../datasources/parquet/ParquetIOSuite.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 954b4f53dfbd..78b036b82b25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -231,6 +231,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  testStandardAndLegacyModes("nested map with struct as key type") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Map(
+          (i, s"kA_$i") -> s"vA_$i",
+          (i, s"kB_$i") -> s"vB_$i"
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(m) =>
+        Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested map with struct as value type") {
     val data = (1 to 4).map { i =>
       Tuple1(
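For readers who want to see the round-trip behavior these tests pin down outside the suite, here is a rough spark-shell equivalent using only public APIs (the output path is made up; withParquetDataFrame and checkAnswer are the suite's shorthand for the same write-then-read cycle):

import spark.implicits._  // assumes the usual spark-shell `spark` session
import org.apache.spark.sql.Row

// Array-of-struct round trip: a Seq of tuples becomes array<struct<...>>.
val df = (1 to 4).map(i => (i, Seq((i, s"val_$i")))).toDF("id", "arrayOfStruct")
df.write.mode("overwrite").parquet("/tmp/spark-30338-roundtrip")

val readBack = spark.read.parquet("/tmp/spark-30338-roundtrip")
readBack.collect().foreach { case Row(id: Int, arr: Seq[_]) =>
  // Each element of `arr` surfaces as an org.apache.spark.sql.Row.
  println(s"$id -> ${arr.mkString(", ")}")
}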
From e6945e88a24d51551cba105b5e7e3825bc5e0a69 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 30 Dec 2019 20:17:19 -0800
Subject: [PATCH 7/9] Add JIRA number to comment

---
 .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 60819f5be205..9d65c8b9eef6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -319,6 +319,7 @@ private[parquet] class ParquetRowConverter(
 
       case t: StructType =>
         val wrappedUpdater = {
+          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
           if (updater.isInstanceOf[RowUpdater]) {
             // `updater` is a RowUpdater, implying that the parent container is a struct.
             // We do NOT need to perform defensive copying here because either:

From 4651b2fd724a56515c087903284682c9ba947c31 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 6 Jan 2020 12:57:46 -0800
Subject: [PATCH 8/9] Re-word code comment based on review feedback.

---
 .../parquet/ParquetRowConverter.scala | 27 ++++++++++---------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9d65c8b9eef6..d47f551b905e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -320,25 +320,26 @@ private[parquet] class ParquetRowConverter(
       case t: StructType =>
         val wrappedUpdater = {
           // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
+          // There are two cases to handle here:
+          //
+          // 1. Parent container is a map or array: we must make a deep copy of the mutable row
+          //    because this converter may be invoked multiple times per Parquet input record
+          //    (if the map or array contains multiple elements).
+          //
+          // 2. Parent container is a struct: we don't need to copy the row here because either:
+          //
+          //    (a) all ancestors are structs and therefore no copying is required because this
+          //        converter will only be invoked once per Parquet input record, or
+          //    (b) some ancestor is a struct that is nested in a map or array and that ancestor's
+          //        converter will perform deep-copying (which will recursively copy this row).
           if (updater.isInstanceOf[RowUpdater]) {
             // `updater` is a RowUpdater, implying that the parent container is a struct.
-            // We do NOT need to perform defensive copying here because either:
-            //
-            // 1. The path from the schema root to this field consists only of nested
-            //    structs, so this converter will only be invoked once per record and
-            //    we don't need to copy because copying will be done in the final
-            //    UnsafeProjection, or
-            // 2. The path from the schema root to this field contains a map or array,
-            //    in which case we will perform a recursive defensive copy via the
-            //    `else` branch below.
             updater
           } else {
-            // `updater` is NOT a RowUpdater, implying that the parent container is not a struct.
-            // Therefore, the parent container must be a map or array. We need to copy the row
-            // because this converter might be invoked multiple times per Parquet input record.
+            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
            new ParentContainerUpdater {
               override def set(value: Any): Unit = {
-                updater.set(value.asInstanceOf[SpecificInternalRow].copy())
+                updater.set(value.asInstanceOf[SpecificInternalRow].copy())  // deep copy
              }
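The reworded comment boils down to a one-line predicate on the immediate parent container. The same logic as a tiny standalone snippet, useful as a sanity check (the Container types are illustrative, not Spark's):

object CopyDecisionSketch {
  sealed trait Container
  case object StructC extends Container
  case object ArrayC extends Container
  case object MapC extends Container

  // A struct converter must deep-copy iff its immediate parent is a map or array.
  def mustCopy(parent: Container): Boolean = parent != StructC

  def main(args: Array[String]): Unit = {
    assert(!mustCopy(StructC)) // struct-in-struct: copied once, later, by the projection
    assert(mustCopy(ArrayC))   // struct-in-array: converter fires once per element
    assert(mustCopy(MapC))     // struct-in-map: converter fires once per entry
  }
}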
From 0f1af94c6cfc72651c3209bc0d31ae1bef140988 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 6 Jan 2020 14:21:23 -0800
Subject: [PATCH 9/9] Add test case for array of nested struct

---
 .../datasources/parquet/ParquetIOSuite.scala | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 78b036b82b25..1550b3bbb624 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -221,6 +221,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  testStandardAndLegacyModes("array of nested struct") {
+    val data = (1 to 4).map { i =>
+      Tuple1(
+        Seq(
+          Tuple1(
+            Tuple1(s"1st_val_$i")),
+          Tuple1(
+            Tuple1(s"2nd_val_$i"))
+        )
+      )
+    }
+    withParquetDataFrame(data) { df =>
+      // Structs are converted to `Row`s
+      checkAnswer(df, data.map { case Tuple1(array) =>
+        Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str)) })
+      })
+    }
+  }
+
   testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
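Because the series exists to cut needless copies on the Parquet read path, the effect can be eyeballed by timing a scan over nested structs before and after the change. A deliberately informal spark-shell harness (made-up path, no warm-up or rigorous benchmarking; absolute numbers will vary by machine):

import spark.implicits._  // assumes the usual spark-shell `spark` session

// Deeply nested struct column: struct<_1:int, _2:struct<_1:int, _2:string>>.
val df = (1 to 1000000).map(i => (i, (i, (i, i.toString)))).toDF("id", "nested")
df.write.mode("overwrite").parquet("/tmp/nested-copy-bench")

// Force full materialization of the nested column; compare wall time across builds.
spark.time {
  spark.read.parquet("/tmp/nested-copy-bench").selectExpr("sum(nested._2._1)").collect()
}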