Commit 762366f

gatorsmile authored and cloud-fan committed
[SPARK-16552][SQL] Store the Inferred Schemas into External Catalog Tables when Creating Tables
#### What changes were proposed in this pull request?

Currently, in Spark SQL, the initial creation of a table schema falls into two groups. This applies to both Hive tables and data source tables:

**Group A. Users specify the schema.**

_Case 1: CREATE TABLE AS SELECT_ — the schema is determined by the result schema of the SELECT clause. For example,
```SQL
CREATE TABLE tab STORED AS TEXTFILE
AS SELECT * FROM input
```

_Case 2: CREATE TABLE_ — users explicitly specify the schema. For example,
```SQL
CREATE TABLE jsonTable (_1 string, _2 string)
USING org.apache.spark.sql.json
```

**Group B. Spark SQL infers the schema at runtime.**

_Case 3: CREATE TABLE_ — users do not specify the schema, only the path to the file location. For example,
```SQL
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (path '${tempDir.getCanonicalPath}')
```

Before this PR, Spark SQL does not store the inferred schema in the external catalog for the cases in Group B. When users refresh the metadata cache or access the table for the first time after (re-)starting Spark, Spark SQL infers the schema and stores it in the metadata cache to improve the performance of subsequent metadata requests. However, this runtime schema inference can cause undesirable schema changes after each restart of Spark.

This PR stores the inferred schema in the external catalog when the table is created. When users want to refresh the schema after possible changes to the external files (table location), they issue `REFRESH TABLE`. Spark SQL then infers the schema again based on the previously specified table location and updates/refreshes the schema in the external catalog and the metadata cache.

In this PR, we do not use the inferred schema to replace a user-specified schema, to avoid external behavior changes. Based on this design, user-specified schemas (as described in Group A) could be changed by ALTER TABLE commands, although we do not support them yet.

#### How was this patch tested?

TODO: add more cases to cover the changes.

Author: gatorsmile <[email protected]>

Closes #14207 from gatorsmile/userSpecifiedSchema.
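To make the Group B flow concrete, here is an illustrative end-to-end sketch (the table name and path are placeholders, not taken from this patch):

```SQL
-- Schema is inferred from the files at the given location and, with this patch,
-- persisted into the external catalog at CREATE TABLE time.
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (path '/data/events');

-- Per the description above, REFRESH TABLE is the intended way to pick up
-- changes to the files at the table location.
REFRESH TABLE jsonTable;
```
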
1 parent 5c2ae79 commit 762366f

7 files changed: 291 additions & 79 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 47 additions & 37 deletions

```diff
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
-import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types._
@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
     userSpecifiedSchema: Option[StructType],
     provider: String,
     options: Map[String, String],
-    partitionColumns: Array[String],
+    userSpecifiedPartitionColumns: Array[String],
     bucketSpec: Option[BucketSpec],
     ignoreIfExists: Boolean,
     managedIfNoPath: Boolean)
@@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
     }
 
     // Create the relation to validate the arguments before writing the metadata to the metastore.
-    DataSource(
-      sparkSession = sparkSession,
-      userSpecifiedSchema = userSpecifiedSchema,
-      className = provider,
-      bucketSpec = None,
-      options = optionsWithPath).resolveRelation(checkPathExist = false)
+    val dataSource: BaseRelation =
+      DataSource(
+        sparkSession = sparkSession,
+        userSpecifiedSchema = userSpecifiedSchema,
+        className = provider,
+        bucketSpec = None,
+        options = optionsWithPath).resolveRelation(checkPathExist = false)
+
+    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
+      userSpecifiedPartitionColumns
+    } else {
+      val res = dataSource match {
+        case r: HadoopFsRelation => r.partitionSchema.fieldNames
+        case _ => Array.empty[String]
+      }
+      if (userSpecifiedPartitionColumns.length > 0) {
+        // The table does not have a specified schema, which means that the schema will be inferred
+        // when we load the table. So, we are not expecting partition columns and we will discover
+        // partitions when we load the table. However, if there are specified partition columns,
+        // we simply ignore them and provide a warning message.
+        logWarning(
+          s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
+            s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
+            s"Schema: ${dataSource.schema.simpleString}; " +
+            s"Partition columns: ${res.mkString("(", ", ", ")")}")
+      }
+      res
+    }
 
     CreateDataSourceTableUtils.createDataSourceTable(
       sparkSession = sparkSession,
       tableIdent = tableIdent,
-      userSpecifiedSchema = userSpecifiedSchema,
+      schema = dataSource.schema,
       partitionColumns = partitionColumns,
       bucketSpec = bucketSpec,
       provider = provider,
@@ -213,7 +235,7 @@ case class CreateDataSourceTableAsSelectCommand(
           }
           existingSchema = Some(l.schema)
         case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-          existingSchema = DDLUtils.getSchemaFromTableProperties(s.metadata)
+          existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
         case o =>
           throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
       }
@@ -256,7 +278,7 @@ case class CreateDataSourceTableAsSelectCommand(
       CreateDataSourceTableUtils.createDataSourceTable(
         sparkSession = sparkSession,
         tableIdent = tableIdent,
-        userSpecifiedSchema = Some(result.schema),
+        schema = result.schema,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         provider = provider,
@@ -306,7 +328,7 @@ object CreateDataSourceTableUtils extends Logging {
   def createDataSourceTable(
       sparkSession: SparkSession,
       tableIdent: TableIdentifier,
-      userSpecifiedSchema: Option[StructType],
+      schema: StructType,
       partitionColumns: Array[String],
       bucketSpec: Option[BucketSpec],
       provider: String,
@@ -315,28 +337,26 @@ object CreateDataSourceTableUtils extends Logging {
     val tableProperties = new mutable.HashMap[String, String]
     tableProperties.put(DATASOURCE_PROVIDER, provider)
 
-    // Saves optional user specified schema. Serialized JSON schema string may be too long to be
-    // stored into a single metastore SerDe property. In this case, we split the JSON string and
-    // store each part as a separate SerDe property.
-    userSpecifiedSchema.foreach { schema =>
-      val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
-      val schemaJsonString = schema.json
-      // Split the JSON string.
-      val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-      parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-      }
+    // Serialized JSON schema string may be too long to be stored into a single metastore table
+    // property. In this case, we split the JSON string and store each part as a separate table
+    // property.
+    val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
+    val schemaJsonString = schema.json
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+    parts.zipWithIndex.foreach { case (part, index) =>
+      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
     }
 
-    if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+    if (partitionColumns.length > 0) {
       tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
       partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
         tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
       }
     }
 
-    if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+    if (bucketSpec.isDefined) {
       val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
 
       tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
@@ -353,16 +373,6 @@ object CreateDataSourceTableUtils extends Logging {
       }
     }
 
-    if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
-      // The table does not have a specified schema, which means that the schema will be inferred
-      // when we load the table. So, we are not expecting partition columns and we will discover
-      // partitions when we load the table. However, if there are specified partition columns,
-      // we simply ignore them and provide a warning message.
-      logWarning(
-        s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
-          s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
-    }
-
     val tableType = if (isExternal) {
       tableProperties.put("EXTERNAL", "TRUE")
       CatalogTableType.EXTERNAL
@@ -375,7 +385,7 @@ object CreateDataSourceTableUtils extends Logging {
     val dataSource =
       DataSource(
         sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
+        userSpecifiedSchema = Some(schema),
         partitionColumns = partitionColumns,
        bucketSpec = bucketSpec,
        className = provider,
```

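For readers unfamiliar with the table-property encoding used above: the schema's JSON representation is chunked at a length threshold and each chunk is stored under a numbered key, together with a part-count key. Below is a minimal, self-contained sketch of that splitting scheme; the property keys and the threshold here are illustrative, not the actual Spark constants.

```scala
import org.apache.spark.sql.types.StructType

// Illustrative sketch only: chunk a schema's JSON string so each piece fits within a
// metastore property-length limit, mirroring createDataSourceTable above.
// Key names are hypothetical.
object SchemaSplitSketch {
  val NumPartsKey = "sketch.schema.numParts"
  val PartKeyPrefix = "sketch.schema.part."

  def toTableProperties(schema: StructType, threshold: Int): Map[String, String] = {
    val parts = schema.json.grouped(threshold).toSeq
    val partProps = parts.zipWithIndex.map { case (part, index) =>
      s"$PartKeyPrefix$index" -> part
    }
    (partProps :+ (NumPartsKey -> parts.size.toString)).toMap
  }
}
```
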
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 10 additions & 12 deletions

```diff
@@ -521,31 +521,29 @@ object DDLUtils {
     table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
-  // A persisted data source table may not store its schema in the catalog. In this case, its schema
-  // will be inferred at runtime when the table is referenced.
-  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+  // A persisted data source table always store its schema in the catalog.
+  def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     require(isDatasourceTable(metadata))
+    val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
     val props = metadata.properties
-    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
+    props.get(DATASOURCE_SCHEMA).map { schema =>
       // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using spark.sql.sources.schema any more, we need to still support.
-      props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
-    } else {
-      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
+      DataType.fromJson(schema).asInstanceOf[StructType]
+    } getOrElse {
+      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
        val parts = (0 until numParts.toInt).map { index =>
          val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
          if (part == null) {
-            throw new AnalysisException(
-              "Could not read schema from the metastore because it is corrupted " +
-                s"(missing part $index of the schema, $numParts parts are expected).")
+            throw new AnalysisException(msgSchemaCorrupted +
+              s" (missing part $index of the schema, $numParts parts are expected).")
          }
-
          part
        }
        // Stick all parts back to a single schema string.
        DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      }
+      } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
     }
   }
```

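A companion sketch for the read path implemented in `getSchemaFromTableProperties` above: concatenate the numbered parts in order and parse the JSON back into a `StructType`, failing loudly when a part is missing. The key names match the hypothetical ones from the previous sketch; the real code uses Spark's internal property constants and throws `AnalysisException`.

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Illustrative read-path sketch: reassemble the split schema stored by the
// previous sketch and parse it. Key names are hypothetical.
object SchemaReassembleSketch {
  val NumPartsKey = "sketch.schema.numParts"
  val PartKeyPrefix = "sketch.schema.part."

  def fromTableProperties(props: Map[String, String]): StructType = {
    val numParts = props.getOrElse(NumPartsKey,
      sys.error("Could not read schema: the part count is missing.")).toInt
    val json = (0 until numParts).map { index =>
      props.getOrElse(s"$PartKeyPrefix$index",
        sys.error(s"Could not read schema: missing part $index of $numParts."))
    }.mkString
    DataType.fromJson(json).asInstanceOf[StructType]
  }
}
```
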
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 18 additions & 18 deletions

```diff
@@ -416,15 +416,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     } else {
       val metadata = catalog.getTableMetadata(table)
 
-      if (DDLUtils.isDatasourceTable(metadata)) {
-        DDLUtils.getSchemaFromTableProperties(metadata) match {
-          case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
-          case None => describeSchema(catalog.lookupRelation(table).schema, result)
-        }
-      } else {
-        describeSchema(metadata.schema, result)
-      }
-
+      describeSchema(metadata, result)
       if (isExtended) {
         describeExtended(metadata, result)
       } else if (isFormatted) {
@@ -439,12 +431,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
 
   private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     if (DDLUtils.isDatasourceTable(table)) {
-      val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
       val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table)
-      for (schema <- userSpecifiedSchema if partColNames.nonEmpty) {
+      if (partColNames.nonEmpty) {
+        val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
         append(buffer, "# Partition Information", "", "")
         append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-        describeSchema(StructType(partColNames.map(schema(_))), buffer)
+        describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
       }
     } else {
       if (table.partitionColumns.nonEmpty) {
@@ -518,6 +510,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     }
   }
 
+  private def describeSchema(
+      tableDesc: CatalogTable,
+      buffer: ArrayBuffer[Row]): Unit = {
+    if (DDLUtils.isDatasourceTable(tableDesc)) {
+      val schema = DDLUtils.getSchemaFromTableProperties(tableDesc)
+      describeSchema(schema, buffer)
+    } else {
+      describeSchema(tableDesc.schema, buffer)
+    }
+  }
+
   private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -876,12 +879,9 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showDataSourceTableDataColumns(
       metadata: CatalogTable, builder: StringBuilder): Unit = {
-    DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
-      val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
-      builder ++= columns.mkString("(", ", ", ")")
-    }
-
-    builder ++= "\n"
+    val schema = DDLUtils.getSchemaFromTableProperties(metadata)
+    val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+    builder ++= columns.mkString("(", ", ", ")\n")
   }
 
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
```

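The commands touched above can be exercised as follows (the table name is a placeholder); after this change they report the schema persisted in the table properties rather than an optional, possibly missing one:

```SQL
DESCRIBE EXTENDED jsonTable;
SHOW CREATE TABLE jsonTable;
```
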
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -205,7 +205,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi
  */
 private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
-    val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
+    val schema = DDLUtils.getSchemaFromTableProperties(table)
 
     // We only need names at here since userSpecifiedSchema we loaded from the metastore
     // contains partition columns. We can always get datatypes of partitioning columns
@@ -218,7 +218,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
     val dataSource =
       DataSource(
         sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
+        userSpecifiedSchema = Some(schema),
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
         className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
```

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 3 additions & 1 deletion

```diff
@@ -352,13 +352,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
   /**
    * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
-   * is refreshed.
+   * is refreshed. For data source tables, the schema will not be inferred and refreshed.
    *
    * @group cachemgmt
    * @since 2.0.0
    */
   override def refreshTable(tableName: String): Unit = {
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
+    // Non-temp tables: refresh the metadata cache.
     sessionCatalog.refreshTable(tableIdent)
 
     // If this table is cached as an InMemoryRelation, drop the original
```

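A minimal usage sketch of the refresh path documented above, via the public `Catalog` API (the table name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

object RefreshTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("refresh-table-example").getOrCreate()
    // Refreshes the metadata cache entry for the table; if the table is cached as an
    // InMemoryRelation, the original cached plan is dropped (see the diff above). Per the
    // updated Scaladoc, the persisted schema of a data source table is not re-inferred here.
    spark.catalog.refreshTable("jsonTable")
    spark.stop()
  }
}
```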