@@ -122,12 +122,15 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
Member

I'm not sure why you need to pass caseSensitive across ParquetRecordMaterializer and ParquetRowConverter. Can't we just get it in ParquetRowConverter?

Contributor Author

Can I get the runtime config in ParquetRowConverter? I don't fully understand its behavior.

Contributor

SQLConf.get works, even on the executor side; see dd37529

Contributor Author

Thanks! I'll update it to use SQLConf instead of passing the argument across classes.
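
For reference, a minimal sketch of the suggested direction, assuming the flag can be read directly where it is needed; the placement and field name are illustrative, not the final change:

import org.apache.spark.sql.internal.SQLConf

// Hypothetical placement inside ParquetRowConverter: read the active session's
// conf directly instead of threading `caseSensitive` through the constructors.
// SQLConf.get also works on the executor side because the conf is propagated
// with the task (see the commit referenced above).
private[this] val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis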

val parquetRequestedSchema = readContext.getRequestedSchema
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetToSparkSchemaConverter(conf),
convertTz)
convertTz,
caseSensitive)
}
}

@@ -36,11 +36,18 @@ private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
convertTz: Option[ZoneId])
convertTz: Option[ZoneId],
caseSensitive: Boolean)
extends RecordMaterializer[InternalRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
new ParquetRowConverter(
schemaConverter,
parquetSchema,
catalystSchema,
convertTz,
caseSensitive,
NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord

@@ -33,7 +33,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -126,6 +126,7 @@ private[parquet] class ParquetRowConverter(
parquetType: GroupType,
catalystType: StructType,
convertTz: Option[ZoneId],
caseSensitive: Boolean,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

@@ -176,10 +177,22 @@
*/
def currentRecord: InternalRow = currentRow


// Converters for each field.
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
// To handle the letter-case issue, build a name-to-field-index map according to case sensitivity
val catalystFieldNameToIndex = if (caseSensitive) {
catalystType.fieldNames.zipWithIndex.toMap
} else {
CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
}

parquetType.getFields.asScala.map { parquetField =>
val fieldIndex = catalystType.fieldIndex(parquetField.getName)
val fieldIndex = catalystFieldNameToIndex.getOrElse(parquetField.getName,
throw new IllegalArgumentException(
s"${parquetField.getName} does not exist. " +
s"Available: ${catalystType.fieldNames.mkString(", ")}")
)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of `currentRow`
newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex))
@@ -348,7 +361,7 @@ private[parquet] class ParquetRowConverter(
}
}
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
schemaConverter, parquetType.asGroupType(), t, convertTz, caseSensitive, wrappedUpdater)

case t =>
throw new RuntimeException(
@@ -804,6 +804,162 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}

test("SPARK-31116: Select simple parquet columns correctly in case insensitive manner") {
Member

Could you move the new test cases into FileBasedDataSourceSuite and run them with at least ORC/Parquet/JSON?

Contributor Author

I'll test soon. However, do these new test cases also apply to ORC and JSON?

Member

Yes. Please start with all three and comment out the ones where ORC/JSON fail.
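
For illustration, a sketch (not from the PR) of how such a test could be parameterized over data sources in FileBasedDataSourceSuite; the format list is the assumption here and may need trimming if ORC/JSON fail:

Seq("parquet", "orc", "json").foreach { format =>
  test(s"SPARK-31116: select columns in a case-insensitive manner ($format)") {
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        Seq("A").toDF("camelCase").write.format(format).save(path)

        // With case sensitivity off, a lower-cased read schema should still
        // match the camelCase column written above.
        val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
        checkAnswer(
          spark.read.schema(caseInsensitiveSchema).format(format).load(path),
          Row("A"))
      }
    }
  }
}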

Contributor Author (@kimtkyeom, Mar 12, 2020)

I tested the ORC and JSON file formats and there are some failures.

JSON test failure

JSON passed the case-sensitive cases, but it failed in the case-insensitive ones.

[info] - SPARK-31116: Select simple columns correctly in case insensitive manner *** FAILED *** (4 seconds, 277 milliseconds)
[info]   Results do not match for query:
[info]   Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info]   Timezone Env:
[info]
[info]   == Parsed Logical Plan ==
[info]   Relation[camelcase#56] json
[info]
[info]   == Analyzed Logical Plan ==
[info]   camelcase: string
[info]   Relation[camelcase#56] json
[info]
[info]   == Optimized Logical Plan ==
[info]   Relation[camelcase#56] json
[info]
[info]   == Physical Plan ==
[info]   FileScan json [camelcase#56] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/Users/kimtkyeom/Dev/spark_devel/target/tmp/spark-95f1357a-85c9-444f-bdcc-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<camelcase:string>
[info]
[info]   == Results ==
[info]
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<camelcase:string>
[info]   ![A]                        [null] (QueryTest.scala:248)
[info] - SPARK-31116: Select nested columns correctly in case insensitive manner *** FAILED *** (2 seconds, 117 milliseconds)
[info]   Results do not match for query:
[info]   Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info]   Timezone Env:
[info]
[info]   == Parsed Logical Plan ==
[info]   Relation[StructColumn#147] json
[info]
[info]   == Analyzed Logical Plan ==
[info]   StructColumn: struct<LowerCase:bigint,camelcase:bigint>
[info]   Relation[StructColumn#147] json
[info]
[info]   == Optimized Logical Plan ==
[info]   Relation[StructColumn#147] json
[info]
[info]   == Physical Plan ==
[info]   FileScan json [StructColumn#147] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/Users/kimtkyeom/Dev/spark_devel/target/tmp/spark-f9ecd1a4-e5aa-4dd7-bdfd-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info]
[info]   == Results ==
[info]
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info]   ![[0,1]]                    [[null,null]] (QueryTest.scala:248)

ORC test failure

ORC passed the case-insensitive test cases, but it failed in the case-sensitive ones.

[info] - SPARK-31116: Select nested columns correctly in case sensitive manner *** FAILED *** (871 milliseconds)
[info]   Results do not match for query:
[info]   Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info]   Timezone Env:
[info]
[info]   == Parsed Logical Plan ==
[info]   Relation[StructColumn#329] json
[info]
[info]   == Analyzed Logical Plan ==
[info]   StructColumn: struct<LowerCase:bigint,camelcase:bigint>
[info]   Relation[StructColumn#329] json
[info]
[info]   == Optimized Logical Plan ==
[info]   Relation[StructColumn#329] json
[info]
[info]   == Physical Plan ==
[info]   FileScan json [StructColumn#329] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/Users/kimtkyeom/Dev/spark_devel/target/tmp/spark-612baf76-a9d0-41e5-89f4-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info]
[info]   == Results ==
[info]
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<StructColumn:struct<LowerCase:bigint,camelcase:bigint>>
[info]   ![null]                     [[null,null]] (QueryTest.scala:248)

But I think the ORC failure is due to a difference in how the row is materialized. Is there a clean way to test this properly?

Contributor Author


In addition, I noticed that JSON does not follow case sensitivity even in Spark 2.4.4. Below is a test on my local machine using spark-shell:

20/03/12 19:20:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/12 19:20:24 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://61.75.36.130:4041
Spark context available as 'sc' (master = local[*], app id = local-1584008425035).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = Seq("A").toDF("camelCase")
df: org.apache.spark.sql.DataFrame = [camelCase: string]

scala> df.write.format("json").save("./json_simple")

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val sch2 = new StructType().add("camelcase", StringType)
sch2: org.apache.spark.sql.types.StructType = StructType(StructField(camelcase,StringType,true))

scala> spark.read.format("json").schema(sch2).load("./json_simple").show()
+---------+
|camelcase|
+---------+
|     null|
+---------+

Member


Thank you for checking. Could you file a JIRA for the regressions only?

Member


Could you update your PR?

Contributor Author


OK, I updated my PR and created a JIRA issue. Since the other file formats (ORC and JSON) also fail these test cases, I've omitted checks for those formats for now and just moved the current test cases into FileBasedDataSourceSuite. I think they can be added back once the regressions are fixed.

Member


Was the ORC failure only when spark.sql.optimizer.nestedSchemaPruning.enabled is enabled?

Contributor Author


I checked the test, but it produces the same result as above regardless of the nestedSchemaPruning option.
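
For completeness, a sketch of how the nested ORC case could be re-checked under both settings, assuming SQLConf.NESTED_SCHEMA_PRUNING_ENABLED is the matching config entry:

Seq("true", "false").foreach { pruning =>
  withSQLConf(
      SQLConf.CASE_SENSITIVE.key -> "true",
      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> pruning) {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      spark.range(1L)
        .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
        .write.orc(path)

      val mismatchedCaseSchema = new StructType()
        .add("StructColumn", new StructType()
          .add("LowerCase", LongType)
          .add("camelcase", LongType))
      // This is the assertion from the nested case-sensitive test; per the
      // failure above, ORC returns [[null,null]] instead of [null] under
      // both pruning settings, so it fails either way.
      checkAnswer(spark.read.schema(mismatchedCaseSchema).orc(path), Row(null))
    }
  }
}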

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing specific parquet record reader
Seq("A").toDF("camelCase").write.parquet(path)

val exactSchema = new StructType().add("camelCase", StringType)
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row("A"))

// In case-insensitive mode, the letter case of Parquet's columns is ignored
val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
checkAnswer(spark.read.schema(caseInsensitiveSchema).parquet(path), Row("A"))
}}
}
}

test("SPARK-31116: Select nested parquet columns correctly in case insensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing nested parquet data
spark
.range(1L)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
.write.parquet(path)

val exactSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row(Row(0, 1)))

// In case-insensitive mode, the letter case of Parquet's columns is ignored
val innerColumnCaseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("LowerCase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(innerColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, 1)))

val rootColumnCaseInsensitiveSchema = new StructType()
.add(
"structColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(
spark.read.schema(rootColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, 1)))
}}
}
}

test("SPARK-31116: Select simple parquet columns correctly in case sensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing specific parquet record reader
Seq("A").toDF("camelCase").write.parquet(path)

val exactSchema = new StructType().add("camelCase", StringType)
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row("A"))

// In case-sensitive mode, a column with a different letter case is not read
val caseInsensitiveSchema = new StructType().add("camelcase", StringType)
checkAnswer(spark.read.schema(caseInsensitiveSchema).parquet(path), Row(null))

// It also works properly with a combined schema
val combinedSchema = new StructType()
.add("camelCase", StringType)
.add("camelcase", StringType)
checkAnswer(spark.read.schema(combinedSchema).parquet(path), Row("A", null))
}}
}
}

test("SPARK-31116: Select nested parquet columns correctly in case sensitive manner") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
withTempPath { dir => {
val path = dir.getCanonicalPath

// Prepare values for testing nested parquet data
spark
.range(1)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
.write.parquet(path)

val exactSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(spark.read.schema(exactSchema).parquet(path), Row(Row(0, 1)))

val innerColumnCaseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("LowerCase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(innerColumnCaseInsensitiveSchema).parquet(path),
Row(null))

val innerPartialColumnCaseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(innerPartialColumnCaseInsensitiveSchema).parquet(path),
Row(Row(0, null)))

val rootColumnCaseInsensitiveSchema = new StructType()
.add(
"structColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType))
checkAnswer(
spark.read.schema(rootColumnCaseInsensitiveSchema).parquet(path),
Row(null))

val combinedSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType)
.add("LowerCase", LongType)
.add("camelcase", LongType))
.add(
"structColumn",
new StructType()
.add("lowercase", LongType)
.add("camelCase", LongType)
.add("LowerCase", LongType)
.add("camelcase", LongType))
checkAnswer(
spark.read.schema(combinedSchema).parquet(path),
Row(Row(0, 1, null, null), null))
}}
}
}

test("Migration from INT96 to TIMESTAMP_MICROS timestamp type") {
def testMigration(fromTsType: String, toTsType: String): Unit = {
def checkAppend(write: DataFrameWriter[_] => Unit, readback: => DataFrame): Unit = {