@@ -552,6 +552,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ORC_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.orc.mergeSchema")
.doc("When true, the Orc data source merges schemas collected from all data files, " +
"otherwise the schema is picked from a random data file.")
.booleanConf
.createWithDefault(false)
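
A minimal usage sketch of the new setting (hypothetical paths; assumes the option is exposed through the DataFrameReader, as the tests further down exercise): with merging enabled, ORC inputs whose schemas differ are combined into a superset schema instead of taking the schema of whichever single file is read.

    // Two ORC directories, each written with a different column.
    spark.range(1).toDF("a").write.orc("/tmp/orc/first")    // file schema: a bigint
    spark.range(1).toDF("b").write.orc("/tmp/orc/second")   // file schema: b bigint

    // With merging enabled, the inferred schema is the union of both: a, b.
    spark.conf.set("spark.sql.orc.mergeSchema", "true")
    spark.read.orc("/tmp/orc/first", "/tmp/orc/second").printSchema()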

val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
"when reading data stored in HDFS. This configuration will be deprecated in the future " +
@@ -1907,6 +1913,8 @@ class SQLConf extends Serializable with Logging {

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def isOrcSchemaMergingEnabled: Boolean = getConf(ORC_SCHEMA_MERGING_ENABLED)

def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)

@@ -94,7 +94,7 @@ class OrcFileFormat
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
OrcUtils.readSchema(sparkSession, files)
OrcUtils.inferSchema(sparkSession, files, options)
}

override def prepareWrite(

@@ -57,9 +57,20 @@ class OrcOptions(
}
shortOrcCompressionCodecNames(codecName)
}

/**
* Whether it merges schemas or not. When the given Orc files have different schemas,
* the schemas can be merged. By default use the value specified in SQLConf.
*/
val mergeSchema: Boolean = parameters
.get(MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sqlConf.isOrcSchemaMergingEnabled)
}
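
As a quick sketch of how the option resolves (hypothetical paths): an explicit per-read mergeSchema option takes precedence over the session default spark.sql.orc.mergeSchema.

    // Session default leaves merging off...
    spark.conf.set("spark.sql.orc.mergeSchema", "false")
    // ...but the per-read data source option wins for this particular read.
    val merged = spark.read.option("mergeSchema", "true").orc("/tmp/orc/first", "/tmp/orc/second")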

object OrcOptions {
val MERGE_SCHEMA = "mergeSchema"

Reviewer (Member): We should at least add a test case for this new option.

Author (Contributor Author): added a new test case in OrcSuite

// The ORC compression short names
private val shortOrcCompressionCodecNames = Map(
"none" -> "NONE",

@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

object OrcUtils extends Logging {

@@ -82,14 +83,95 @@ object OrcUtils extends Logging {
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConf()
// TODO: We need to support merge schema. Please see SPARK-11412.
files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
}
}

/**
 * Read single ORC file schema using native version of ORC
 */

Reviewer (Member): Please add the following line because this is only used in this file and OrcSourceSuite.

   * This is visible for testing.

def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
: Option[StructType] = {

Reviewer (Member): Unfortunately, the existing code around here follows a wrong indentation rule. Let's use correct indentation at least at new code. : Option[StructType] should have 2-space indentation instead of 4-space.

   def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
-      : Option[StructType] = {
+    : Option[StructType] = {

OrcUtils.readSchema(new Path(file), conf, ignoreCorruptFiles)
.map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType])
}

/**
* Figures out a merged ORC schema with a distributed Spark job.
*/
def mergeSchemasInParallel(
sparkSession: SparkSession,
files: Seq[FileStatus],
singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType])
: Option[StructType] = {

Reviewer (Member): ditto. 2-space.

val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

val filePaths = files.map(_.getPath.toString)

// Set the number of partitions to prevent following schema reads from generating many tasks
// in case of a small number of orc files.
val numParallelism = Math.min(Math.max(filePaths.size, 1),
sparkSession.sparkContext.defaultParallelism)

val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

// Issues a Spark job to read ORC schema in parallel.
val partiallyMergedSchemas =

Author (Contributor Author), in reply to a review comment: ok, I'll work on it

Author (Contributor Author): I've refactored it by adding a SchemaMergeUtils. Please review it again.

sparkSession
.sparkContext
.parallelize(filePaths, numParallelism)
.mapPartitions { iterator =>
// Reads Orc schema in multi-threaded manner.
val partFiles = iterator.toSeq
val schemas = ThreadUtils.parmap(partFiles, "readingOrcSchemas", 8) { currentFile =>
singleFileSchemaReader(currentFile, serializedConf.value, ignoreCorruptFiles)
}.flatten

if (schemas.isEmpty) {
Iterator.empty
} else {
var mergedSchema = schemas.head
schemas.tail.foreach { schema =>
try {
mergedSchema = mergedSchema.merge(schema)
} catch { case cause: SparkException =>
throw new SparkException(
s"Failed merging schema:\n${schema.treeString}", cause)
}
}
Iterator.single(mergedSchema)
}
}.collect()

if (partiallyMergedSchemas.isEmpty) {
None
} else {
var finalSchema = partiallyMergedSchemas.head
partiallyMergedSchemas.tail.foreach { schema =>
try {
finalSchema = finalSchema.merge(schema)
} catch { case cause: SparkException =>
throw new SparkException(
s"Failed merging schema:\n${schema.treeString}", cause)
}
}
Some(finalSchema)
}
}
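
The merge is two-level: each task folds the schemas of its files into one partial schema, and the driver then folds the collected partial schemas, so (assuming merges of compatible schemas compose the same way regardless of grouping) the result matches a sequential merge over all files. A rough illustration of the same reduction shape, using plain Scala sets as stand-ins for schemas (illustration only, not the executor code):

    // Each Set stands in for "the columns seen in one ORC file".
    val fileSchemas = Seq(Set("a"), Set("b"), Set("a", "c"), Set("b", "c"))

    // Level 1: partial merges, one per task/partition.
    val partials = fileSchemas.grouped(2).map(_.reduce(_ union _)).toSeq  // Seq(Set(a, b), Set(a, b, c))

    // Level 2: final merge on the driver.
    val merged = partials.reduce(_ union _)                               // Set(a, b, c)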

def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
: Option[StructType] = {

Reviewer (Member): I think we can keep the name readSchema here. We rename the method that reads schema from one file instead.

Reviewer (Member): So, this function is used in native ORC readers (OrcFileFormat/OrcTable), and hive OrcFileFormat has its own implementation of inferSchema, right?

Author (Contributor Author): Yes. Do you think it's necessary to refactor this function?

val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
OrcUtils.mergeSchemasInParallel(sparkSession, files, OrcUtils.singleFileSchemaReader)
} else {
OrcUtils.readSchema(sparkSession, files)
}
}

/**
* Returns the requested column ids from the given ORC file. Column id can be -1, which means the
* requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.

@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.orc

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
@@ -39,7 +41,7 @@ case class OrcTable(
new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)

override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
OrcUtils.readSchema(sparkSession, files)
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
new OrcWriteBuilder(options, paths, formatName, supportsDataType)

@@ -23,18 +23,19 @@ import java.sql.Timestamp
import java.util.Locale

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.orc.OrcConf.COMPRESS
import org.apache.orc.OrcFile
import org.apache.orc.OrcProto.ColumnEncoding.Kind.{DICTIONARY_V2, DIRECT, DIRECT_V2}
import org.apache.orc.OrcProto.Stream.Kind
import org.apache.orc.impl.RecordReaderImpl
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.util.Utils

case class OrcData(intField: Int, stringField: String)
@@ -188,6 +189,49 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}

protected def testMergeSchemasInParallel(
ignoreCorruptFiles: Boolean,
singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]): Unit = {
withSQLConf(
SQLConf.IGNORE_CORRUPT_FILES.key -> ignoreCorruptFiles.toString,
SQLConf.ORC_IMPLEMENTATION.key -> orcImp) {
withTempDir { dir =>
val fs = FileSystem.get(spark.sessionState.newHadoopConf())
val basePath = dir.getCanonicalPath

val path1 = new Path(basePath, "first")
val path2 = new Path(basePath, "second")
val path3 = new Path(basePath, "third")

spark.range(1).toDF("a").coalesce(1).write.orc(path1.toString)
spark.range(1, 2).toDF("b").coalesce(1).write.orc(path2.toString)
spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)

val fileStatuses =
Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten

val schema = OrcUtils.mergeSchemasInParallel(
spark,
fileStatuses,
singleFileSchemaReader)

assert(schema.isDefined == true)

Reviewer (Member): Nit: assert(schema.isDefined)

Author (Contributor Author): done

assert(schema.get == StructType(Seq(
StructField("a", LongType, true),
StructField("b", LongType, true))))
}
}
}

protected def testMergeSchemasInParallel(
singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType]): Unit = {
testMergeSchemasInParallel(true, singleFileSchemaReader)
val exception = intercept[SparkException] {
testMergeSchemasInParallel(false, singleFileSchemaReader)
}.getCause
assert(exception.getCause.getMessage.contains("Could not read footer for file"))
}
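
Sketch of the user-facing behavior this helper exercises (hypothetical paths): with spark.sql.files.ignoreCorruptFiles disabled, a non-ORC file among the inputs fails the schema merge; enabling it lets the merge skip files whose footer cannot be read.

    // Hypothetical layout: two ORC directories plus one JSON directory.
    val orcPath1 = "/tmp/merge/first"
    val orcPath2 = "/tmp/merge/second"
    val jsonPath = "/tmp/merge/third"
    spark.range(1).toDF("a").write.orc(orcPath1)
    spark.range(1).toDF("b").write.orc(orcPath2)
    spark.range(1).toDF("a").write.json(jsonPath)

    // With corrupt-file skipping off, schema merging fails on the JSON file:
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")
    // spark.read.option("mergeSchema", "true").orc(orcPath1, orcPath2, jsonPath)
    //   => SparkException: "Could not read footer for file ..."

    // With it on, the unreadable file is skipped and the merged schema is (a, b):
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    spark.read.option("mergeSchema", "true").orc(orcPath1, orcPath2, jsonPath).printSchema()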

test("create temporary orc table") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))

@@ -377,4 +421,8 @@ class OrcSourceSuite extends OrcSuite with SharedSQLContext {
test("Enforce direct encoding column-wise selectively") {
testSelectiveDictionaryEncoding(isSelective = true)
}

test("SPARK-11412 read and merge orc schemas in parallel") {
testMergeSchemasInParallel(OrcUtils.singleFileSchemaReader)
}
}

@@ -48,6 +48,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types._
@@ -67,12 +68,20 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
Some(sparkSession.sessionState.newHadoopConf()),
ignoreCorruptFiles
)
val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
OrcUtils.mergeSchemasInParallel(
sparkSession,
files,
OrcFileOperator.singleFileSchemaReader)
} else {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
Some(sparkSession.sessionState.newHadoopConf()),
ignoreCorruptFiles
)
}
}
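
A brief sketch of exercising this path (hypothetical paths; assumes a Hive-enabled build): selecting the Hive ORC implementation still honors mergeSchema, now backed by OrcFileOperator.singleFileSchemaReader.

    spark.conf.set("spark.sql.orc.impl", "hive")
    spark.conf.set("spark.sql.orc.mergeSchema", "true")
    spark.read.format("orc").load("/tmp/orc/first", "/tmp/orc/second").printSchema()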

override def prepareWrite(

@@ -101,6 +101,19 @@ private[hive] object OrcFileOperator extends Logging {
}
}

/**
* Read single ORC file schema using Hive ORC library
*/
def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
: Option[StructType] = {

Reviewer (Member): ditto. 2-space.

getFileReader(file, Some(conf), ignoreCorruptFiles).map(reader => {
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $file, got Hive schema string: $schema")
CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
})
}

def getObjectInspector(
path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])

@@ -166,4 +166,8 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
}
}
}

test("SPARK-11412 read and merge orc schemas in parallel") {
testMergeSchemasInParallel(OrcFileOperator.singleFileSchemaReader)
}
}