Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ class SessionCatalog(
externalCatalog.alterTable(newTableDefinition)
}

/**
* Return whether a table/view with the specified name exists. If no database is specified, check
* with current database.
*/
def tableExists(name: TableIdentifier): Boolean = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
externalCatalog.tableExists(db, table)
}

/**
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database. If the specified table/view is not found
Expand All @@ -270,24 +280,6 @@ class SessionCatalog(
externalCatalog.getTableOption(db, table)
}

/**
* Retrieve the metadata of an existing temporary view or permanent table/view.
* If the temporary view does not exist, tries to get the metadata an existing permanent
* table/view. If no database is specified, assume the table/view is in the current database.
* If the specified table/view is not found in the database then a [[NoSuchTableException]] is
* thrown.
*/
def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
val table = formatTableName(name)
getTempView(table).map { plan =>
CatalogTable(
identifier = TableIdentifier(table),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(getTableMetadata(TableIdentifier(name)))
}

/**
* Load files stored in given path into an existing metastore table.
* If no database is specified, assume the table is in the current database.
Expand Down Expand Up @@ -368,6 +360,30 @@ class SessionCatalog(
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------

/**
* Retrieve the metadata of an existing temporary view or permanent table/view.
*
* If a database is specified in `name`, this will return the metadata of table/view in that
* database.
* If no database is specified, this will first attempt to get the metadata of a temporary view
* with the same name, then, if that does not exist, return the metadata of table/view in the
* current database.
*/
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
val table = formatTableName(name.table)
if (name.database.isDefined) {
getTableMetadata(name)
} else {
getTempView(table).map { plan =>
CatalogTable(
identifier = TableIdentifier(table),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(getTableMetadata(name))
}
}

/**
* Rename a table.
*
Expand Down Expand Up @@ -449,24 +465,6 @@ class SessionCatalog(
}
}

/**
* Return whether a table/view with the specified name exists.
*
* Note: If a database is explicitly specified, then this will return whether the table/view
* exists in that particular database instead. In that case, even if there is a temporary
* table with the same name, we will return false if the specified database does not
* contain the table/view.
*/
def tableExists(name: TableIdentifier): Boolean = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (isTemporaryTable(name)) {
true
} else {
externalCatalog.tableExists(db, table)
}
}

/**
* Return whether a table with the specified name is a temporary table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,22 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10)
intercept[NoSuchTableException] {
catalog.getTempViewOrPermanentTableMetadata("view1")
catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
}.getMessage

intercept[NoSuchTableException] {
catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
}.getMessage

catalog.createTempView("view1", tempTable, overrideIfExists = false)
assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
TableIdentifier("view1"), "the temporary view `view1` should exist")
assert(catalog.getTempViewOrPermanentTableMetadata(
TableIdentifier("view1")).identifier.table == "view1")
assert(catalog.getTempViewOrPermanentTableMetadata(
TableIdentifier("view1")).schema(0).name == "id")

intercept[NoSuchTableException] {
catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
}.getMessage
}

test("list tables without pattern") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
}

val sessionState = df.sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = tableIdent.copy(database = Some(db))
// Pass a table identifier with database part, so that `tableExists` won't check temp views
// unexpectedly.
val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)

(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
Expand All @@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
sessionState.executePlan(cmd).toRdd
df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
assert(table.provider.isDefined)

val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
// Pass a table identifier with database part, so that `tableExists` won't check temp views
// unexpectedly.
if (sessionState.catalog.tableExists(tableIdentWithDB)) {
if (sessionState.catalog.tableExists(table.identifier)) {
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test failure is caused by this change. After this PR, table.identifier does not always have the database name.

}
}

Expand Down Expand Up @@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(

var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
// Pass a table identifier with database part, so that `tableExists` won't check temp views
// unexpectedly.
if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
// Check if we need to throw an exception or just return.
mode match {
Expand All @@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).

EliminateSubqueryAliases(
sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
// Pass a table identifier with database part, so that `lookupRelation` won't get temp
// views unexpectedly.
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,32 +183,25 @@ case class DropTableCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(tableName)) {
if (!ifExists) {
val objectName = if (isView) "View" else "Table"
throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
}
} else {
// If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
// issue an exception.
catalog.getTableMetadataOption(tableName).map(_.tableType match {
case CatalogTableType.VIEW if !isView =>
throw new AnalysisException(
"Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
case o if o != CatalogTableType.VIEW && isView =>
throw new AnalysisException(
s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
case _ =>
})
try {
sparkSession.sharedState.cacheManager.uncacheQuery(
sparkSession.table(tableName.quotedString))
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
catalog.refreshTable(tableName)
catalog.dropTable(tableName, ifExists, purge)
// If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
// issue an exception.
catalog.getTableMetadataOption(tableName).map(_.tableType match {
case CatalogTableType.VIEW if !isView =>
throw new AnalysisException(
"Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
case o if o != CatalogTableType.VIEW && isView =>
throw new AnalysisException(
s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
case _ =>
})
try {
sparkSession.sharedState.cacheManager.uncacheQuery(
sparkSession.table(tableName.quotedString))
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
catalog.refreshTable(tableName)
catalog.dropTable(tableName, ifExists, purge)
Seq.empty[Row]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,7 @@ case class CreateTableLikeCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(sourceTable)) {
throw new AnalysisException(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}

val sourceTableDesc = if (sourceTable.database.isDefined) {
catalog.getTableMetadata(sourceTable)
} else {
catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
}
val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)

// Storage format
val newStorage =
Expand Down Expand Up @@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = if (tableName.database.isDefined) {
catalog.getTableMetadata(tableName)
} else {
catalog.getTempViewOrPermanentTableMetadata(tableName.table)
}
val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
table.schema.map { c =>
Row(c.name)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}

private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
val tableMetadata = if (tableIdentifier.database.isDefined) {
sessionCatalog.getTableMetadata(tableIdentifier)
} else {
sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
}
val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)

val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,8 @@ class HiveDDLSuite
.createTempView(sourceViewName)
sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")

val sourceTable =
spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
TableIdentifier(sourceViewName))
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))

Expand Down