[SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog #14155
Changes from 17 commits: b781ef8, dddda52, 9ae7a71, 4383590, 2fc6838, d66a8a8, a2bd4eb, 9030c5b, 4c2c41b, ce1aff4, 9b08cca, fe1c6b4, 8496069, 774a6dd, 715c312, 54fdbc6, 96d57b6, 6ca8909, 38b838a
@@ -17,18 +17,13 @@
package org.apache.spark.sql.execution.command

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types._
@@ -97,16 +92,17 @@ case class CreateDataSourceTableCommand(
       }
     }

-    CreateDataSourceTableUtils.createDataSourceTable(
-      sparkSession = sparkSession,
-      tableIdent = tableIdent,
+    val table = CatalogTable(
+      identifier = tableIdent,
+      tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
-      provider = provider,
-      options = optionsWithPath,
-      isExternal = isExternal)
+      provider = Some(provider),
+      partitionColumnNames = partitionColumns,
+      bucketSpec = bucketSpec
+    )

+    sessionState.catalog.createTable(table, ignoreIfExists)
     Seq.empty[Row]
   }
Member:
It sounds like we are not following the existing behaviors. I checked the existing implementation of … How about adding it as an independent field in …

Contributor:
Is it different from what we do at line https://github.com/apache/spark/pull/14155/files/96d57b665ac65750eb5c6f9757e5827ea9c14ca4#diff-945e51801b84b92da242fcb42f83f5f5R98?

Contributor (Author):
I put the options in …

Member:
That line is just putting the …

Member:
@cloud-fan How about the write path? My previous reply to @yhuai is for the write path.

Member:
nvm, it sounds like the …
 }
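The thread above is about where the data source options (including the path) live in the new CatalogTable: the diff folds them into the storage format's properties map rather than giving them a dedicated field. Below is a minimal, dependency-free sketch of that layout; the case classes are simplified stand-ins invented for illustration, not the real CatalogTable and CatalogStorageFormat definitions.

// Simplified stand-ins (hypothetical, for illustration only).
case class StorageFormat(properties: Map[String, String] = Map.empty)
case class TableMetadata(
    identifier: String,
    provider: Option[String],
    storage: StorageFormat)

object OptionsInStorageSketch {
  def main(args: Array[String]): Unit = {
    // Data source options, with the table location folded in as "path".
    val optionsWithPath = Map("path" -> "/tmp/warehouse/t", "compression" -> "snappy")

    // Mirrors what the diff does: the options ride along in storage.properties.
    val table = TableMetadata(
      identifier = "t",
      provider = Some("parquet"),
      storage = StorageFormat(properties = optionsWithPath))

    // The read/write paths can recover the options with a plain map lookup.
    println(table.storage.properties("path")) // prints /tmp/warehouse/t
  }
}

Keeping the options inside storage.properties avoids adding another field to the table metadata, at the cost of the read and write paths having to know the map keys, which is the trade-off the reviewers are weighing above.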
|
|
@@ -193,7 +189,7 @@ case class CreateDataSourceTableAsSelectCommand(
           }
           existingSchema = Some(l.schema)
         case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-          existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
+          existingSchema = Some(s.metadata.schema)
         case o =>
           throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
       }
|
@@ -233,226 +229,21 @@ case class CreateDataSourceTableAsSelectCommand(
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      CreateDataSourceTableUtils.createDataSourceTable(
-        sparkSession = sparkSession,
-        tableIdent = tableIdent,
-        schema = result.schema,
-        partitionColumns = partitionColumns,
-        bucketSpec = bucketSpec,
-        provider = provider,
-        options = optionsWithPath,
-        isExternal = isExternal)
+      val schema = result.schema
+      val table = CatalogTable(
+        identifier = tableIdent,
+        tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+        schema = schema,
+        provider = Some(provider),
+        partitionColumnNames = partitionColumns,
+        bucketSpec = bucketSpec
+      )
+      sessionState.catalog.createTable(table, ignoreIfExists = false)
     }

     // Refresh the cache of the table in the catalog.
     sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }

object CreateDataSourceTableUtils extends Logging {

  val DATASOURCE_PREFIX = "spark.sql.sources."
  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
  val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."

  def createDataSourceTable(
      sparkSession: SparkSession,
      tableIdent: TableIdentifier,
      schema: StructType,
      partitionColumns: Array[String],
      bucketSpec: Option[BucketSpec],
      provider: String,
      options: Map[String, String],
      isExternal: Boolean): Unit = {
    val tableProperties = new mutable.HashMap[String, String]
    tableProperties.put(DATASOURCE_PROVIDER, provider)

    // Serialized JSON schema string may be too long to be stored into a single metastore table
    // property. In this case, we split the JSON string and store each part as a separate table
    // property.
    val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
    val schemaJsonString = schema.json
    // Split the JSON string.
    val parts = schemaJsonString.grouped(threshold).toSeq
    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
    }

    if (partitionColumns.length > 0) {
      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
      }
    }

    if (bucketSpec.isDefined) {
      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
      }

      if (sortColumnNames.nonEmpty) {
        tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
          tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
        }
      }
    }

    val tableType = if (isExternal) {
      tableProperties.put("EXTERNAL", "TRUE")
      CatalogTableType.EXTERNAL
    } else {
      tableProperties.put("EXTERNAL", "FALSE")
      CatalogTableType.MANAGED
    }

    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
    val dataSource =
      DataSource(
        sparkSession,
        userSpecifiedSchema = Some(schema),
        partitionColumns = partitionColumns,
        bucketSpec = bucketSpec,
        className = provider,
        options = options)

    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
      CatalogTable(
        identifier = tableIdent,
        tableType = tableType,
        schema = new StructType,
        provider = Some(provider),
        storage = CatalogStorageFormat(
          locationUri = None,
          inputFormat = None,
          outputFormat = None,
          serde = None,
          compressed = false,
          properties = options
        ),
        properties = tableProperties.toMap)
    }

    def newHiveCompatibleMetastoreTable(
        relation: HadoopFsRelation,
        serde: HiveSerDe): CatalogTable = {
      assert(partitionColumns.isEmpty)
      assert(relation.partitionSchema.isEmpty)

      CatalogTable(
        identifier = tableIdent,
        tableType = tableType,
        storage = CatalogStorageFormat(
          locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
          inputFormat = serde.inputFormat,
          outputFormat = serde.outputFormat,
          serde = serde.serde,
          compressed = false,
          properties = options
        ),
        schema = relation.schema,
        provider = Some(provider),
        properties = tableProperties.toMap,
        viewText = None)
    }

    // TODO: Support persisting partitioned data source relations in Hive compatible format
    val qualifiedTableName = tableIdent.quotedString
    val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
    val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
      case _ if skipHiveMetadata =>
        val message =
          s"Persisting partitioned data source relation $qualifiedTableName into " +
            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
        (None, message)

      case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
          relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
        val message =
          s"Persisting data source relation $qualifiedTableName with a single input path " +
            s"into Hive metastore in Hive compatible format. Input path: " +
            s"${relation.location.paths.head}."
        (Some(hiveTable), message)

      case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
        val message =
          s"Persisting partitioned data source relation $qualifiedTableName into " +
            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
        (None, message)

      case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
        val message =
          s"Persisting bucketed data source relation $qualifiedTableName into " +
            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
            "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
        (None, message)

      case (Some(serde), relation: HadoopFsRelation) =>
        val message =
          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
            s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
        (None, message)

      case (Some(serde), _) =>
        val message =
          s"Data source relation $qualifiedTableName is not a " +
            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
            "in Spark SQL specific format, which is NOT compatible with Hive."
        (None, message)

      case _ =>
        val message =
          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
            s"Spark SQL specific format, which is NOT compatible with Hive."
        (None, message)
    }

    (hiveCompatibleTable, logMessage) match {
      case (Some(table), message) =>
        // We first try to save the metadata of the table in a Hive compatible way.
        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
        // specific way.
        try {
          logInfo(message)
          sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
        } catch {
          case NonFatal(e) =>
            val warningMessage =
              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
                s"it into Hive metastore in Spark SQL specific format."
            logWarning(warningMessage, e)
            val table = newSparkSQLSpecificMetastoreTable()
            sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
        }

      case (None, message) =>
        logWarning(message)
        val table = newSparkSQLSpecificMetastoreTable()
        sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
    }
  }
}
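For reference, the core of the removed "hive hack" above is how createDataSourceTable encodes the table schema: schema.json is chopped into chunks of at most schemaStringLengthThreshold characters and stored as numbered table properties (spark.sql.sources.schema.numParts plus spark.sql.sources.schema.part.N). The snippet below is a minimal sketch of that split-and-reassemble round trip on plain strings, with an illustrative threshold and sample JSON; it only mirrors the string handling shown in the diff and is not the actual HiveExternalCatalog code this PR moves the logic into.

object SchemaSplitSketch {
  val DATASOURCE_SCHEMA_NUMPARTS = "spark.sql.sources.schema.numParts"
  val DATASOURCE_SCHEMA_PART_PREFIX = "spark.sql.sources.schema.part."

  // Split a (possibly very long) schema JSON string into numbered table properties,
  // the same way the removed createDataSourceTable does with grouped(threshold).
  def encode(schemaJson: String, threshold: Int): Map[String, String] = {
    val parts = schemaJson.grouped(threshold).toSeq
    val indexed = parts.zipWithIndex.map { case (part, index) =>
      s"$DATASOURCE_SCHEMA_PART_PREFIX$index" -> part
    }
    (indexed :+ (DATASOURCE_SCHEMA_NUMPARTS -> parts.size.toString)).toMap
  }

  // Reassemble the JSON by reading numParts and concatenating part.0, part.1, ...
  def decode(props: Map[String, String]): String = {
    val numParts = props(DATASOURCE_SCHEMA_NUMPARTS).toInt
    (0 until numParts).map(i => props(s"$DATASOURCE_SCHEMA_PART_PREFIX$i")).mkString
  }

  def main(args: Array[String]): Unit = {
    val schemaJson = """{"type":"struct","fields":[{"name":"id","type":"long"}]}"""
    val props = encode(schemaJson, threshold = 16) // small threshold just for the demo
    assert(decode(props) == schemaJson)
    println(s"stored as ${props.size} properties, round trip OK")
  }
}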
We made a change here. Before, ignoreIfExists is always set to false when we call createTable. Now, if we want to let the underlying createTable handle it, we should remove the code: https://github.com/cloud-fan/spark/blob/96d57b665ac65750eb5c6f9757e5827ea9c14ca4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L58-L64

When we hit this branch, the table does not exist, so ignoreIfExists doesn't matter here. I'll change it to false and add some comments.
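The exchange above hinges on an ordering guarantee: the command checks whether the table already exists before it ever calls createTable, so by the time createTable runs the table is known to be absent and passing ignoreIfExists = false is safe. Below is a minimal sketch of that control flow, using an invented in-memory catalog and a plain boolean flag rather than Spark's SessionCatalog and SaveMode.

// Hypothetical stand-in for the catalog used by the real command; invented for illustration.
class InMemoryCatalog {
  private var tables = Set.empty[String]
  def tableExists(name: String): Boolean = tables.contains(name)
  def createTable(name: String, ignoreIfExists: Boolean): Unit = {
    if (tableExists(name) && !ignoreIfExists) {
      throw new IllegalStateException(s"Table $name already exists.")
    }
    tables += name
  }
}

object CreateTableFlowSketch {
  // The command handles the "table already exists" cases itself, so the later
  // createTable call only ever runs when the table is absent.
  def run(catalog: InMemoryCatalog, name: String, ignoreIfExists: Boolean): Unit = {
    if (catalog.tableExists(name)) {
      if (ignoreIfExists) return
      else throw new IllegalStateException(s"Table $name already exists.")
    }
    catalog.createTable(name, ignoreIfExists = false)
  }

  def main(args: Array[String]): Unit = {
    val catalog = new InMemoryCatalog
    run(catalog, "t", ignoreIfExists = true)  // creates "t"
    run(catalog, "t", ignoreIfExists = true)  // already exists: silently returns
    println(catalog.tableExists("t"))         // true
  }
}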