
Commit 8d212f0

[SPARK-15474][SQL] Write and read back non-empty schema with empty dataframe
1 parent 5433be4 commit 8d212f0

3 files changed

Lines changed: 63 additions & 8 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 37 additions & 3 deletions
@@ -17,12 +17,18 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.TypeDescription
+import java.io._
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.orc.{OrcFile, TypeDescription}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types.StructType
 
-private[sql] object OrcFileFormat {
+private[sql] object OrcFileFormat extends Logging {
   private def checkFieldName(name: String): Unit = {
     try {
       TypeDescription.fromString(s"struct<$name:int>")
@@ -39,4 +45,32 @@ private[sql] object OrcFileFormat {
     schema.fieldNames.foreach(checkFieldName)
     schema
   }
+
+  def getSchemaString(schema: StructType): String = {
+    schema.fields.map(f => s"${f.name}:${f.dataType.catalogString}").mkString("struct<", ",", ">")
+  }
+
+  private def readSchema(file: Path, conf: Configuration, fs: FileSystem) = {
+    try {
+      val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+      val reader = OrcFile.createReader(file, readerOptions)
+      val schema = reader.getSchema
+      if (schema.getFieldNames.size == 0) {
+        None
+      } else {
+        Some(schema)
+      }
+    } catch {
+      case _: IOException => None
+    }
+  }
+
+  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus]): Option[StructType] = {
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val fs = FileSystem.get(conf)
+    files.map(_.getPath).flatMap(readSchema(_, conf, fs)).headOption.map { schema =>
+      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    }
+  }
 }
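
For reference, the round trip this relies on can be sketched as follows (not part of the commit; field names are illustrative, assuming a Spark 2.x classpath): getSchemaString renders a Catalyst StructType in the same struct<name:type,...> form that ORC's TypeDescription.toString produces, so CatalystSqlParser can parse either string back into a StructType.

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("b", BooleanType),
      StructField("i", IntegerType),
      StructField("s", StringType)))

    // Mirrors getSchemaString: "name:catalogString" pairs wrapped in struct<...>.
    val schemaString = schema.fields
      .map(f => s"${f.name}:${f.dataType.catalogString}")
      .mkString("struct<", ",", ">")
    // schemaString == "struct<b:boolean,i:int,s:string>"

    // Mirrors the parsing step in readSchema, where TypeDescription.toString
    // yields the same struct<...> form.
    val parsed = CatalystSqlParser.parseDataType(schemaString).asInstanceOf[StructType]
    assert(parsed.sameType(schema))

Note also that the public readSchema returns the schema of the first file whose footer lists at least one field (flatMap plus headOption), so files written with an empty schema by older Spark versions are simply skipped.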

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 12 additions & 5 deletions
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.orc.OrcConf.COMPRESS
+import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
@@ -58,10 +58,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    OrcFileOperator.readSchema(
-      files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf())
-    )
+    org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.readSchema(sparkSession, files)
   }
 
   override def prepareWrite(
@@ -73,6 +70,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     val configuration = job.getConfiguration
 
+    configuration.set(
+      MAPRED_OUTPUT_SCHEMA.getAttribute,
+      org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.getSchemaString(dataSchema))
+
     configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
     configuration match {
       case conf: JobConf =>
@@ -252,6 +253,12 @@ private[orc] class OrcOutputWriter(
   override def close(): Unit = {
     if (recordWriterInstantiated) {
       recordWriter.close(Reporter.NULL)
+    } else {
+      // SPARK-15474 Write empty orc file with correct schema
+      val conf = context.getConfiguration()
+      val writer = org.apache.orc.OrcFile.createWriter(
+        new Path(path), org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf))
+      new org.apache.orc.mapreduce.OrcMapreduceRecordWriter(writer).close(context)
     }
   }
 }
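
Two pieces cooperate here: prepareWrite stores the data schema under MAPRED_OUTPUT_SCHEMA, and OrcOutputFormat.buildOptions picks that attribute back up from the configuration when close() creates the fallback writer, so a task that saw no rows still emits an ORC file whose footer carries the real schema. A minimal standalone sketch of that fallback, using only orc-core (illustrative path and schema, not code from the patch):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.orc.{OrcFile, TypeDescription}

    val conf = new Configuration()
    val schema = TypeDescription.fromString("struct<b:boolean,i:int,s:string>")

    // Opening and immediately closing a writer produces a valid zero-row ORC
    // file whose footer still records the schema, which is all readSchema needs.
    val writer = OrcFile.createWriter(
      new Path("/tmp/empty.orc"),
      OrcFile.writerOptions(conf).setSchema(schema))
    writer.close()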

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 14 additions & 0 deletions
@@ -2153,4 +2153,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+        val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
+        emptyDf.write.format(format).save(path)
+
+        val df = spark.read.format(format).load(path)
+        assert(df.schema.sameType(emptyDf.schema))
+        checkAnswer(df, emptyDf)
+      }
+    }
+  }
 }
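
The test compares schemas with sameType, which ignores nullability, since field nullability is not guaranteed to survive the write/read round trip exactly. An illustrative spark-shell session for the same round trip (hypothetical output path, not part of the commit):

    import spark.implicits._

    val emptyDf = Seq((true, 1, "str")).toDF("b", "i", "s").limit(0)
    emptyDf.write.format("orc").save("/tmp/spark-15474")

    // Before this patch, the ORC files carried no schema, so the read failed;
    // now the schema round-trips even with zero rows.
    val df = spark.read.format("orc").load("/tmp/spark-15474")
    df.printSchema()   // b: boolean, i: int, s: string
    assert(df.count() == 0)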
