diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d8e19c994c59..8c679c4d57fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.command._
@@ -66,7 +66,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")
 
     case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
-        if isSessionCatalog(catalog) =>
+        if supportsV1Command(catalog) =>
       if (a.column.name.length > 1) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           catalog, ident, "ALTER COLUMN with qualified column")
@@ -117,13 +117,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)
 
-    case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
+    case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
       DescribeDatabaseCommand(db, extended, output)
 
-    case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) if conf.useV1Command =>
+    case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
       AlterDatabasePropertiesCommand(db, properties)
 
-    case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
+    case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
       if (StringUtils.isEmpty(location)) {
         throw QueryExecutionErrors.invalidEmptyLocationError(location)
       }
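Note on the hunks above: these commands all have a v2 implementation to fall back on, so the guard tightens from "is this the session catalog" (`isSessionCatalog`) to the new `supportsV1Command`, which additionally requires that no custom v2 session catalog has been installed (the helper is added at the end of this file's diff). A minimal sketch of the configuration that flips these cases to the v2 path; the catalog class name is a placeholder, not part of this change:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: replace the built-in spark_catalog with a custom v2 implementation.
// With this config set, supportsV1Command(catalog) is false and the commands above are no
// longer rewritten into the legacy v1 commands.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog", "com.example.MyV2SessionCatalog")
  .getOrCreate()
```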
@@ -218,7 +218,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
       DropTempViewCommand(ident)
 
-    case DropView(ResolvedV1Identifier(ident), ifExists) =>
+    case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
       DropTableCommand(ident, ifExists, isView = true, purge = false)
 
     case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
@@ -237,14 +237,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
       CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)
 
-    case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command =>
+    case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
       DropDatabaseCommand(db, d.ifExists, d.cascade)
 
-    case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command =>
+    case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
       ShowTablesCommand(Some(db), pattern, output)
 
     case ShowTableExtended(
-        DatabaseInSessionCatalog(db),
+        ResolvedV1Database(db),
         pattern,
         partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
         output) =>
@@ -265,7 +265,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         AnalyzePartitionCommand(ident, partitionSpec, noScan)
       }
 
-    case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
+    case AnalyzeTables(ResolvedV1Database(db), noScan) =>
       AnalyzeTablesCommand(Some(db), noScan)
 
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
@@ -293,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         if conf.useV1Command =>
       ShowCreateTableCommand(ident, output)
 
     case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
-        if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+        if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
       ShowCreateTableCommand(table.catalogTable.identifier, output)
 
     case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
@@ -367,7 +367,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
       AlterViewAsCommand(ident, originalText, query)
 
-    case CreateView(ResolvedV1Identifier(ident), userSpecifiedColumns, comment,
+    case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
         properties, originalText, child, allowExisting, replace) =>
       CreateViewCommand(
         name = ident,
@@ -385,7 +385,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case ShowViews(ns: ResolvedNamespace, pattern, output) =>
       ns match {
-        case DatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
+        case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
         case _ =>
           throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
       }
@@ -408,7 +408,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
       }
 
-    case ShowFunctions(DatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
+    case ShowFunctions(
+        ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
       ShowFunctionsCommand(db, pattern, userScope, systemScope, output)
 
     case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
@@ -429,7 +430,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
       }
 
-    case CreateFunction(ResolvedV1Identifier(ident), className, resources, ifExists, replace) =>
+    case CreateFunction(
+        ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
       CreateFunctionCommand(
         FunctionIdentifier(ident.table, ident.database, ident.catalog),
         className,
@@ -564,7 +566,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   object ResolvedV1TableIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
-      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+      case ResolvedTable(catalog, _, t: V1Table, _) if supportsV1Command(catalog) =>
        Some(t.catalogTable.identifier)
       case _ => None
     }
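The next hunks split the matcher objects in two: `ResolvedV1Identifier` and `ResolvedV1Database` keep the `supportsV1Command` guard for commands that can be planned through v2, while the new `ResolvedIdentifierInSessionCatalog` and `ResolvedDatabaseInSessionCatalog` only check for the session catalog and are reserved for commands with no v2 implementation (DROP VIEW, CREATE VIEW, CREATE FUNCTION, SHOW VIEWS, SHOW FUNCTIONS). All of them follow the same extractor-object pattern; a simplified, self-contained sketch with stand-in names (not the Spark classes):

```scala
// Stand-ins for a resolved plan node and a matcher object, mirroring the unapply-guard
// style used by ResolvedV1Database and friends.
final case class Resolved(catalogName: String, namespace: Seq[String], name: String)

object InSessionCatalog {
  // Matches only single-part namespaces in the session catalog, the shape v1 commands accept.
  def unapply(r: Resolved): Option[(String, String)] = r match {
    case Resolved("spark_catalog", Seq(db), name) => Some((db, name))
    case _ => None
  }
}

object Demo extends App {
  Resolved("spark_catalog", Seq("default"), "t") match {
    case InSessionCatalog((db, name)) => println(s"v1 identifier: $db.$name")
    case _ => println("handled elsewhere")
  }
}
```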
@@ -579,6 +581,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   object ResolvedV1Identifier {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>
+        if (ident.namespace().length != 1) {
+          throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.namespace())
+        }
+        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+      case _ => None
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  object ResolvedIdentifierInSessionCatalog{
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
         if (ident.namespace().length != 1) {
           throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.namespace())
         }
@@ -610,7 +624,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
   }
 
-  private object DatabaseInSessionCatalog {
+  private object ResolvedV1Database {
+    def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+      case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None
+      case ResolvedNamespace(_, Seq()) =>
+        throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
+      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+      case _ =>
+        assert(resolved.namespace.length > 1)
+        throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
+          resolved.namespace.map(quoteIfNeeded).mkString("."))
+    }
+  }
+
+  // Use this object to help match commands that do not have a v2 implementation.
+  private object ResolvedDatabaseInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
       case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
       case ResolvedNamespace(_, Seq()) =>
@@ -625,11 +653,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private object DatabaseNameInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(catalog, _) if !supportsV1Command(catalog) => None
       case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
       case _ =>
         assert(resolved.namespace.length > 1)
         throw QueryCompilationErrors.invalidDatabaseNameError(resolved.namespace.quoted)
     }
   }
+
+  private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
+    catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) &&
+      !SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+  }
 }
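The new `supportsV1Command` helper closes out this file: the catalog must be the built-in `spark_catalog`, and `spark.sql.catalog.spark_catalog` (the key behind `V2_SESSION_CATALOG_IMPLEMENTATION`) must be unset. The smallest catalog that triggers the "overridden" case is a pass-through extension; a sketch under that assumption (the class name is ours):

```scala
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

// A do-nothing v2 session catalog that simply delegates every call to the built-in catalog.
// Registering it as spark.sql.catalog.spark_catalog is exactly the situation in which
// supportsV1Command returns false and the v1 fallbacks above are skipped.
class PassThroughSessionCatalog extends DelegatingCatalogExtension
```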
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 0106a9c5aea0..5fd4aa970a62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -69,6 +69,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }
   }
 
+  private def hadoopConf = session.sessionState.newHadoopConf()
+
   private def refreshCache(r: DataSourceV2Relation)(): Unit = {
     session.sharedState.cacheManager.recacheByPlan(session, r)
   }
@@ -103,7 +105,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }
 
   private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    tableSpec.withNewLocation(tableSpec.location.map(loc => CatalogUtils.makeQualifiedPath(
+      CatalogUtils.stringToURI(loc), hadoopConf).toString))
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 79fbabbeacaa..219c8e198fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
       spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
       val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
       val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
-      assert(tableInfo.properties().get("location") === "file:/abc")
+      assert(tableInfo.properties().get("location") === "file:///abc")
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 01d878f2d2b4..77e447062d40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -441,8 +441,8 @@ class DataSourceV2SQLSuiteV1Filter
         "AS SELECT id FROM source")
       val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
         .filter("col_name = 'Location'")
-        .select("data_type").head.getString(0)
-      assert(location === "file:/tmp/foo")
+        .select("data_type").head().getString(0)
+      assert(location === "file:///tmp/foo")
       }
     }
   }
@@ -458,8 +458,8 @@ class DataSourceV2SQLSuiteV1Filter
         "AS SELECT id FROM source")
      val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
         .filter("col_name = 'Location'")
-        .select("data_type").head.getString(0)
-      assert(location === "file:/tmp/foo")
+        .select("data_type").head().getString(0)
+      assert(location === "file:///tmp/foo")
       }
     }
   }
@@ -2068,15 +2068,10 @@
   }
 
   test("REPLACE TABLE: v1 table") {
-    val e = intercept[AnalysisException] {
-      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    }
-    checkError(
-      exception = e,
-      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
-      sqlState = "0A000",
-      parameters = Map("tableName" -> "`spark_catalog`.`default`.`tbl`",
-        "operation" -> "REPLACE TABLE"))
+    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    val v2Catalog = catalog("spark_catalog").asTableCatalog
+    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
   }
 
   test("DeleteFrom: - delete with invalid predicate") {
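The location assertions above change because `qualifyLocInTableSpec` now runs the user-supplied location through `CatalogUtils.makeQualifiedPath` with the session's Hadoop configuration, so a bare path such as `/abc` or a scheme-only URI such as `file:/tmp/foo` is stored fully qualified, with a scheme and an (empty) authority. A small sketch of the underlying qualification using Hadoop's public API rather than Spark's internal helper:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// With the default local filesystem, qualification adds the "file" scheme and an empty
// authority, which is why "/abc" now comes back as "file:///abc".
val hadoopConf = new Configuration()
val raw = new Path("/abc")
val qualified = raw.getFileSystem(hadoopConf).makeQualified(raw)
println(qualified) // typically file:///abc; the exact result depends on fs.defaultFS
```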
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 46586c622db7..9042231bdc59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -79,18 +79,22 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating
       partitions: Array[Transform],
       properties: java.util.Map[String, String]): Table = {
     val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
-    val propsWithLocation = if (properties.containsKey(key)) {
+    val newProps = new java.util.HashMap[String, String]()
+    newProps.putAll(properties)
+    if (properties.containsKey(TableCatalog.PROP_LOCATION)) {
+      newProps.put(TableCatalog.PROP_EXTERNAL, "true")
+    }
+
+    val propsWithLocation = if (newProps.containsKey(key)) {
       // Always set a location so that CREATE EXTERNAL TABLE won't fail with LOCATION not specified.
-      if (!properties.containsKey(TableCatalog.PROP_LOCATION)) {
-        val newProps = new java.util.HashMap[String, String]()
-        newProps.putAll(properties)
+      if (!newProps.containsKey(TableCatalog.PROP_LOCATION)) {
         newProps.put(TableCatalog.PROP_LOCATION, "file:/abc")
         newProps
       } else {
-        properties
+        newProps
       }
     } else {
-      properties
+      newProps
     }
     val created = super.createTable(ident, schema, partitions, propsWithLocation)
     val t = newTable(created.name(), schema, partitions, propsWithLocation)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala
index adda9dcfffe4..fec33d811b46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala
@@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
       "'via' = '2')",
       "PARTITIONED BY (a)",
       "COMMENT 'This is a comment'",
-      "LOCATION 'file:/tmp'",
+      "LOCATION 'file:///tmp'",
       "TBLPROPERTIES (",
       "'password' = '*********(redacted)',",
       "'prop1' = '1',",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index c6bf220e45d5..c70675497064 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -754,7 +754,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       assert(table.properties().get("comment").equals(description))
       assert(table.properties().get("path").equals(dir.getAbsolutePath))
       assert(table.properties().get("external").equals("true"))
-      assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
+      assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
     }
   }
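Finally, the test session catalog now copies the incoming properties into a mutable map and marks the table as external whenever an explicit location is supplied, matching the v1 session catalog's rule that `CREATE TABLE ... LOCATION` produces an external table. The copy-then-augment idiom, as a standalone sketch (the helper name is ours, not part of the change):

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import org.apache.spark.sql.connector.catalog.TableCatalog

// Never mutate the caller's map (it may be immutable); copy it, then flag the table as
// external if an explicit location is present.
def withExternalFlag(properties: JMap[String, String]): JMap[String, String] = {
  val copied = new JHashMap[String, String](properties)
  if (copied.containsKey(TableCatalog.PROP_LOCATION)) {
    copied.put(TableCatalog.PROP_EXTERNAL, "true")
  }
  copied
}
```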