Skip to content

Commit e736c62

Browse files
kimtkyeomdongjoon-hyun
authored andcommitted
[SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
### What changes were proposed in this pull request? This PR (SPARK-31116) add caseSensitive parameter to ParquetRowConverter so that it handle materialize parquet properly with respect to case sensitivity ### Why are the changes needed? From spark 3.0.0, below statement throws IllegalArgumentException in caseInsensitive mode because of explicit field index searching in ParquetRowConverter. As we already constructed parquet requested schema and catalyst requested schema during schema clipping in ParquetReadSupport, just follow these behavior. ```scala val path = "/some/temp/path" spark .range(1L) .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") .write.parquet(path) val caseInsensitiveSchema = new StructType() .add( "StructColumn", new StructType() .add("LowerCase", LongType) .add("camelcase", LongType)) spark.read.schema(caseInsensitiveSchema).parquet(path).show() ``` ### Does this PR introduce any user-facing change? No. The changes are only in unreleased branches (`master` and `branch-3.0`). ### How was this patch tested? Passed new test cases that check parquet column selection with respect to schemas and case sensitivities Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity. Authored-by: Tae-kyeom, Kim <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 21c02ee commit e736c62

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
3333
import org.apache.spark.internal.Logging
3434
import org.apache.spark.sql.catalyst.InternalRow
3535
import org.apache.spark.sql.catalyst.expressions._
36-
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
36+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
3737
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
38+
import org.apache.spark.sql.internal.SQLConf
3839
import org.apache.spark.sql.types._
3940
import org.apache.spark.unsafe.types.UTF8String
4041

@@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter(
178179

179180
// Converters for each field.
180181
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
182+
// (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
183+
// to prevent throwing IllegalArgumentException when searching catalyst type's field index
184+
val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
185+
catalystType.fieldNames.zipWithIndex.toMap
186+
} else {
187+
CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
188+
}
181189
parquetType.getFields.asScala.map { parquetField =>
182-
val fieldIndex = catalystType.fieldIndex(parquetField.getName)
190+
val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
183191
val catalystField = catalystType(fieldIndex)
184192
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
185193
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest
842842
}
843843
}
844844
}
845+
846+
test("SPARK-31116: Select nested schema with case insensitive mode") {
847+
// This test case failed at only Parquet. ORC is added for test coverage parity.
848+
Seq("orc", "parquet").foreach { format =>
849+
Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
850+
withSQLConf(
851+
SQLConf.CASE_SENSITIVE.key -> "false",
852+
SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedSchemaPruningEnabled) {
853+
withTempPath { dir =>
854+
val path = dir.getCanonicalPath
855+
856+
// Prepare values for testing nested parquet data
857+
spark
858+
.range(1L)
859+
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
860+
.write
861+
.format(format)
862+
.save(path)
863+
864+
val exactSchema = "StructColumn struct<lowercase: LONG, camelCase: LONG>"
865+
866+
checkAnswer(spark.read.schema(exactSchema).format(format).load(path), Row(Row(0, 1)))
867+
868+
// In case insensitive manner, parquet's column cases are ignored
869+
val innerColumnCaseInsensitiveSchema =
870+
"StructColumn struct<Lowercase: LONG, camelcase: LONG>"
871+
checkAnswer(
872+
spark.read.schema(innerColumnCaseInsensitiveSchema).format(format).load(path),
873+
Row(Row(0, 1)))
874+
875+
val rootColumnCaseInsensitiveSchema =
876+
"structColumn struct<lowercase: LONG, camelCase: LONG>"
877+
checkAnswer(
878+
spark.read.schema(rootColumnCaseInsensitiveSchema).format(format).load(path),
879+
Row(Row(0, 1)))
880+
}
881+
}
882+
}
883+
}
884+
}
845885
}
846886

847887
object TestingUDT {

0 commit comments

Comments
 (0)