**CheckAnalysis.scala**

```diff
@@ -342,6 +342,7 @@ trait CheckAnalysis extends PredicateHelper {
 
       case InsertIntoTable(t, _, _, _, _)
         if !t.isInstanceOf[LeafNode] ||
+           t.isInstanceOf[Range] ||
           t == OneRowRelation ||
           t.isInstanceOf[LocalRelation] =>
         failAnalysis(s"Inserting into an RDD-based table is not allowed.")
```

> **Reviewer:** I was wondering why not replace these one-letter values with something more explicit (like `table` in this case)? I understand that this is not the purpose of the PR; I am just curious whether it is normal to raise such "style" points in PR discussions.
>
> **Member Author:** This PR backports the fix from the master branch to the Spark 2.0 branch. I think your comment is valid; you can submit a separate PR to improve the code style. Thanks!
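A hedged sketch of what the added `Range` guard rejects (the view name is hypothetical; the error message is the one in the hunk). In 2.0, `spark.range` produces a plan whose leaf is a `Range` node, which previously slipped past this check:

```scala
// A temp view backed by a Range leaf node (hypothetical view name).
spark.range(10).createOrReplaceTempView("rangeView")

// With the new guard, analysis now fails with:
//   "Inserting into an RDD-based table is not allowed."
spark.sql("INSERT INTO rangeView SELECT 11")
```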
**SessionCatalog.scala**

```diff
@@ -246,33 +246,26 @@ class SessionCatalog(
   }
 
   /**
-   * Retrieve the metadata of an existing metastore table.
-   * If no database is specified, assume the table is in the current database.
-   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
+   * Return whether a table/view with the specified name exists. If no database is specified, check
+   * with current database.
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    externalCatalog.tableExists(db, table)
+  }
+
+  /**
+   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
+   * assume the table/view is in the current database. If the specified table/view is not found
+   * in the database then a [[NoSuchTableException]] is thrown.
    */
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
-    val tid = TableIdentifier(table)
-    if (isTemporaryTable(name)) {
-      CatalogTable(
-        identifier = tid,
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = tempTables(table).output.map { c =>
-          CatalogColumn(
-            name = c.name,
-            dataType = c.dataType.catalogString,
-            nullable = c.nullable
-          )
-        },
-        properties = Map(),
-        viewText = None)
-    } else {
-      requireDbExists(db)
-      requireTableExists(TableIdentifier(table, Some(db)))
-      externalCatalog.getTable(db, table)
-    }
+    requireDbExists(db)
+    requireTableExists(TableIdentifier(table, Some(db)))
+    externalCatalog.getTable(db, table)
   }
 
   /**
```
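The behavioral core of this hunk is that `tableExists` now consults only the external catalog. A minimal sketch of the new semantics, assuming a constructed `SessionCatalog` named `catalog` and some logical plan `tempPlan` (both hypothetical here):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

catalog.createTempView("people", tempPlan, overrideIfExists = false)

// Before this patch: returned true, because temp views were checked first.
// After this patch: true only if a permanent table/view named `people`
// exists in the current database; the temp view is ignored.
catalog.tableExists(TableIdentifier("people"))
```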
```diff
@@ -368,6 +361,38 @@ class SessionCatalog(
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
 
+  /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   *
+   * If a database is specified in `name`, this will return the metadata of table/view in that
+   * database.
+   * If no database is specified, this will first attempt to get the metadata of a temporary view
+   * with the same name, then, if that does not exist, return the metadata of table/view in the
+   * current database.
+   */
+  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined) {
+      getTableMetadata(name)
+    } else {
+      getTempView(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.map { c =>
+            CatalogColumn(
+              name = c.name,
+              dataType = c.dataType.catalogString,
+              nullable = c.nullable
+            )
+          },
+          properties = Map(),
+          viewText = None)
+      }.getOrElse(getTableMetadata(name))
+    }
+  }
+
   /**
    * Rename a table.
    *
```
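A short sketch of the lookup order encoded above (same hypothetical `catalog` and `tempPlan` as before): a bare name resolves against temporary views first, while a database-qualified name bypasses them entirely.

```scala
catalog.createTempView("v", tempPlan, overrideIfExists = false)

// No database part: the temp view wins; the result is a CatalogTable with
// tableType = VIEW and a schema derived from the plan's output.
catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("v"))

// Database part present: goes straight to getTableMetadata, so this throws
// NoSuchTableException unless a permanent `default`.`v` exists.
catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("v", Some("default")))
```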
```diff
@@ -456,24 +481,6 @@
     }
   }
 
-  /**
-   * Return whether a table with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table.
-   */
-  def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
-      true
-    } else {
-      externalCatalog.tableExists(db, table)
-    }
-  }
-
   /**
    * Return whether a table with the specified name is a temporary table.
    *
@@ -544,11 +551,11 @@
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
-    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -560,11 +567,11 @@
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
-    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
   }
 
@@ -579,12 +586,12 @@
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
     val tableMetadata = getTableMetadata(tableName)
-    requireExactMatchedPartitionSpec(specs, tableMetadata)
-    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(specs, tableMetadata)
+    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -598,11 +605,11 @@
    * this becomes a no-op.
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
-    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -611,11 +618,11 @@
    * If no database is specified, assume the table is in the current database.
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
-    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
+    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
     externalCatalog.getPartition(db, table, spec)
   }
 
```
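All five partition methods receive the same reordering: the database and table existence checks now run before partition-spec validation, which itself calls `getTableMetadata`. A sketch of the resulting failure order for `getPartition` (hypothetical names; the exception types are the ones these `require*` helpers raise):

```scala
catalog.getPartition(TableIdentifier("tbl", Some("db1")), Map("a" -> "1"))
// 1. requireDbExists("db1")                -> NoSuchDatabaseException if db1 is missing
// 2. requireTableExists(db1.tbl)           -> NoSuchTableException if tbl is missing
// 3. requireExactMatchedPartitionSpec(...) -> AnalysisException on a mismatched spec
// 4. externalCatalog.getPartition(db, table, spec) otherwise
```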

**SessionCatalogSuite.scala**

```diff
@@ -423,46 +423,37 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
     val tempTable = Range(1, 10, 1, 10)
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
     catalog.setCurrentDatabase("db2")
     assert(catalog.tableExists(TableIdentifier("tbl1")))
     assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
 
-  test("tableExists on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    assert(!catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+    // tableExists should not check temp view.
+    assert(!catalog.tableExists(TableIdentifier("tbl3")))
   }
 
-  test("getTableMetadata on temporary views") {
+  test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
-    val m = intercept[AnalysisException] {
-      catalog.getTableMetadata(TableIdentifier("view1"))
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
     }.getMessage
-    assert(m.contains("Table or view 'view1' not found in database 'default'"))
 
-    val m2 = intercept[AnalysisException] {
-      catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
-    assert(m2.contains("Table or view 'view1' not found in database 'default'"))
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1")
-    assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).identifier.table == "view1")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).schema(0).name == "id")
 
-    val m3 = intercept[AnalysisException] {
-      catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
-    assert(m3.contains("Table or view 'view1' not found in database 'default'"))
   }
 
   test("list tables without pattern") {
```
**DataFrameWriter.scala**

```diff
@@ -356,12 +356,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -380,7 +375,7 @@
           mode,
           extraOptions.toMap,
           df.logicalPlan)
-        sessionState.executePlan(cmd).toRdd
+        df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 
```
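This drops the master-branch workaround: since the backported `tableExists` no longer checks temp views, qualifying the identifier with the database is unnecessary here. A hedged sketch of the user-visible effect (hypothetical table name `t`):

```scala
import org.apache.spark.sql.SaveMode

// A temp view named "t" used to make tableExists return true, so
// ErrorIfExists refused to create a metastore table `t`.
spark.range(5).createOrReplaceTempView("t")

// With this change the existence check consults only the metastore,
// so this creates the permanent table `t` instead of failing.
spark.range(5).write.mode(SaveMode.ErrorIfExists).saveAsTable("t")
```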
**AnalyzeTableCommand.scala**

```diff
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, SimpleCatalogRelation}
 
@@ -38,7 +39,9 @@ case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionState = sparkSession.sessionState
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentwithDB = TableIdentifier(tableIdent.table, Some(db))
+    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentwithDB))
 
     relation match {
       case relation: CatalogRelation if !relation.isInstanceOf[SimpleCatalogRelation] =>
```
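Qualifying the parsed identifier with the current database before `lookupRelation` makes `ANALYZE TABLE` resolve the metastore table rather than a same-named temporary view. A hedged sketch of the intended resolution (hypothetical names):

```scala
spark.range(10).write.saveAsTable("events")        // permanent table
spark.range(10).createOrReplaceTempView("events")  // same-named temp view

// Analyzes `<current database>`.`events` (the metastore table), not the temp
// view, because the identifier is database-qualified before lookupRelation.
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")
```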
**createDataSourceTables.scala**

```diff
@@ -74,15 +74,11 @@ case class CreateDataSourceTableCommand(
     }
 
     val sessionState = sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(tableIdent)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${tableIdent.unquotedString} already exists.")
       }
     }
 
@@ -200,8 +196,9 @@ case class CreateDataSourceTableAsSelectCommand(
         // TODO: Check that options from the resolved relation match the relation that we are
         // inserting into (i.e. using the same compression).
 
-        EliminateSubqueryAliases(
-          sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+        // Pass a table identifier with database part, so that `tableExists` won't check temp
+        // views unexpectedly.
+        EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
             // check if the file formats match
             l.relation match {
```

> **Contributor:** We don't need `tableIdentWithDB` in `CreateDataSourceTableCommand`, right?
>
> **Member Author:** Done. : )
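The same simplification as in `DataFrameWriter`: with the new `tableExists`, `CreateDataSourceTableCommand` can pass the unqualified identifier, and an existing temporary view no longer blocks creating a permanent table of the same name. A hedged sketch (hypothetical names):

```scala
spark.range(1).createOrReplaceTempView("logs")

// Previously the temp view made tableExists return true, failing with
// "Table logs already exists."; the check now consults only the metastore.
spark.sql("CREATE TABLE logs (id BIGINT) USING parquet")
```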