@@ -72,29 +72,19 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {

   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // When we CREATE TABLE without specifying the table schema, we should fail the query if
-    // bucketing information is specified, as we can't infer bucketing from data files currently,
-    // and we should ignore the partition columns if it's specified, as we will infer it later, at
-    // runtime.
+    // bucketing information is specified, as we can't infer bucketing from data files currently.
+    // Since the runtime inferred partition columns could be different from what user specified,
+    // we fail the query if the partitioning information is specified.
     case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
       if (tableDesc.bucketSpec.isDefined) {
         failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
           "when creating and will be inferred at runtime")
       }

-      val partitionColumnNames = tableDesc.partitionColumnNames
-      if (partitionColumnNames.nonEmpty) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // at runtime. So, we are not expecting partition columns and we will discover partitions
-        // at runtime. However, if there are specified partition columns, we simply ignore them and
-        // provide a warning message.
-        logWarning(
-          s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
-            s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
-            "be inferred.")
-        c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
-      } else {
-        c
+      if (tableDesc.partitionColumnNames.nonEmpty) {
+        failAnalysis("Cannot specify partition information if the table schema is not specified " +
+          "when creating and will be inferred at runtime")
       }
+      c
Contributor:
Because of this change, I think we need to create a new jira.

Member Author:
A new JIRA is created and the PR title is updated. Thanks!

Contributor:
How about "It is not allowed to specify partition columns when the table schema is not defined. When the table schema is not provided, schema and partition columns will be inferred."?

Member Author:
Sounds better to me. Thanks! Let me change it now.


     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
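For reference, a minimal sketch (not part of the PR) of how the new check surfaces to a user, assuming a local SparkSession and a made-up Parquet path; the exact wording of the error depends on the message discussed above:

import org.apache.spark.sql.{AnalysisException, SparkSession}

object PartitionWithoutSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("partition-without-schema")
      .getOrCreate()

    try {
      // No schema is given but PARTITIONED BY is: with this change the query fails
      // at analysis time instead of silently dropping the partition columns.
      spark.sql(
        """
          |CREATE TABLE t
          |USING parquet
          |OPTIONS (path '/tmp/some_parquet_dir')
          |PARTITIONED BY (a)
        """.stripMargin)
    } catch {
      case e: AnalysisException => println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}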
@@ -265,19 +265,25 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
       val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
       val uri = path.toURI
-      sql(
+      val sqlCreateTable =
         s"""
            |CREATE TABLE $tabName $schemaClause
            |USING parquet
            |OPTIONS (
            |  path '$uri'
            |)
            |$partitionClause
-         """.stripMargin)
-      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+         """.stripMargin
+      if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) {
+        val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage
+        assert(e.contains("Cannot specify partition information"))
+      } else {
+        sql(sqlCreateTable)
+        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))

-      assert(expectedSchema == tableMetadata.schema)
-      assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+        assert(expectedSchema == tableMetadata.schema)
+        assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+      }
     }
   }

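As a side note, a tiny self-contained illustration of the ScalaTest intercept pattern the updated test relies on; IllegalStateException stands in for Spark's AnalysisException so the snippet needs only scalatest on the classpath:

import org.scalatest.FunSuite

// Minimal sketch of the intercept-and-inspect-message pattern used in the DDLSuite change above.
class InterceptPatternSketch extends FunSuite {
  private def failingCreate(): Unit =
    throw new IllegalStateException("Cannot specify partition information")

  test("the expected exception is caught and its message can be inspected") {
    val e = intercept[IllegalStateException](failingCreate()).getMessage
    assert(e.contains("Cannot specify partition information"))
  }
}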
@@ -614,24 +614,26 @@ object HiveExternalCatalog {
   def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
     val props = metadata.properties
-    props.get(DATASOURCE_SCHEMA).map { schema =>
+    val schema = props.get(DATASOURCE_SCHEMA)
+    if (schema.isDefined) {
       // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using `spark.sql.sources.schema` any more, we need to still support.
-      DataType.fromJson(schema).asInstanceOf[StructType]
-    } getOrElse {
-      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
-        val parts = (0 until numParts.toInt).map { index =>
+      DataType.fromJson(schema.get).asInstanceOf[StructType]
+    } else {
+      val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
+      if (numSchemaParts.isDefined) {
+        val parts = (0 until numSchemaParts.get.toInt).map { index =>
           val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
           if (part == null) {
             throw new AnalysisException(errorMessage +
-              s" (missing part $index of the schema, $numParts parts are expected).")
+              s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
           }
           part
         }
         // Stick all parts back to a single schema string.
         DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      } getOrElse {
+      } else {
         throw new AnalysisException(errorMessage)
       }
     }
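To make the part-based storage that getSchemaFromTableProperties reads easier to follow, here is a rough round-trip sketch; the property key literals below are stand-ins for the real DATASOURCE_SCHEMA_NUMPARTS and DATASOURCE_SCHEMA_PART_PREFIX constants defined in HiveExternalCatalog, and the chunk size is arbitrary:

import org.apache.spark.sql.types.{DataType, StructType}

object SchemaPartsSketch {
  // Stand-in property keys; the real constants live in HiveExternalCatalog.
  private val NumPartsKey = "schema.numParts"
  private val PartPrefix = "schema.part."

  // Writer side: split the schema's JSON into bounded chunks, mirroring how large
  // schemas are spread across several table properties.
  def toProperties(schema: StructType, chunkSize: Int = 4000): Map[String, String] = {
    val parts = schema.json.grouped(chunkSize).toSeq
    val indexed = parts.zipWithIndex.map { case (part, i) => s"$PartPrefix$i" -> part }
    (indexed :+ (NumPartsKey -> parts.size.toString)).toMap
  }

  // Reader side: stitch the parts back together and parse, as in the branch above.
  def fromProperties(props: Map[String, String]): StructType = {
    val numParts = props(NumPartsKey).toInt
    val json = (0 until numParts).map(i => props(s"$PartPrefix$i")).mkString
    DataType.fromJson(json).asInstanceOf[StructType]
  }
}

Round-tripping a schema through toProperties and fromProperties returns the same StructType; in the real catalog a missing part key triggers the AnalysisException shown in the diff.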