@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.types.{StructField, StructType}

/**
* An interface for mapping between two different schemas. For relations that are backed by files,
* the schema inferred from the files might differ from the schema stored in the catalog. In
* such cases, this interface helps map between the inconsistent schemas.
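*
* The mapping is positional: the i-th field of the data schema corresponds to the i-th field
* of the catalog schema. For example (the layout exercised in `SchemaMappingSuite`), with a
* data schema of (_col1 INT, _col2 LONG) and a catalog schema of (f1 INT, f2 LONG),
* `lookForFieldFromCatalogField("f1")` returns the data field `_col1`, and
* `lookForFieldFromDataField` on `_col2` returns the catalog field `f2`.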
Contributor
Can you put the detailed description of this mapping in the doc here? thanks.

Member Author
I've added more detailed document. Please take a look. Thanks.

*/
private[sql] trait SchemaMapping {
/** The schema inferred from the files. */
val dataSchema: StructType

/** The schema of the partition columns. */
val partitionSchema: StructType

/** The schema fetched from the catalog. */
val catalogSchema: StructType

require(catalogSchema.length == 0 ||
dataSchema.merge(partitionSchema).length == catalogSchema.length,
s"The data schema in files: $dataSchema plus the partition schema: $partitionSchema " +
s"should have the same number of fields as the schema in the catalog: $catalogSchema.")

/** Returns the corresponding catalog field for the given data field. */
def lookForFieldFromDataField(field: StructField): Option[StructField] = {
if (catalogSchema.fields.length == 0) {
None
} else {
dataSchema.getFieldIndex(field.name).map { idx =>
catalogSchema.fields(idx)
}
}
}

/** Returns the corresponding data field for the given catalog field. */
def lookForFieldFromCatalogField(field: StructField): Option[StructField] = {
catalogSchema.getFieldIndex(field.name).map { idx =>
dataSchema.fields(idx)
}
}

/** Returns the corresponding data field for the given catalog field name. */
def lookForFieldFromCatalogField(fieldName: String): Option[StructField] = {
catalogSchema.getFieldIndex(fieldName).map { idx =>
dataSchema.fields(idx)
}
}

/**
* Transforms the attributes in the given expression, which is based on the catalog schema,
* into the corresponding attributes in the schema inferred from the files.
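* For example, with a catalog field `f1` mapped to the data field `_col1`, the predicate
* `EqualTo(f1, 1)` is rewritten to `EqualTo(_col1, 1)` (see `SchemaMappingSuite`).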
*/
def transformExpressionToUseDataSchema(expr: Expression): Expression = {
expr transform {
case a: AttributeReference =>
lookForFieldFromCatalogField(a.name).map { field =>
a.withName(field.name)
}.getOrElse(a)
}
}
}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.StructType

/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -85,11 +86,22 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

val dataColumns =
// Map the data schema to the schema in the catalog, if any.
val relationSchema = StructType(fsRelation.dataSchema.flatMap { field =>
fsRelation.lookForFieldFromDataField(field)
})

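// When there is no catalog schema, `relationSchema` is empty and its length differs from the
// data schema's, so fall back to resolving against the schema inferred from the files;
// otherwise resolve against the catalog schema.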
val dataColumns = if (relationSchema.length != fsRelation.dataSchema.length) {
l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
} else {
l.resolve(relationSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
}

// Partition keys are not available in the statistics of the files.
// Data filters are based on the schema stored in the files, which might differ from the
// relation's output schema, so we need to transform them accordingly.
val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
.map(filter => fsRelation.transformExpressionToUseDataSchema(filter))

// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
@@ -105,7 +117,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
dataColumns
.filter(requiredAttributes.contains)
.filterNot(partitionColumns.contains)
val prunedDataSchema = readDataColumns.toStructType
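// The pruned schema handed to the file reader must use the physical field names stored in
// the files, so map each catalog field back to its data schema counterpart (the field is
// kept as-is when there is no catalog schema).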
val prunedDataSchema = StructType(readDataColumns.toStructType.map { field =>
fsRelation.lookForFieldFromCatalogField(field).getOrElse(field)
})
logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")

val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -32,9 +32,9 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.{FileRelation, SchemaMapping}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
@@ -132,6 +132,7 @@ abstract class OutputWriter {
* @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
* @param fileFormat A file format that can be used to read and write the data in files.
* @param options Configuration used when reading / writing data.
* @param catalogSchema The schema fetched from the catalog (e.g. the Hive metastore), if any.
*/
case class HadoopFsRelation(
sparkSession: SparkSession,
@@ -140,14 +141,24 @@
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
options: Map[String, String]) extends BaseRelation with FileRelation {
options: Map[String, String],
catalogSchema: StructType = new StructType())
extends BaseRelation with FileRelation with SchemaMapping {

override def sqlContext: SQLContext = sparkSession.sqlContext

val schema: StructType = {
val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
StructType(dataSchema ++ partitionSchema.filterNot { column =>
dataSchemaColumnNames.contains(column.name.toLowerCase)
// If a catalog schema is given, use it as the relation output instead of the schema
// inferred from the files.
val schemaColumns = if (catalogSchema.fields.length == 0) {
dataSchema
} else {
catalogSchema
}

val schemaColumnNames = schemaColumns.map(_.name.toLowerCase).toSet
StructType(schemaColumns ++ partitionSchema.filterNot { column =>
schemaColumnNames.contains(column.name.toLowerCase)
})
}

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types._

case class SchemaMappingRelation(
dataSchema: StructType,
partitionSchema: StructType,
catalogSchema: StructType) extends SchemaMapping

class SchemaMappingSuite extends SparkFunSuite {

val dataSchema = StructType(
StructField("_col1", IntegerType) ::
StructField("_col2", LongType) ::
StructField("_col3", BooleanType) :: Nil)

val partitionSchema = StructType(
StructField("part", IntegerType) :: Nil)

val catalogSchema = StructType(
StructField("f1", IntegerType) ::
StructField("f2", LongType) ::
StructField("f3", BooleanType) ::
StructField("part", IntegerType) :: Nil)

val relation = SchemaMappingRelation(dataSchema, partitionSchema, catalogSchema)

test("looking for data schema field with given catalog field name") {
val col1 = relation.lookForFieldFromCatalogField("f1").get
assert(col1.name == "_col1" && col1.dataType == IntegerType)

val col2 = relation.lookForFieldFromCatalogField("f2").get
assert(col2.name == "_col2" && col2.dataType == LongType)

val col3 = relation.lookForFieldFromCatalogField("f3").get
assert(col3.name == "_col3" && col3.dataType == BooleanType)

assert(relation.lookForFieldFromCatalogField("f4").isEmpty)
}

test("relation with empty catalog schema") {
val relationWithoutCatalogSchema = SchemaMappingRelation(dataSchema,
partitionSchema, new StructType())
assert(relationWithoutCatalogSchema.lookForFieldFromCatalogField("f1").isEmpty)
}

test("data schema must match catalog schema in length if catalog schema is not empty") {
val catalogSchema = StructType(StructField("f1", IntegerType) :: Nil)
val e = intercept[RuntimeException] {
SchemaMappingRelation(dataSchema, partitionSchema, catalogSchema)
}
assert(e.getMessage.contains("should have the same number of fields"))
}

test("transform expression of catalog schema fields to use data schema fields") {
val attr = AttributeReference("f1", IntegerType)()
val expr = EqualTo(attr, Literal(1))
val expected = EqualTo(attr.withName("_col1"), Literal(1))
assert(relation.transformExpressionToUseDataSchema(expr) == expected)
}
}
@@ -246,6 +246,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
fileFormatClass: Class[_ <: FileFormat],
fileType: String): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)

val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
@@ -308,7 +309,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
dataSchema = inferredSchema,
bucketSpec = bucketSpec,
fileFormat = defaultSource,
options = options)
options = options,
catalogSchema = metastoreSchema)

val created = LogicalRelation(
relation,
@@ -497,4 +497,61 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
}
}

test("ORC conversion when metastore schema does not match schema stored in ORC files") {
withTempView("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.createOrReplaceTempView("single")

withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
withTable("dummy_orc") {
withTempPath { dir =>
val path = dir.getCanonicalPath

// Create a Metastore ORC table and insert data into it.
spark.sql(
s"""
|CREATE TABLE dummy_orc(value STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql(
s"""
|INSERT INTO TABLE dummy_orc
|PARTITION(key=0)
|SELECT value FROM single
""".stripMargin)

val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
checkAnswer(df, singleRowDF)

// Create another Metastore ORC table with a different schema over the same ORC files.
spark.sql(
s"""
|CREATE EXTERNAL TABLE dummy_orc2(value2 STRING)
|PARTITIONED BY (key INT)
|STORED AS ORC
|LOCATION '$path'
""".stripMargin)

spark.sql("ALTER TABLE dummy_orc2 ADD PARTITION(key=0)")

// The output of the relation is the schema from the Metastore, not the file.
val df2 = spark.sql("SELECT key, value2 FROM dummy_orc2 WHERE key=0 AND value2='foo'")
checkAnswer(df2, singleRowDF)

val queryExecution = df2.queryExecution
queryExecution.analyzed.collectFirst {
case _: LogicalRelation => ()
}.getOrElse {
fail("Expected the query plan to convert the ORC relation to a data source relation, " +
s"but got:\n$queryExecution")
}
}
}
}
}
}
}