Skip to content

Commit e744f81

Browse files
committed
catalog name support + add tests
1 parent 713c0fb commit e744f81

2 files changed

Lines changed: 39 additions & 1 deletion

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ private[sql] object CatalogV2Util {
292292

293293
def getRelation(catalog: CatalogPlugin, ident: Identifier, table: Table): LogicalPlan = {
294294
SubqueryAlias(
295-
ident,
295+
Identifier.of(catalog.name +: ident.namespace, ident.name),
296296
DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
297297
}
298298

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,44 @@ class DataSourceV2SQLSuite
678678
}
679679
}
680680

681+
test("qualified column names for v2 tables") {
682+
val t = "testcat.ns1.ns2.tbl"
683+
withTable(t) {
684+
sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
685+
sql(s"INSERT INTO $t VALUES (1, (10, 20))")
686+
687+
checkAnswer(
688+
sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
689+
Row(1, 10))
690+
checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
691+
checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
692+
checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
693+
694+
val ex = intercept[AnalysisException] {
695+
sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
696+
}
697+
assert(ex.getMessage.contains("cannot resolve '`ns1.ns2.ns3.tbl.id`"))
698+
}
699+
}
700+
701+
test("qualified column names for v1 tables") {
702+
// unset this config to use the default v2 session catalog.
703+
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
704+
705+
withTable("t") {
706+
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
707+
checkAnswer(sql("select default.t.i from spark_catalog.t"), Row(1))
708+
checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
709+
checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
710+
711+
// catalog name cannot be used for v1 tables.
712+
val ex = intercept[AnalysisException] {
713+
sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
714+
}
715+
assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
716+
}
717+
}
718+
681719
test("InsertInto: append - across catalog") {
682720
val t1 = "testcat.ns1.ns2.tbl"
683721
val t2 = "testcat2.db.tbl"

0 commit comments

Comments
 (0)