Skip to content

Commit 53db6f0

Browse files
committed
CatalogAndIdentifier shouldn't return wrong namespace
1 parent c198620 commit 53db6f0

9 files changed

Lines changed: 169 additions & 74 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,6 +1344,10 @@ class SessionCatalog(
13441344
!hiveFunctions.contains(name.funcName.toLowerCase(Locale.ROOT))
13451345
}
13461346

1347+
def isTempFunction(name: String): Boolean = {
1348+
isTemporaryFunction(FunctionIdentifier(name))
1349+
}
1350+
13471351
/**
13481352
* Return whether this function has been registered in the function registry of the current
13491353
* session. If not existed, return false.

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ private[sql] trait LookupCatalog extends Logging {
9494
* Extract catalog and identifier from a multi-part name with the current catalog if needed.
9595
* Catalog name takes precedence over identifier, but for a single-part name, identifier takes
9696
* precedence over catalog name.
97+
*
98+
* Note that this pattern is used to look up catalog objects like table, function, permanent
99+
* view, etc. If you need to look up temp views, please do it separately before calling this
100+
* pattern.
97101
*/
98102
object CatalogAndIdentifier {
99103
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
@@ -103,16 +107,7 @@ private[sql] trait LookupCatalog extends Logging {
103107
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
104108
assert(nameParts.nonEmpty)
105109
if (nameParts.length == 1) {
106-
// If the current catalog is session catalog, the current namespace is not used because
107-
// the single-part name could be referencing a temp view, which doesn't belong to any
108-
// namespaces. An empty namespace will be resolved inside the session catalog
109-
// implementation when a relation is looked up.
110-
val ns = if (CatalogV2Util.isSessionCatalog(currentCatalog)) {
111-
Array.empty[String]
112-
} else {
113-
catalogManager.currentNamespace
114-
}
115-
Some((currentCatalog, Identifier.of(ns, nameParts.head)))
110+
Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
116111
} else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
117112
// Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
118113
// API does not support view yet, and we have to use v1 commands to deal with global temp

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
5050
})
5151
when(manager.currentCatalog).thenReturn(sessionCatalog)
5252
when(manager.v2SessionCatalog).thenReturn(sessionCatalog)
53+
when(manager.currentNamespace).thenReturn(Array.empty[String])
5354
manager
5455
}
5556

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 120 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
2222
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
2323
import org.apache.spark.sql.catalyst.plans.logical._
2424
import org.apache.spark.sql.catalyst.rules.Rule
25-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
25+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
2626
import org.apache.spark.sql.connector.expressions.Transform
2727
import org.apache.spark.sql.execution.command._
2828
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBui
3939
class ResolveSessionCatalog(
4040
val catalogManager: CatalogManager,
4141
conf: SQLConf,
42-
isView: Seq[String] => Boolean)
42+
isTempView: Seq[String] => Boolean,
43+
isTempFunction: String => Boolean)
4344
extends Rule[LogicalPlan] with LookupCatalog {
4445
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
4546
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
@@ -215,8 +216,9 @@ class ResolveSessionCatalog(
215216
}
216217
AlterDatabaseSetLocationCommand(ns.head, location)
217218

218-
case RenameTableStatement(SessionCatalogAndTable(_, oldName), newNameParts, isView) =>
219-
AlterTableRenameCommand(oldName.asTableIdentifier, newNameParts.asTableIdentifier, isView)
219+
// v1 RENAME TABLE supports temp view.
220+
case RenameTableStatement(TempViewOrV1Table(oldName), newName, isView) =>
221+
AlterTableRenameCommand(oldName.asTableIdentifier, newName.asTableIdentifier, isView)
220222

221223
case DescribeRelation(ResolvedTable(_, ident, _: V1Table), partitionSpec, isExtended) =>
222224
DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)
@@ -228,10 +230,12 @@ class ResolveSessionCatalog(
228230
case DescribeColumnStatement(
229231
SessionCatalogAndTable(catalog, tbl), colNameParts, isExtended) =>
230232
loadTable(catalog, tbl.asIdentifier).collect {
233+
// `V1Table` also includes permanent views.
231234
case v1Table: V1Table =>
232235
DescribeColumnCommand(tbl.asTableIdentifier, colNameParts, isExtended)
233236
}.getOrElse {
234-
if (isView(tbl)) {
237+
if (isTempView(tbl)) {
238+
// v1 DESCRIBE COLUMN supports temp view.
235239
DescribeColumnCommand(tbl.asTableIdentifier, colNameParts, isExtended)
236240
} else {
237241
throw new AnalysisException("Describing columns is not supported for v2 tables.")
@@ -279,8 +283,9 @@ class ResolveSessionCatalog(
279283
ignoreIfExists = c.ifNotExists)
280284
}
281285

282-
case RefreshTableStatement(SessionCatalogAndTable(_, tbl)) =>
283-
RefreshTable(tbl.asTableIdentifier)
286+
// v1 REFRESH TABLE supports temp view.
287+
case RefreshTableStatement(TempViewOrV1Table(name)) =>
288+
RefreshTable(name.asTableIdentifier)
284289

285290
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
286291
// session catalog and the table provider is not v2.
@@ -315,11 +320,13 @@ class ResolveSessionCatalog(
315320
orCreate = c.orCreate)
316321
}
317322

318-
case DropTableStatement(SessionCatalogAndTable(catalog, tbl), ifExists, purge) =>
319-
DropTableCommand(tbl.asTableIdentifier, ifExists, isView = false, purge = purge)
323+
// v1 DROP TABLE supports temp view.
324+
case DropTableStatement(TempViewOrV1Table(name), ifExists, purge) =>
325+
DropTableCommand(name.asTableIdentifier, ifExists, isView = false, purge = purge)
320326

321-
case DropViewStatement(SessionCatalogAndTable(catalog, viewName), ifExists) =>
322-
DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false)
327+
// v1 DROP VIEW supports temp view.
328+
case DropViewStatement(TempViewOrV1Table(name), ifExists) =>
329+
DropTableCommand(name.asTableIdentifier, ifExists, isView = true, purge = false)
323330

324331
case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
325332
if isSessionCatalog(catalog) =>
@@ -393,12 +400,18 @@ class ResolveSessionCatalog(
393400
ShowCreateTableAsSerdeCommand(v1TableName.asTableIdentifier)
394401

395402
case CacheTableStatement(tbl, plan, isLazy, options) =>
396-
val v1TableName = parseV1Table(tbl, "CACHE TABLE")
397-
CacheTableCommand(v1TableName.asTableIdentifier, plan, isLazy, options)
403+
val name = if (plan.isDefined) {
404+
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
405+
// Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
406+
tbl
407+
} else {
408+
parseTempViewOrV1Table(tbl, "CACHE TABLE")
409+
}
410+
CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
398411

399412
case UncacheTableStatement(tbl, ifExists) =>
400-
val v1TableName = parseV1Table(tbl, "UNCACHE TABLE")
401-
UncacheTableCommand(v1TableName.asTableIdentifier, ifExists)
413+
val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
414+
UncacheTableCommand(name.asTableIdentifier, ifExists)
402415

403416
case TruncateTableStatement(tbl, partitionSpec) =>
404417
val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
@@ -427,10 +440,6 @@ class ResolveSessionCatalog(
427440
throw new AnalysisException(
428441
s"Namespace name should have only one part if specified: ${ns.get.quoted}")
429442
}
430-
if (tbl.length > 2) {
431-
throw new AnalysisException(
432-
s"Table name should have at most two parts: ${tbl.quoted}")
433-
}
434443
ShowColumnsCommand(db, v1TableName)
435444

436445
case AlterTableRecoverPartitionsStatement(tbl) =>
@@ -470,18 +479,23 @@ class ResolveSessionCatalog(
470479
serdeProperties,
471480
partitionSpec)
472481

473-
case AlterViewAsStatement(tbl, originalText, query) =>
474-
val v1TableName = parseV1Table(tbl, "ALTER VIEW QUERY")
482+
case AlterViewAsStatement(name, originalText, query) =>
483+
val viewName = parseTempViewOrV1Table(name, "ALTER VIEW QUERY")
475484
AlterViewAsCommand(
476-
v1TableName.asTableIdentifier,
485+
viewName.asTableIdentifier,
477486
originalText,
478487
query)
479488

480489
case CreateViewStatement(
481490
tbl, userSpecifiedColumns, comment, properties,
482491
originalText, child, allowExisting, replace, viewType) =>
483492

484-
val v1TableName = parseV1Table(tbl, "CREATE VIEW")
493+
val v1TableName = if (viewType != PersistedView) {
494+
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
495+
tbl
496+
} else {
497+
parseV1Table(tbl, "CREATE VIEW")
498+
}
485499
CreateViewCommand(
486500
v1TableName.asTableIdentifier,
487501
userSpecifiedColumns,
@@ -496,56 +510,90 @@ class ResolveSessionCatalog(
496510
case ShowTableProperties(r: ResolvedTable, propertyKey) if isSessionCatalog(r.catalog) =>
497511
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
498512

499-
case DescribeFunctionStatement(CatalogAndIdentifier(catalog, ident), extended) =>
513+
case DescribeFunctionStatement(nameParts, extended) =>
500514
val functionIdent =
501-
parseSessionCatalogFunctionIdentifier("DESCRIBE FUNCTION", catalog, ident)
515+
parseSessionCatalogFunctionIdentifier(nameParts, "DESCRIBE FUNCTION")
502516
DescribeFunctionCommand(functionIdent, extended)
503517

504518
case ShowFunctionsStatement(userScope, systemScope, pattern, fun) =>
505519
val (database, function) = fun match {
506-
case Some(CatalogAndIdentifier(catalog, ident)) =>
520+
case Some(nameParts) =>
507521
val FunctionIdentifier(fn, db) =
508-
parseSessionCatalogFunctionIdentifier("SHOW FUNCTIONS", catalog, ident)
522+
parseSessionCatalogFunctionIdentifier(nameParts, "SHOW FUNCTIONS")
509523
(db, Some(fn))
510524
case None => (None, pattern)
511525
}
512526
ShowFunctionsCommand(database, function, userScope, systemScope)
513527

514-
case DropFunctionStatement(CatalogAndIdentifier(catalog, ident), ifExists, isTemp) =>
528+
case DropFunctionStatement(nameParts, ifExists, isTemp) =>
515529
val FunctionIdentifier(function, database) =
516-
parseSessionCatalogFunctionIdentifier("DROP FUNCTION", catalog, ident)
530+
parseSessionCatalogFunctionIdentifier(nameParts, "DROP FUNCTION")
517531
DropFunctionCommand(database, function, ifExists, isTemp)
518532

519-
case CreateFunctionStatement(CatalogAndIdentifier(catalog, ident),
533+
case CreateFunctionStatement(nameParts,
520534
className, resources, isTemp, ignoreIfExists, replace) =>
521-
val FunctionIdentifier(function, database) =
522-
parseSessionCatalogFunctionIdentifier("CREATE FUNCTION", catalog, ident)
523-
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
524-
replace)
535+
if (isTemp) {
536+
// temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
537+
val database = if (nameParts.length > 2) {
538+
throw new AnalysisException(s"${nameParts.quoted} is not a valid function name.")
539+
} else if (nameParts.length == 2) {
540+
Some(nameParts.head)
541+
} else {
542+
None
543+
}
544+
CreateFunctionCommand(
545+
database,
546+
nameParts.last,
547+
className,
548+
resources,
549+
isTemp,
550+
ignoreIfExists,
551+
replace)
552+
} else {
553+
val FunctionIdentifier(function, database) =
554+
parseSessionCatalogFunctionIdentifier(nameParts, "CREATE FUNCTION")
555+
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
556+
replace)
557+
}
525558
}
526559

560+
// TODO: move function related v2 statements to the new framework.
527561
private def parseSessionCatalogFunctionIdentifier(
528-
sql: String,
529-
catalog: CatalogPlugin,
530-
functionIdent: Identifier): FunctionIdentifier = {
531-
if (isSessionCatalog(catalog)) {
532-
functionIdent.asMultipartIdentifier match {
562+
nameParts: Seq[String],
563+
sql: String): FunctionIdentifier = {
564+
if (nameParts.length == 1 && isTempFunction(nameParts.head)) {
565+
return FunctionIdentifier(nameParts.head)
566+
}
567+
568+
nameParts match {
569+
case SessionCatalogAndTable(_, funcName) => funcName match {
533570
case Seq(db, fn) => FunctionIdentifier(fn, Some(db))
534-
case Seq(fn) => FunctionIdentifier(fn, None)
571+
case Seq(fn) =>
572+
val database = if (nameParts.head == CatalogManager.SESSION_CATALOG_NAME) {
573+
// For name parts like `spark_catalog.t`, we need to fill in the default database so
574+
// that the caller side won't treat it as a temp function.
575+
Some(catalogManager.v1SessionCatalog.getCurrentDatabase)
576+
} else {
577+
None
578+
}
579+
FunctionIdentifier(fn, database)
535580
case _ =>
536-
throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
581+
throw new AnalysisException(s"Unsupported function name '${funcName.quoted}'")
537582
}
538-
} else {
539-
throw new AnalysisException(s"$sql is only supported in v1 catalog")
583+
584+
case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
540585
}
541586
}
542587

543-
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
544-
val CatalogAndIdentifier(catalog, ident) = tableName
545-
if (!isSessionCatalog(catalog)) {
546-
throw new AnalysisException(s"$sql is only supported with v1 tables.")
547-
}
548-
ident.asMultipartIdentifier
588+
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
589+
case SessionCatalogAndTable(_, tbl) => tbl
590+
case _ => throw new AnalysisException(s"$sql is only supported with v1 tables.")
591+
}
592+
593+
private def parseTempViewOrV1Table(
594+
nameParts: Seq[String], sql: String): Seq[String] = nameParts match {
595+
case TempViewOrV1Table(name) => name
596+
case _ => throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
549597
}
550598

551599
private def buildCatalogTable(
@@ -584,7 +632,29 @@ class ResolveSessionCatalog(
584632
object SessionCatalogAndTable {
585633
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
586634
case SessionCatalogAndIdentifier(catalog, ident) =>
587-
Some(catalog -> ident.asMultipartIdentifier)
635+
if (nameParts.length == 1) {
636+
// If there is only one name part, it means the current catalog is the session catalog.
637+
// Here we return the original name part, to keep the error message unchanged for
638+
// v1 commands.
639+
Some(catalog -> nameParts)
640+
} else {
641+
Some(catalog -> ident.asMultipartIdentifier)
642+
}
643+
case _ => None
644+
}
645+
}
646+
647+
object TempViewOrV1Table {
648+
def unapply(nameParts: Seq[String]): Option[Seq[String]] = nameParts match {
649+
case _ if isTempView(nameParts) => Some(nameParts)
650+
case SessionCatalogAndTable(_, tbl) =>
651+
if (nameParts.head == CatalogManager.SESSION_CATALOG_NAME && tbl.length == 1) {
652+
// For name parts like `spark_catalog.t`, we need to fill in the default database so
653+
// that the caller side won't treat it as a temp view.
654+
Some(Seq(catalogManager.v1SessionCatalog.getCurrentDatabase, tbl.head))
655+
} else {
656+
Some(tbl)
657+
}
588658
case _ => None
589659
}
590660
}

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ abstract class BaseSessionStateBuilder(
174174
new FindDataSourceTable(session) +:
175175
new ResolveSQLOnFile(session) +:
176176
new FallBackFileSourceV2(session) +:
177-
new ResolveSessionCatalog(catalogManager, conf, catalog.isView) +:
177+
new ResolveSessionCatalog(
178+
catalogManager, conf, catalog.isTempView, catalog.isTempFunction) +:
178179
customResolutionRules
179180

180181
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =

sql/core/src/test/resources/sql-tests/results/describe.sql.out

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ struct<plan:string>
520520
-- !query output
521521
== Physical Plan ==
522522
Execute DescribeTableCommand
523-
+- DescribeTableCommand `t`, false
523+
+- DescribeTableCommand `default`.`t`, false
524524

525525

526526
-- !query
@@ -530,7 +530,7 @@ struct<plan:string>
530530
-- !query output
531531
== Physical Plan ==
532532
Execute DescribeTableCommand
533-
+- DescribeTableCommand `t`, true
533+
+- DescribeTableCommand `default`.`t`, true
534534

535535

536536
-- !query
@@ -544,14 +544,14 @@ struct<plan:string>
544544

545545
== Analyzed Logical Plan ==
546546
col_name: string, data_type: string, comment: string
547-
DescribeTableCommand `t`, false
547+
DescribeTableCommand `default`.`t`, false
548548

549549
== Optimized Logical Plan ==
550-
DescribeTableCommand `t`, false
550+
DescribeTableCommand `default`.`t`, false
551551

552552
== Physical Plan ==
553553
Execute DescribeTableCommand
554-
+- DescribeTableCommand `t`, false
554+
+- DescribeTableCommand `default`.`t`, false
555555

556556

557557
-- !query
@@ -571,7 +571,7 @@ struct<plan:string>
571571
-- !query output
572572
== Physical Plan ==
573573
Execute DescribeTableCommand
574-
+- DescribeTableCommand `t`, Map(c -> Us, d -> 2), false
574+
+- DescribeTableCommand `default`.`t`, Map(c -> Us, d -> 2), false
575575

576576

577577
-- !query

0 commit comments

Comments
 (0)