@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
partitionColumns: Array[String],
userSpecifiedPartitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
@@ -95,17 +95,38 @@ case class CreateDataSourceTableCommand(
}

// Create the relation to validate the arguments before writing the metadata to the metastore.
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath).resolveRelation(checkPathExist = false)
val dataSource: HadoopFsRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath)
.resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation]
@cloud-fan (Contributor), Jul 20, 2016:
Is it safe to cast it to HadoopFsRelation?

@cloud-fan (Contributor), Jul 20, 2016:
I think a safer way is to do a pattern match here: if it's a HadoopFsRelation, get its partition columns; otherwise, there are no partition columns.

Member (Author):
Sure, will do it.
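A minimal hedged sketch of the suggested pattern match (the DataSource call and partitionSchema usage mirror the diff; `resolved` and `partitionColumnsFromRelation` are illustrative names):

```scala
// Resolve the relation once, then pattern-match instead of casting.
val resolved = DataSource(
  sparkSession = sparkSession,
  userSpecifiedSchema = userSpecifiedSchema,
  className = provider,
  bucketSpec = None,
  options = optionsWithPath).resolveRelation(checkPathExist = false)

val partitionColumnsFromRelation: Array[String] = resolved match {
  case fs: HadoopFsRelation => fs.partitionSchema.fieldNames
  case _ => Array.empty[String] // non-file-based relations carry no partition columns
}
```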


if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
s"Schema: ${dataSource.schema.simpleString}; " +
s"Partition columns: ${dataSource.partitionSchema.fieldNames}")
}

val partitionColumns =
if (userSpecifiedSchema.isEmpty) {
dataSource.partitionSchema.fieldNames
} else {
userSpecifiedPartitionColumns
}

CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
schema = dataSource.schema,
Contributor:
Seems we should still use the user-specified schema, right?

Contributor:
I think it is not very clear from the code that dataSource.schema will be userSpecifiedSchema.

@gatorsmile (Member, Author), Aug 8, 2016:
Here, dataSource.schema could be the inferred schema. Previously, we did not store the inferred schema; after this PR we do, and thus we use dataSource.schema.

Actually, after re-checking the code, I found the schema might be adjusted slightly even when users specify it. For example, the nullability could be changed.

I think we should make such a change, but maybe we should test and log it?
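A small hedged sketch of the "test and log it" idea from the comment above (it reuses the `userSpecifiedSchema`, `dataSource`, and `tableIdent` names from the diff; the message wording is illustrative):

```scala
// If the resolved schema differs from the user-specified one (e.g. nullability was
// relaxed during resolution), log the difference before persisting the resolved schema.
userSpecifiedSchema.foreach { specified =>
  if (specified != dataSource.schema) {
    logWarning(
      s"The resolved schema of table $tableIdent differs from the user-specified schema. " +
        s"Specified: ${specified.simpleString}; resolved: ${dataSource.schema.simpleString}. " +
        "The resolved schema will be stored in the metastore.")
  }
}
```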

partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -256,7 +277,7 @@ case class CreateDataSourceTableAsSelectCommand(
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = Some(result.schema),
schema = result.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -306,7 +327,7 @@ object CreateDataSourceTableUtils extends Logging {
def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
schema: StructType,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
@@ -318,25 +339,23 @@
// Saves optional user specified schema. Serialized JSON schema string may be too long to be
Member:
I think this comment is not correct anymore?

Member (Author):
Yeah, will correct it. Thanks!

// stored into a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
userSpecifiedSchema.foreach { schema =>
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
@@ -353,16 +372,6 @@
}
}

if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}

val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
@@ -375,7 +384,7 @@
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
@@ -413,15 +413,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
} else {
val metadata = catalog.getTableMetadata(table)

if (DDLUtils.isDatasourceTable(metadata)) {
DDLUtils.getSchemaFromTableProperties(metadata) match {
case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
case None => describeSchema(catalog.lookupRelation(table).schema, result)
}
} else {
describeSchema(metadata.schema, result)
}

describeSchema(metadata, result)
if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
@@ -518,6 +510,19 @@
}
}

private def describeSchema(
tableDesc: CatalogTable,
buffer: ArrayBuffer[Row]): Unit = {
if (DDLUtils.isDatasourceTable(tableDesc)) {
DDLUtils.getSchemaFromTableProperties(tableDesc) match {
Contributor:
Now getSchemaFromTableProperties should never return None?

Member (Author):
For all types of data source tables, we store the schema in the table properties. Thus, we should not return None, unless the table properties were modified by the user with an ALTER TABLE command.

Sorry, forgot to update the message.

Member (Author):
Now the message is changed to "# Schema of this table is corrupted".

Contributor:
Can we make DDLUtils.getSchemaFromTableProperties always return a schema and throw an exception if it's corrupted? I think it's more consistent with the previous behaviour, i.e. throw an exception if the expected schema properties don't exist.

Member (Author):
Sure, will do.
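A hedged sketch of the behavior suggested above, assuming the property keys from the diff (DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX), the CatalogTable.properties map, and DataType.fromJson; the error messages are illustrative:

```scala
// Always return a schema; throw if the stored schema properties are corrupted.
def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
  val props = metadata.properties
  val numParts = props.getOrElse(DATASOURCE_SCHEMA_NUMPARTS,
    throw new AnalysisException(
      s"Could not read schema from the metastore because it is corrupted " +
        s"(missing property $DATASOURCE_SCHEMA_NUMPARTS)."))
  val parts = (0 until numParts.toInt).map { index =>
    props.getOrElse(s"$DATASOURCE_SCHEMA_PART_PREFIX$index",
      throw new AnalysisException(
        s"Could not read schema from the metastore because it is corrupted " +
          s"(missing part $index of the schema, $numParts parts are expected)."))
  }
  // The schema JSON was split across several properties; reassemble and parse it.
  DataType.fromJson(parts.mkString).asInstanceOf[StructType]
}
```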

case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, buffer)
case None => append(buffer, "# Schema of this table is inferred at runtime", "", "")
}
} else {
describeSchema(tableDesc.schema, buffer)
}
}

private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -352,13 +352,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

/**
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
* is refreshed.
* is refreshed. For data source tables, the schema will not be inferred and refreshed.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)

// If this table is cached as an InMemoryRelation, drop the original
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -252,6 +252,115 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

test("Create data source table with partitioning columns but no schema") {
import testImplicits._

val tabName = "tab1"
withTempPath { dir =>
val pathToPartitionedTable = new File(dir, "partitioned")
val pathToNonPartitionedTable = new File(dir, "nonPartitioned")
val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
df.write.format("parquet").save(pathToNonPartitionedTable.getCanonicalPath)
df.write.format("parquet").partitionBy("num").save(pathToPartitionedTable.getCanonicalPath)

Seq(pathToPartitionedTable, pathToNonPartitionedTable).foreach { path =>
withTable(tabName) {
spark.sql(
s"""
|CREATE TABLE $tabName
|USING parquet
|OPTIONS (
| path '$path'
|)
|PARTITIONED BY (inexistentColumns)
""".stripMargin)
val catalog = spark.sessionState.catalog
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))

val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
assert(tableSchema.nonEmpty, "the schema of data source tables are always recorded")
val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)

if (tableMetadata.storage.serdeProperties.get("path") ==
Contributor:
How could this condition be false?

Member (Author):
This test verifies two scenarios: one uses the path to a partitioned table (pathToPartitionedTable), the other the path to a non-partitioned table (pathToNonPartitionedTable). This condition just checks which path is being used; if the path points to pathToNonPartitionedTable, it returns false.

Contributor:
Hmm, can we separate it into two cases instead of doing Seq(...).foreach? (See the sketch after this test.)

Member (Author):
Ok, no problem.

Option(pathToPartitionedTable.getCanonicalPath)) {
assert(partCols == Seq("num"))
assert(tableSchema ==
Option(StructType(StructField("str", StringType, nullable = true) ::
StructField("num", IntegerType, nullable = true) :: Nil)))
} else {
assert(partCols.isEmpty)
assert(tableSchema ==
Option(StructType(StructField("num", IntegerType, nullable = true) ::
StructField("str", StringType, nullable = true) :: Nil)))
}
}
}
}
}
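
A hedged sketch of the splitting suggested in the review thread above: the Seq(...).foreach loop becomes two explicit test cases. `checkPartitionColumns` is a hypothetical helper and the test names are illustrative; the assertions mirror the test above.

```scala
// Shared checks: create the table over an existing path with a bogus PARTITIONED BY
// clause and verify the schema and partition columns recorded in the table properties.
private def checkPartitionColumns(path: String, expected: Seq[String]): Unit = {
  withTable("tab1") {
    spark.sql(
      s"""
         |CREATE TABLE tab1
         |USING parquet
         |OPTIONS (path '$path')
         |PARTITIONED BY (inexistentColumns)
       """.stripMargin)
    val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tab1"))
    assert(DDLUtils.getSchemaFromTableProperties(metadata).nonEmpty)
    assert(DDLUtils.getPartitionColumnsFromTableProperties(metadata) == expected)
  }
}

test("partitioning columns but no schema: partitioned data") {
  import testImplicits._
  withTempPath { dir =>
    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    df.write.format("parquet").partitionBy("num").save(dir.getCanonicalPath)
    checkPartitionColumns(dir.getCanonicalPath, expected = Seq("num"))
  }
}

test("partitioning columns but no schema: non-partitioned data") {
  import testImplicits._
  withTempPath { dir =>
    val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
    df.write.format("parquet").save(dir.getCanonicalPath)
    checkPartitionColumns(dir.getCanonicalPath, expected = Seq.empty)
  }
}
```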

test("Refresh table after changing the data source table partitioning") {
import testImplicits._

val tabName = "tab1"
val catalog = spark.sessionState.catalog
withTempPath { dir =>
val path = dir.getCanonicalPath
val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString, i, i))
.toDF("col1", "col2", "col3", "col4")
df.write.format("json").partitionBy("col1", "col3").save(path)
val schema = StructType(
StructField("col2", StringType, nullable = true) ::
StructField("col4", LongType, nullable = true) ::
StructField("col1", IntegerType, nullable = true) ::
StructField("col3", IntegerType, nullable = true) :: Nil)
val partitionCols = Seq("col1", "col3")

// Ensure the schema is split to multiple properties.
withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "1") {
Contributor:
Why this?

Member (Author):
Previously, we used this to verify that the refresh works when the table schema is split into multiple properties. Now, since we no longer need to refresh the schema, we can remove it. Thanks!

withTable(tabName) {
spark.sql(
s"""
|CREATE TABLE $tabName
|USING json
|OPTIONS (
| path '$path'
|)
""".stripMargin)
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
assert(tableSchema == Option(schema))
val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
assert(partCols == partitionCols)

// Change the schema
val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
.toDF("newCol1", "newCol2")
newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)

// No change on the schema
val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
val tableSchemaBeforeRefresh =
DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh)
assert(tableSchemaBeforeRefresh == Option(schema))
val partColsBeforeRefresh =
DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
assert(partColsBeforeRefresh == partitionCols)

// Refresh does not affect the schema
spark.catalog.refreshTable(tabName)

val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
val tableSchemaAfterRefresh =
DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
assert(tableSchemaAfterRefresh == Option(schema))
val partColsAfterRefresh =
DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
assert(partColsAfterRefresh == partitionCols)
}
}
}
}

test("desc table for parquet data source table using in-memory catalog") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tab1"
@@ -191,10 +191,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv

sql("REFRESH TABLE jsonTable")

// Check that the refresh worked
// After refresh, schema is not changed.
checkAnswer(
sql("SELECT * FROM jsonTable"),
Row("a1", "b1", "c1"))
Row("a1", "b1"))
}
}
}
@@ -703,7 +703,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
schema = schema,
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = "json",
@@ -988,7 +988,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
schema = schema,
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = "parquet",
@@ -1003,7 +1003,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
schema = schema,
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = "parquet",