diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 0a7631f78219..d14a53cc3ea7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveSessionState, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -126,6 +126,9 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+    val convertHiveOrc =
+      sparkSession.sessionState.asInstanceOf[HiveSessionState].convertMetastoreOrc
+
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
 
@@ -137,7 +140,31 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+
+        // SPARK-16628: an ORC table created by Hive may not store column names correctly in
+        // the ORC files, so the physical schema can differ from the required schema, which is
+        // derived from the metastore schema, and reading the ORC files would then fail.
+        // To fix this, we assume the metastore schema `dataSchema` matches `physicalSchema`
+        // column by column, disregarding column names. If it does not, we throw an exception
+        // asking users to disable the conversion of Hive ORC tables.
+        val physicalRequiredSchema =
+          if (convertHiveOrc && OrcRelation.isMismatchSchema(physicalSchema, requiredSchema)) {
+            require(physicalSchema.length == dataSchema.length,
+              s"physical schema $physicalSchema in Orc file doesn't match metastore schema " +
+              s"$dataSchema; please disable spark.sql.hive.convertMetastoreOrc to work around " +
+              "this problem.")
+            physicalSchema.fields.zip(dataSchema.fields).foreach { case (pf, df) =>
+              require(pf.dataType == df.dataType,
+                s"Column $pf in Orc file doesn't match column $df in metastore schema; " +
+                "please disable spark.sql.hive.convertMetastoreOrc to work around this problem.")
+            }
+            OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
+            StructType(
+              requiredSchema.map(a => dataSchema.fieldIndex(a.name)).map(physicalSchema(_)))
+          } else {
+            OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+            requiredSchema
+          }
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -161,7 +188,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
-          requiredSchema,
+          physicalRequiredSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           recordsIterator)
       }
@@ -303,6 +330,10 @@ private[orc] object OrcRelation extends HiveInspectors {
     maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
   }
 
+  def isMismatchSchema(physicalSchema: StructType, requestedSchema: StructType): Boolean = {
+    requestedSchema.forall(a => physicalSchema.getFieldIndex(a.name).isEmpty)
+  }
+
   def setRequiredColumns(
       conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
     val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b8761e9de288..ab7dbf8b801a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
@@ -590,6 +591,79 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("ORC conversion when metastore schema does not match schema stored in ORC files") {
+    withTempView("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
+        withTable("dummy_orc", "dummy_orc2", "dummy_orc3") {
+          withTempPath { dir =>
+            val path = dir.getCanonicalPath
+
+            // Create a Metastore ORC table and insert data into it.
+            spark.sql(
+              s"""
+                |CREATE TABLE dummy_orc(value STRING)
+                |PARTITIONED BY (key INT)
+                |STORED AS ORC
+                |LOCATION '$path'
+              """.stripMargin)
+
+            spark.sql(
+              s"""
+                |INSERT INTO TABLE dummy_orc
+                |PARTITION(key=0)
+                |SELECT value FROM single
+              """.stripMargin)
+
+            val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
+            checkAnswer(df, singleRowDF)
+
+            // Create a Metastore ORC table whose schema uses different column names.
+            spark.sql(
+              s"""
+                |CREATE EXTERNAL TABLE dummy_orc2(value2 STRING)
+                |PARTITIONED BY (key INT)
+                |STORED AS ORC
+                |LOCATION '$path'
+              """.stripMargin)
+
+            spark.sql("ALTER TABLE dummy_orc2 ADD PARTITION(key=0)")
+
+            // The output schema of the relation comes from the metastore, not from the ORC file.
+            val df2 = spark.sql("SELECT key, value2 FROM dummy_orc2 WHERE key=0 AND value2='foo'")
+            checkAnswer(df2, singleRowDF)
+
+            val queryExecution = df2.queryExecution
+            queryExecution.analyzed.collectFirst {
+              case _: LogicalRelation => ()
+            }.getOrElse {
+              fail(s"Expecting the query plan to convert ORC to data sources, " +
+                s"but got:\n$queryExecution")
+            }
+
+            // When the column types in the ORC files do not match the metastore schema, the
+            // Hive metastore ORC table cannot be converted to a data source table.
+            spark.sql(
+              s"""
+                |CREATE EXTERNAL TABLE dummy_orc3(value2 INT)
+                |PARTITIONED BY (key INT)
+                |STORED AS ORC
+                |LOCATION '$path'
+              """.stripMargin)
+
+            spark.sql("ALTER TABLE dummy_orc3 ADD PARTITION(key=0)")
+            val errorMessage = intercept[SparkException] {
+              spark.sql("SELECT key, value2 FROM dummy_orc3 WHERE key=0 AND value2=1").count()
+            }.getMessage
+            assert(errorMessage.contains("please disable spark.sql.hive.convertMetastoreOrc"))
+          }
+        }
+      }
+    }
+  }
+
   test("Empty schema does not read data from ORC file") {
     val data = Seq((1, 1), (2, 2))
     withOrcFile(data) { path =>
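
For reference, here is a minimal standalone sketch of the by-position schema remapping that the new `physicalRequiredSchema` branch performs. The object and method names below are hypothetical and exist only for illustration; `StructType`, `StructField`, and `fieldIndex` are the actual Spark SQL types and methods the patch uses.

import org.apache.spark.sql.types._

object OrcSchemaRemapSketch {
  // Map each requested metastore column to the physical ORC field at the same ordinal.
  // This mirrors the idea in the patch: match `dataSchema` to `physicalSchema` column by
  // column, ignoring the (possibly Hive-generated) physical names such as _col0, _col1.
  def remapByPosition(
      dataSchema: StructType,       // metastore schema, e.g. (key INT, value STRING)
      physicalSchema: StructType,   // ORC file schema, e.g. (_col0 INT, _col1 STRING)
      requiredSchema: StructType): StructType = {
    require(physicalSchema.length == dataSchema.length,
      "physical schema and metastore schema must have the same number of columns")
    physicalSchema.fields.zip(dataSchema.fields).foreach { case (pf, df) =>
      require(pf.dataType == df.dataType, s"column $pf does not match column $df")
    }
    // Look up each requested column in the metastore schema, then take the physical
    // field sitting at the same ordinal.
    StructType(requiredSchema.map(f => physicalSchema(dataSchema.fieldIndex(f.name))))
  }
}

With dataSchema = (key INT, value STRING), physicalSchema = (_col0 INT, _col1 STRING), and requiredSchema = (value STRING), this returns a schema containing only (_col1 STRING), which is why the patch passes `physicalRequiredSchema` rather than `requiredSchema` to `OrcRelation.unwrapOrcStructs`.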
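A small, hedged illustration of when `OrcRelation.isMismatchSchema` fires: it returns true only when none of the requested column names can be found in the ORC file's physical schema, which is the Hive-writer case exercised by the tests above. The schemas below are illustrative values, not read from an actual ORC footer.

import org.apache.spark.sql.types._

// Physical schema as a Hive-written ORC file might store it.
val physicalSchema = StructType(Seq(
  StructField("_col0", IntegerType),
  StructField("_col1", StringType)))

// Columns requested through the metastore schema.
val requiredSchema = StructType(Seq(StructField("value", StringType)))

// Same check as OrcRelation.isMismatchSchema: true here, so the by-position
// remapping (and its data-type validation) kicks in instead of name-based lookup.
val mismatch = requiredSchema.forall(f => physicalSchema.getFieldIndex(f.name).isEmpty)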