@@ -36,7 +36,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.hive.{HiveInspectors, HiveSessionState, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -126,6 +126,9 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

val convertHiveOrc =
sparkSession.sessionState.asInstanceOf[HiveSessionState].convertMetastoreOrc

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value

@@ -137,7 +140,31 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
Iterator.empty
} else {
val physicalSchema = maybePhysicalSchema.get
OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)

// SPARK-16628: an Orc table created by Hive may not store column names correctly in
// the Orc files, so the physical schema can mismatch the required schema, which comes
// from the metastore schema, and reading the Orc files will fail.
// To fix this, we assume the metastore schema `dataSchema` matches `physicalSchema`
// column by column, disregarding column names. If it doesn't, we throw an exception
// that suggests disabling the conversion of Hive Orc tables.
val physicalRequiredSchema =
if (convertHiveOrc && OrcRelation.isMismatchSchema(physicalSchema, requiredSchema)) {
require(physicalSchema.length == dataSchema.length,
s"physical schema $physicalSchema in Orc file doesn't match metastore schema " +
s"$dataSchema, please disable spark.sql.hive.convertMetastoreOrc to work around " +
"this problem.")
physicalSchema.fields.zip(dataSchema.fields).foreach { case (pf, df) =>
require(pf.dataType == df.dataType,
s"Column $pf in Orc file doesn't match column $df in metastore schema, " +
"please disable spark.sql.hive.convertMetastoreOrc to work around this problem.")
}
OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
StructType(
requiredSchema.map(a => dataSchema.fieldIndex(a.name)).map(physicalSchema(_)))
} else {
OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
requiredSchema
}

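As an aside, here is a minimal sketch (not part of the diff) of the positional remapping the SPARK-16628 comment above describes; the Hive-generated column names `_col0`/`_col1` below are hypothetical:

import org.apache.spark.sql.types._

// Sketch only: assume Hive wrote generic names into the ORC file, while the
// metastore records the user-facing names.
val physicalSchema = StructType(Seq(
  StructField("_col0", IntegerType),
  StructField("_col1", StringType)))
val dataSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))
val requiredSchema = StructType(Seq(StructField("value", StringType)))

// Map each required column to its position in the metastore schema, then take the
// physical field at that position: "value" -> index 1 -> _col1 STRING.
val physicalRequiredSchema = StructType(
  requiredSchema.map(a => dataSchema.fieldIndex(a.name)).map(physicalSchema(_)))
// physicalRequiredSchema contains the single field StructField(_col1, StringType)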
val orcRecordReader = {
val job = Job.getInstance(conf)
@@ -161,7 +188,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
// Unwraps `OrcStruct`s to `UnsafeRow`s
OrcRelation.unwrapOrcStructs(
conf,
requiredSchema,
physicalRequiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
recordsIterator)
}
@@ -303,6 +330,10 @@ private[orc] object OrcRelation extends HiveInspectors {
maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
}

def isMismatchSchema(physicalSchema: StructType, requestedSchema: StructType): Boolean = {
requestedSchema.forall(a => physicalSchema.getFieldIndex(a.name).isEmpty)
Comment (Member):
Maybe, the following?

!requestedSchema.forall(a => physicalSchema.getFieldIndex(a.name).isDefined)

Comment (Member):

Oh, never mind. It's really about checking whether all requested columns are not matched.

}
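For reference, a small illustration (not part of the change) of the distinction raised in the review comments: the predicate above returns true only when none of the requested columns can be found by name in the physical schema, whereas the first suggestion would also fire when just one column is missing. The schemas below are hypothetical:

import org.apache.spark.sql.types._

// Hypothetical partial overlap: one column keeps a Hive-generated name, one matches.
val physical = StructType(Seq(
  StructField("_col0", IntegerType),
  StructField("value", StringType)))
val requested = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))

// Check used in this PR: true only if ALL requested columns are unmatched.
// False here, because "value" is found by name.
val allUnmatched = requested.forall(a => physical.getFieldIndex(a.name).isEmpty)

// Reviewer's first suggestion: true if ANY requested column is unmatched. True here.
val anyUnmatched = !requested.forall(a => physical.getFieldIndex(a.name).isDefined)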

def setRequiredColumns(
conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
@@ -590,6 +590,63 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}

test("ORC conversion when metastore schema does not match schema stored in ORC files") {
withTempView("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.createOrReplaceTempView("single")

withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
withTable("dummy_orc") {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Create a Metastore ORC table and insert data into it.
spark.sql(
s"""
|CREATE TABLE dummy_orc(value STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql(
s"""
|INSERT INTO TABLE dummy_orc
|PARTITION(key=0)
|SELECT value FROM single
""".stripMargin)

val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
checkAnswer(df, singleRowDF)

// Create another Metastore ORC table with a different schema over the same location.
spark.sql(
s"""
|CREATE EXTERNAL TABLE dummy_orc2(value2 STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql("ALTER TABLE dummy_orc2 ADD PARTITION(key=0)")

// The output of the relation is the schema from the Metastore, not the file.
val df2 = spark.sql("SELECT key, value2 FROM dummy_orc2 WHERE key=0 AND value2='foo'")
checkAnswer(df2, singleRowDF)

val queryExecution = df2.queryExecution
queryExecution.analyzed.collectFirst {
case _: LogicalRelation => ()
}.getOrElse {
fail(s"Expecting the query plan to convert orc to data sources, " +
s"but got:\n$queryExecution")
}
}
}
}
}
}

test("Empty schema does not read data from ORC file") {
val data = Seq((1, 1), (2, 2))
withOrcFile(data) { path =>