Skip to content

Commit 44a9e72

Browse files
committed
Do not pass the caseSensitive argument into ParquetRowConverter; use
SQLConf.get instead. * Also, a non-existing column in the Parquet requested schema is filled with null, so do not call getOrElse — call apply directly instead.
1 parent ca07f74 commit 44a9e72

3 files changed

Lines changed: 7 additions & 15 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,12 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
122122
keyValueMetaData: JMap[String, String],
123123
fileSchema: MessageType,
124124
readContext: ReadContext): RecordMaterializer[InternalRow] = {
125-
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
126-
SQLConf.CASE_SENSITIVE.defaultValue.get)
127125
val parquetRequestedSchema = readContext.getRequestedSchema
128126
new ParquetRecordMaterializer(
129127
parquetRequestedSchema,
130128
ParquetReadSupport.expandUDT(catalystRequestedSchema),
131129
new ParquetToSparkSchemaConverter(conf),
132-
convertTz,
133-
caseSensitive)
130+
convertTz)
134131
}
135132
}
136133

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ private[parquet] class ParquetRecordMaterializer(
3636
parquetSchema: MessageType,
3737
catalystSchema: StructType,
3838
schemaConverter: ParquetToSparkSchemaConverter,
39-
convertTz: Option[ZoneId],
40-
caseSensitive: Boolean)
39+
convertTz: Option[ZoneId])
4140
extends RecordMaterializer[InternalRow] {
4241

4342
private val rootConverter =
@@ -46,7 +45,6 @@ private[parquet] class ParquetRecordMaterializer(
4645
parquetSchema,
4746
catalystSchema,
4847
convertTz,
49-
caseSensitive,
5048
NoopUpdater)
5149

5250
override def getCurrentRecord: InternalRow = rootConverter.currentRecord

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow
3535
import org.apache.spark.sql.catalyst.expressions._
3636
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
3737
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
38+
import org.apache.spark.sql.internal.SQLConf
3839
import org.apache.spark.sql.types._
3940
import org.apache.spark.unsafe.types.UTF8String
4041

@@ -126,7 +127,6 @@ private[parquet] class ParquetRowConverter(
126127
parquetType: GroupType,
127128
catalystType: StructType,
128129
convertTz: Option[ZoneId],
129-
caseSensitive: Boolean,
130130
updater: ParentContainerUpdater)
131131
extends ParquetGroupConverter(updater) with Logging {
132132

@@ -179,17 +179,15 @@ private[parquet] class ParquetRowConverter(
179179

180180
// Converters for each field.
181181
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
182-
val catalystFieldNameToIndex = if (caseSensitive) {
182+
// (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
183+
// to prevent throwing IllegalArgumentException when searching catalyst type's field index
184+
val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
183185
catalystType.fieldNames.zipWithIndex.toMap
184186
} else {
185187
CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
186188
}
187189
parquetType.getFields.asScala.map { parquetField =>
188-
val fieldIndex = catalystFieldNameToIndex.getOrElse(parquetField.getName,
189-
throw new RuntimeException(
190-
s"${parquetField.getName} does not exist. " +
191-
s"Available: ${catalystType.fieldNames.mkString(", ")}")
192-
)
190+
val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
193191
val catalystField = catalystType(fieldIndex)
194192
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
195193
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
@@ -362,7 +360,6 @@ private[parquet] class ParquetRowConverter(
362360
parquetType.asGroupType(),
363361
t,
364362
convertTz,
365-
caseSensitive,
366363
wrappedUpdater)
367364

368365
case t =>

0 commit comments

Comments
 (0)