[SPARK-37929][SQL] Support cascade mode for dropNamespace API
#35246
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |||||
| import org.apache.spark.annotation.Evolving; | ||||||
| import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; | ||||||
| import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; | ||||||
| import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; | ||||||
|
|
||||||
| import java.util.Map; | ||||||
|
|
||||||
|
|
@@ -136,15 +137,21 @@ void alterNamespace( | |||||
| NamespaceChange... changes) throws NoSuchNamespaceException; | ||||||
|
|
||||||
| /** | ||||||
| * Drop a namespace from the catalog, recursively dropping all objects within the namespace. | ||||||
| * Drop a namespace from the catalog with cascade mode, recursively dropping all objects | ||||||
| * within the namespace if cascade is true. | ||||||
| * <p> | ||||||
| * If the catalog implementation does not support this operation, it may throw | ||||||
| * {@link UnsupportedOperationException}. | ||||||
| * | ||||||
| * @param namespace a multi-part namespace | ||||||
| * @param cascade a boolean flag that deletes all namespaces and tables under the namespace | ||||||
| * if it is set true | ||||||
| * @return true if the namespace was dropped | ||||||
| * @throws NoSuchNamespaceException If the namespace does not exist (optional) | ||||||
| * @throws NonEmptyNamespaceException If the namespace is non-empty | ||||||
|
||||||
| * @throws NonEmptyNamespaceException If the namespace is non-empty | |
| * @throws NonEmptyNamespaceException If the namespace is non-empty and cascade is false. |
cloud-fan marked this conversation as resolved.
Wait... this API was added in 3.0.0 by SPARK-27661. Shouldn't we have added an overloaded version to keep binary compatibility?
Do you mean we should keep the old API dropNamespace(String[] namespace) and add a new API dropNamespace(String[] namespace, boolean cascade)? @HyukjinKwon
Yes, otherwise downstream data sources compiled against Spark 3.2 would not work with Spark 3.3 and would fail with an exception such as "method does not exist". The MiMa plugin we use usually detects such binary incompatibilities, but it seems it didn't this time for some reason.
Should I create a PR to revert this change to the dropNamespace API and add a new overload instead? WDYT? @cloud-fan @HyukjinKwon
We changed the implementation of DropNamespaceExec to not call listTables at the beginning, and the catalog implementation must deal with the cascade parameter properly. It's better to break binary compatibility here, instead of silently calling the old dropNamespace implementation which may drop all the tables in the namespace.
Can we keep the binary compatibility but change the old behavior?
I don't think it's possible, unless we add versioning to the DS v2 APIs, so that we know which version the catalog implementation is.
Can we implement a default method dropNamespace(String[] namespace) that calls dropNamespace(namespace, cascade = false) or dropNamespace(namespace, cascade = true), while deprecating it and pointing to the alternative? It already worked with cascading before, and that's what we documented. This API was added in Spark 3.0.0, and I don't think we should break binary compatibility unless we must.
If we think the old method shouldn't be implemented, we should keep the signature and throw an exception, so that we can show a nicer error message than "method not found".
> Can we implement default method of dropNamespace(String[] namespace)

This doesn't help, because the new dropNamespace(String[] namespace, boolean cascade) would still be unimplemented, which breaks all the existing implementations.
A common way to keep backward compatibility is to add a default implementation for the new API, e.g.
default boolean dropNamespace(String[] namespace, boolean cascade) {
  if (cascade) return dropNamespace(namespace); else throw ...
}
However, DROP DATABASE is much more commonly used than DROP DATABASE ... CASCADE, so this doesn't help either.
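To make the trade-off in this thread concrete, here is a minimal sketch of the "default implementation for the new API" approach. It assumes a hypothetical interface named `LegacyNamespaceCatalog` and a hypothetical `OldStyleCatalog`; neither is Spark's actual `SupportsNamespaces` API, and the choice of `UnsupportedOperationException` for the non-cascade branch is only an illustration of the elided `throw ...` above.

```java
// Hypothetical stand-in for the catalog interface under discussion (not Spark's real API).
interface LegacyNamespaceCatalog {
    // Old method: documented to drop the namespace and everything inside it.
    boolean dropNamespace(String[] namespace);

    // New overload with a default body, so catalogs compiled against the old
    // interface still link and load.
    default boolean dropNamespace(String[] namespace, boolean cascade) {
        if (cascade) {
            // The old method already cascades, so delegating is safe here.
            return dropNamespace(namespace);
        }
        // The old method cannot do a non-cascade drop, so the only safe option is to fail.
        throw new UnsupportedOperationException(
            "Non-cascade drop is not supported for namespace " + String.join(".", namespace));
    }
}

// A catalog written before the overload existed: it only implements the old method.
final class OldStyleCatalog implements LegacyNamespaceCatalog {
    int dropCount = 0; // counts how often the cascading drop actually ran

    @Override
    public boolean dropNamespace(String[] namespace) {
        dropCount++;
        return true;
    }
}
```

With this shape, `new OldStyleCatalog().dropNamespace(ns, true)` succeeds, while the call with `cascade = false` always throws. That is the concern raised above: a plain DROP DATABASE, the more common statement, would fail for every catalog that has not been updated.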
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ | ||
|
|
||
|
|
||
| /** | ||
| * Thrown by a catalog when a non-empty namespace is dropped without cascade. The analyzer will rethrow the exception | ||
| * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. | ||
| */ | ||
| case class NonEmptyNamespaceException( | ||
| override val message: String, | ||
| override val cause: Option[Throwable] = None) | ||
| extends AnalysisException(message, cause = cause) { | ||
|
|
||
| def this(namespace: Array[String]) = { | ||
| this(s"Namespace '${namespace.quoted}' is non empty.") | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap | |
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
||
| import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} | ||
| import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} | ||
| import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} | ||
| import org.apache.spark.sql.connector.expressions.{SortOrder, Transform} | ||
| import org.apache.spark.sql.types.StructType | ||
|
|
@@ -213,10 +213,16 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp | |
| namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes)) | ||
| } | ||
|
|
||
| override def dropNamespace(namespace: Array[String]): Boolean = { | ||
| listNamespaces(namespace).foreach(dropNamespace) | ||
| override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = { | ||
| listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade)) | ||
|
||
| try { | ||
| listTables(namespace).foreach(dropTable) | ||
| if (!cascade) { | ||
| if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) { | ||
| throw new NonEmptyNamespaceException(namespace) | ||
| } | ||
| } else { | ||
| listTables(namespace).foreach(dropTable) | ||
| } | ||
| } catch { | ||
| case _: NoSuchNamespaceException => | ||
| } | ||
|
|
||
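As a rough illustration of what "dealing with the cascade parameter properly" can look like in a catalog, the sketch below checks for emptiness before mutating anything. `ToyCatalog`, its dotted-string namespace paths, and the use of `IllegalStateException` in place of `NonEmptyNamespaceException` are all assumptions made for this example, not code from this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy catalog; not Spark's InMemoryTableCatalog.
final class ToyCatalog {
    // Namespace path (for example "a.b") mapped to the tables stored directly in it.
    private final Map<String, Set<String>> tables = new HashMap<>();

    void createNamespace(String ns) {
        tables.putIfAbsent(ns, new HashSet<>());
    }

    void createTable(String ns, String table) {
        createNamespace(ns);
        tables.get(ns).add(table);
    }

    boolean namespaceExists(String ns) {
        return tables.containsKey(ns);
    }

    // Collects every descendant namespace into a fresh list, so callers may
    // remove entries from the map while iterating over the result.
    private List<String> descendants(String ns) {
        List<String> result = new ArrayList<>();
        for (String candidate : tables.keySet()) {
            if (candidate.startsWith(ns + ".")) {
                result.add(candidate);
            }
        }
        return result;
    }

    // Returns false if the namespace does not exist, true if it was dropped.
    boolean dropNamespace(String ns, boolean cascade) {
        if (!tables.containsKey(ns)) {
            return false;
        }
        List<String> nested = descendants(ns);
        // Validate first: a rejected non-cascade drop must leave the catalog untouched.
        if (!cascade && (!tables.get(ns).isEmpty() || !nested.isEmpty())) {
            // Stand-in for NonEmptyNamespaceException.
            throw new IllegalStateException("Namespace '" + ns + "' is non empty.");
        }
        for (String child : nested) {
            tables.remove(child);
        }
        tables.remove(ns);
        return true;
    }
}
```

Checking before any removal is a deliberate choice in this sketch: it keeps a failed non-cascade drop free of side effects, which is easier to reason about than recursing into child namespaces first.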
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -278,7 +278,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging | |
| } | ||
| } | ||
|
|
||
| override def dropNamespace(namespace: Array[String]): Boolean = namespace match { | ||
| override def dropNamespace( | ||
| namespace: Array[String], | ||
| cascade: Boolean): Boolean = namespace match { | ||
|
Contributor
@beliefer do you know how JDBC supports DROP DATABASE CASCADE?
Contributor
Can you create a follow-up PR to fix the JDBC source? We need to respect the cascade parameter. Thanks!
Contributor
@cloud-fan Thank you for the ping. I will try to fix it. |
||
| case Array(db) if namespaceExists(namespace) => | ||
| if (listTables(Array(db)).nonEmpty) { | ||
| throw QueryExecutionErrors.namespaceNotEmptyError(namespace) | ||
|
|
||