@@ -223,6 +223,9 @@ abstract class Catalog {
* If this table is cached as an InMemoryRelation, drop the original cached version and make the
* new version cached lazily.
*
* If the table's schema is inferred at runtime, infer the schema again and update the schema

cloud-fan (Contributor), Jul 19, 2016:
cc @rxin, I'm thinking about what the main reason is to allow inferring the table schema at run time. IIRC, it's mainly because we want to save some typing when creating an external data source table from a SQL string, which usually has a very long schema, e.g. for JSON files.

If this is true, then the table schema is not supposed to change. If users do want to change it, I'd argue that it's a different table: users should drop this table and create a new one. Then we don't need to make refresh table support schema changes, and thus don't need to store the DATASOURCE_SCHEMA_ISINFERRED flag.

Contributor:
refreshTable shouldn't run schema inference. Only run schema inference when creating the table.

And don't make this a config flag. Just run schema inference when creating the table. For managed tables, store the schema explicitly. Users must explicitly change it.

Member Author:
@rxin @cloud-fan I see. Will make the change.

FYI, this will change the existing external behavior.

Contributor:
Yes, unfortunately I found out about this one too late. I will add a note to the 2.0 release notes that this change is coming.

Member Author:
Thanks!
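
For context, a minimal usage sketch of the behavior this doc comment describes, assuming a JSON-backed table created without a user-specified schema (the table name and path are illustrative):

// Schema is inferred from the existing files at creation time.
spark.sql("CREATE TABLE logs USING json OPTIONS (path '/data/logs')")

// Later, new JSON files with an extra column land under /data/logs.
// Refreshing the table re-infers the schema and updates the external catalog.
spark.catalog.refreshTable("logs")
spark.table("logs").printSchema()  // now reflects the re-inferred schema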

* in the external catalog.
*
* @since 2.0.0
*/
def refreshTable(tableName: String): Unit
@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
partitionColumns: Array[String],
userSpecifiedPartitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
@@ -95,17 +95,37 @@ case class CreateDataSourceTableCommand(
}

// Create the relation to validate the arguments before writing the metadata to the metastore.
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath).resolveRelation(checkPathExist = false)
val dataSource: HadoopFsRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath)
.resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation]

cloud-fan (Contributor), Jul 20, 2016:
Is it safe to cast it to HadoopFsRelation?

cloud-fan (Contributor), Jul 20, 2016:
I think a safer way is to do a pattern match here: if it's a HadoopFsRelation, get its partition columns; otherwise, there are no partition columns.

Member Author:
Sure, will do it.
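
A minimal sketch of that pattern match, mirroring the surrounding code (the val names are illustrative):

val relation = DataSource(
  sparkSession = sparkSession,
  userSpecifiedSchema = userSpecifiedSchema,
  className = provider,
  bucketSpec = None,
  options = optionsWithPath).resolveRelation(checkPathExist = false)

// Only HadoopFsRelation carries partition columns; anything else has none.
val inferredPartitionColumns = relation match {
  case fs: HadoopFsRelation => fs.partitionSchema.fieldNames
  case _ => Array.empty[String]
}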


val partitionColumns =

Contributor:
IIUC, the logic should be: if the schema is specified, use the given partition columns; otherwise, infer them. Maybe it's clearer to write:

val partitionColumns = if (userSpecifiedSchema.isEmpty) {
  if (userSpecifiedPartitionColumns.length > 0) {
    ...
  }
  dataSource match {
    ...
  }
} else {
  userSpecifiedPartitionColumns
}

Member Author:
Sure, will do it. Thanks!
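
One way to fill in that sketch, combined with the pattern match suggested above; this assumes dataSource is the un-cast result of resolveRelation and is only an illustration of the proposed structure:

val partitionColumns = if (userSpecifiedSchema.isEmpty) {
  if (userSpecifiedPartitionColumns.length > 0) {
    // No user-specified schema, so partitioning is discovered at load time;
    // any user-specified partition columns are ignored with a warning.
    logWarning(
      s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
        s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will " +
        "be ignored.")
  }
  dataSource match {
    case fs: HadoopFsRelation => fs.partitionSchema.fieldNames
    case _ => Array.empty[String]
  }
} else {
  userSpecifiedPartitionColumns
}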

if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will " +
"be ignored.")
dataSource.partitionSchema.fieldNames
} else {
userSpecifiedPartitionColumns
}

val schemaType = if (userSpecifiedSchema.isEmpty) SchemaType.INFERRED else SchemaType.USER

CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
schema = dataSource.schema,

Contributor:
seems we should still use the user-specified schema, right?

Contributor:
From the code, it is not very clear that dataSource.schema will be the userSpecifiedSchema.

gatorsmile (Member Author), Aug 8, 2016:
Here, dataSource.schema could be the inferred schema. Previously, we did not store the inferred schema. After this PR, we do, and thus we use dataSource.schema.

Actually, after re-checking the code, I found that the schema might be adjusted a little even if users specify it. For example, the nullability could be changed:

I think we should make such a change, but maybe we should test and log it?
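
A small illustration of the nullability point, assuming a file-based source such as Parquet (spark is the active SparkSession; the path is illustrative):

import org.apache.spark.sql.types.{LongType, StructType}

val userSchema = new StructType().add("id", LongType, nullable = false)
val relation = DataSource(
  sparkSession = spark,
  userSpecifiedSchema = Some(userSchema),
  className = "parquet",
  options = Map("path" -> "/tmp/t")).resolveRelation()

// relation.schema may report `id` as nullable = true, so the resolved schema is not
// guaranteed to be byte-for-byte identical to the user-specified one.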

schemaType = schemaType,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -256,7 +276,8 @@ case class CreateDataSourceTableAsSelectCommand(
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = Some(result.schema),
schema = result.schema,
schemaType = SchemaType.USER,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
Expand All @@ -270,6 +291,11 @@ case class CreateDataSourceTableAsSelectCommand(
}
}

case class SchemaType private(name: String)
object SchemaType {

Contributor:
Will we have more schema types? If not, I think a boolean flag isSchemaInferred would be good enough.

Member Author:
Sure, will do it.
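
A sketch of the boolean-flag alternative; DATASOURCE_SCHEMA_ISINFERRED is a hypothetical property key that mirrors the naming convention used below:

// When writing the table metadata:
tableProperties.put(DATASOURCE_SCHEMA_ISINFERRED, userSpecifiedSchema.isEmpty.toString)

// When reading it back:
def isSchemaInferred(table: CatalogTable): Boolean =
  table.properties.get(DATASOURCE_SCHEMA_ISINFERRED) == Option("true")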

val USER = new SchemaType("USER")
val INFERRED = new SchemaType("INFERRED")
}

object CreateDataSourceTableUtils extends Logging {

@@ -279,6 +305,7 @@ object CreateDataSourceTableUtils extends Logging {
val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_TYPE = DATASOURCE_SCHEMA_PREFIX + "type"
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
@@ -303,10 +330,40 @@ object CreateDataSourceTableUtils extends Logging {
matcher.matches()
}

/**
* Saves the schema (including partition info) into the table properties.
* Overwrites the schema if it already exists.
*/
def saveSchema(
sparkSession: SparkSession,
schema: StructType,
partitionColumns: Array[String],
tableProperties: mutable.HashMap[String, String]): Unit = {
// The serialized JSON schema string may be too long to be stored in a single metastore
// table property. In this case, we split the JSON string and store each part as a
// separate table property.
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}
}
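
For reference, a sketch of the matching read path: the split schema is reconstructed by concatenating the parts back into one JSON string (roughly what the read-side helpers in DDLUtils do; details may differ):

import org.apache.spark.sql.types.{DataType, StructType}

val numParts = table.properties(DATASOURCE_SCHEMA_NUMPARTS).toInt
val schemaJson = (0 until numParts)
  .map(i => table.properties(s"$DATASOURCE_SCHEMA_PART_PREFIX$i"))
  .mkString
val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]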

def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
schema: StructType,
schemaType: SchemaType,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
@@ -315,28 +372,10 @@ object CreateDataSourceTableUtils extends Logging {
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)

// Saves optional user specified schema. Serialized JSON schema string may be too long to be
// stored into a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
userSpecifiedSchema.foreach { schema =>
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
}
tableProperties.put(DATASOURCE_SCHEMA_TYPE, schemaType.name)
saveSchema(sparkSession, schema, partitionColumns, tableProperties)

if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
@@ -353,16 +392,6 @@ object CreateDataSourceTableUtils extends Logging {
}
}

if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}

val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
@@ -375,7 +404,7 @@ object CreateDataSourceTableUtils extends Logging {
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
@@ -487,6 +487,10 @@ object DDLUtils {
isDatasourceTable(table.properties)
}

def isSchemaInferred(table: CatalogTable): Boolean = {
table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name)

jaceklaskowski (Contributor):
Consider contains.

Contributor:
Please don't use contains. It makes it much harder to read, and to understand that the return type is an Option.


Member Author:
Thanks! @rxin @jaceklaskowski

I will not change it, because using contains would break compilation with Scala 2.10.
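
For reference, the two forms under discussion; Option.contains only exists in Scala 2.11+:

table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name)  // compiles on Scala 2.10
table.properties.get(DATASOURCE_SCHEMA_TYPE).contains(SchemaType.INFERRED.name)   // Scala 2.11+ only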

}

/**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
@@ -18,6 +18,7 @@
package org.apache.spark.sql.internal

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.annotation.Experimental
@@ -27,7 +28,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifie
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.CreateTableUsing
import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType


@@ -350,6 +352,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty
}

/**
* Refresh the inferred schema stored in the external catalog for data source tables.
*/
private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = {
val table = sessionCatalog.getTableMetadataOption(tableIdent)
table.foreach { tableDesc =>
if (DDLUtils.isDatasourceTable(tableDesc) && DDLUtils.isSchemaInferred(tableDesc)) {
val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(tableDesc)
val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(tableDesc)
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = None,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
options = tableDesc.storage.serdeProperties)
.resolveRelation().asInstanceOf[HadoopFsRelation]

val schemaProperties = new mutable.HashMap[String, String]
CreateDataSourceTableUtils.saveSchema(
sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties)

val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k =>
// Keep the properties that are not for schema or partition columns
k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS &&

Contributor:
It's hard to tell what the code is doing inside filterKeys; consider creating a predicate function with a proper name.

Member Author:
Will change it. Thanks!
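
A possible shape for that named predicate; the name is illustrative:

private def isSchemaOrPartitionProperty(key: String): Boolean = {
  key == CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS ||
    key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) ||
    key == CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS ||
    key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX)
}

// The filter above then becomes:
val tablePropertiesWithoutSchema =
  tableDesc.properties.filterKeys(k => !isSchemaOrPartitionProperty(k))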

!k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) &&
k != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS &&
!k.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX) }

val newTable = tableDesc.copy(properties = tablePropertiesWithoutSchema ++ schemaProperties)

// Alter the schema-related table properties that are stored in the external catalog.
sessionCatalog.alterTable(newTable)
}
}
}

/**
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
* is refreshed.
@@ -359,6 +399,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Refresh the schema stored in the external catalog, if this is a data source table whose
// schema is inferred at runtime. For a user-specified schema, we do not re-infer or update it.
// TODO: Support column-related ALTER TABLE DDL commands, and then users can update
// the user-specified schema.
refreshInferredSchema(tableIdent)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)

// If this table is cached as an InMemoryRelation, drop the original
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.command.SchemaType
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -703,7 +704,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
schema = schema,
schemaType = SchemaType.USER,
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = "json",
@@ -988,7 +990,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
schema = schema,
schemaType = SchemaType.USER,
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = "parquet",
@@ -1003,7 +1006,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
schema = schema,
schemaType = SchemaType.USER,
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = "parquet",