From 9beee56e25c0464c4b2d89be1c60cfa418e9a886 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 26 Sep 2024 08:51:44 +0800 Subject: [PATCH 1/2] Make DelegatingCatalogExtension more extendable --- .../sql/connector/catalog/DelegatingCatalogExtension.java | 6 ++++-- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 4 ++-- .../org/apache/spark/sql/internal/DataFrameWriterImpl.scala | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java index f6686d2e4d3b..5ccc4d4eb2bf 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java @@ -38,7 +38,7 @@ @Evolving public abstract class DelegatingCatalogExtension implements CatalogExtension { - private CatalogPlugin delegate; + protected CatalogPlugin delegate; @Override public final void setDelegateCatalog(CatalogPlugin delegate) { @@ -51,7 +51,9 @@ public String name() { } @Override - public final void initialize(String name, CaseInsensitiveStringMap options) {} + public void initialize(String name, CaseInsensitiveStringMap options) { + delegate.initialize(name, options); + } @Override public Set capabilities() { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 02ad2e79a564..a9ad7523c8fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command._ @@ -706,6 +706,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) private def supportsV1Command(catalog: CatalogPlugin): Boolean = { isSessionCatalog(catalog) && ( SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty || - catalog.isInstanceOf[DelegatingCatalogExtension]) + catalog.isInstanceOf[CatalogExtension]) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala index f0eef9ae1cbb..8164d33f46fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala @@ -429,7 +429,7 @@ final class DataFrameWriterImpl[T] private[sql](ds: Dataset[T]) extends DataFram val canUseV2 = lookupV2Provider().isDefined || (df.sparkSession.sessionState.conf.getConf( SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined && !df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME) - .isInstanceOf[DelegatingCatalogExtension]) + .isInstanceOf[CatalogExtension]) session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) => From 685c78c7bd79e9c1331f1b1b99334ec0eb5a0b63 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 26 Sep 2024 11:53:42 +0800 Subject: [PATCH 2/2] Update sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java --- .../sql/connector/catalog/DelegatingCatalogExtension.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java index 5ccc4d4eb2bf..786821514822 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java @@ -51,9 +51,7 @@ public String name() { } @Override - public void initialize(String name, CaseInsensitiveStringMap options) { - delegate.initialize(name, options); - } + public void initialize(String name, CaseInsensitiveStringMap options) {} @Override public Set capabilities() {