Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.sql.jdbc.v2

import java.sql.Connection
import java.sql.{Connection, SQLFeatureNotSupportedException}

import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.NamespaceChange
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.tags.DockerTest
Expand Down Expand Up @@ -55,11 +57,47 @@ class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespac

override def dataPreparation(conn: Connection): Unit = {}

override def builtinNamespaces: Array[Array[String]] = Array()
override def builtinNamespaces: Array[Array[String]] =
Array(Array("information_schema"), Array("mysql"), Array("performance_schema"), Array("sys"))

override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
Array(builtinNamespaces.head, namespace) ++ builtinNamespaces.tail
}

override val supportsSchemaComment: Boolean = false

// Cannot get namespaces with conn.getMetaData.getSchemas
// TODO testListNamespaces()
// TODO testDropNamespaces()
override val supportsDropSchemaRestrict: Boolean = false

testListNamespaces()
testDropNamespaces()

test("Create or remove comment of namespace unsupported") {
val e1 = intercept[AnalysisException] {
catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
}
assert(e1.getMessage.contains("Failed create name space: foo"))
assert(e1.getCause.isInstanceOf[SQLFeatureNotSupportedException])
assert(e1.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
.contains("Create namespace comment is not supported"))
assert(catalog.namespaceExists(Array("foo")) === false)
catalog.createNamespace(Array("foo"), Map.empty[String, String].asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
val e2 = intercept[AnalysisException] {
catalog.alterNamespace(Array("foo"), NamespaceChange
.setProperty("comment", "comment for foo"))
}
assert(e2.getMessage.contains("Failed create comment on name space: foo"))
assert(e2.getCause.isInstanceOf[SQLFeatureNotSupportedException])
assert(e2.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
.contains("Create namespace comment is not supported"))
val e3 = intercept[AnalysisException] {
catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment"))
}
assert(e3.getMessage.contains("Failed remove comment on name space: foo"))
assert(e3.getCause.isInstanceOf[SQLFeatureNotSupportedException])
assert(e3.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
.contains("Remove namespace comment is not supported"))
catalog.dropNamespace(Array("foo"), cascade = true)
assert(catalog.namespaceExists(Array("foo")) === false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte

def supportsDropSchemaCascade: Boolean = true

def supportsDropSchemaRestrict: Boolean = true

def testListNamespaces(): Unit = {
test("listNamespaces: basic behavior") {
val commentMap = if (supportsSchemaComment) {
Expand All @@ -78,7 +80,11 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
assert(createCommentWarning === false)
}

catalog.dropNamespace(Array("foo"), cascade = false)
if (supportsDropSchemaRestrict) {
catalog.dropNamespace(Array("foo"), cascade = false)
} else {
catalog.dropNamespace(Array("foo"), cascade = true)
}
assert(catalog.namespaceExists(Array("foo")) === false)
assert(catalog.listNamespaces() === builtinNamespaces)
val msg = intercept[AnalysisException] {
Expand All @@ -99,15 +105,21 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
}
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.dropNamespace(Array("foo"), cascade = false)
if (supportsDropSchemaRestrict) {
catalog.dropNamespace(Array("foo"), cascade = false)
} else {
catalog.dropNamespace(Array("foo"), cascade = true)
}
assert(catalog.namespaceExists(Array("foo")) === false)

// Drop non empty namespace without cascade
catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.createTable(ident1, schema, Array.empty, emptyProps)
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
if (supportsDropSchemaRestrict) {
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
}
}

// Drop non empty namespace with cascade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,4 +1944,16 @@ object QueryExecutionErrors {
def MultipleBucketTransformsError(): Throwable = {
new UnsupportedOperationException("Multiple bucket transforms are not supported.")
}

def unsupportedCreateNamespaceCommentError(): Throwable = {
new SQLFeatureNotSupportedException("Create namespace comment is not supported")
}

def unsupportedRemoveNamespaceCommentError(): Throwable = {
new SQLFeatureNotSupportedException("Remove namespace comment is not supported")
}

def unsupportedDropNamespaceRestrictError(): Throwable = {
new SQLFeatureNotSupportedException("Drop namespace restrict is not supported")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,23 @@ object JdbcUtils extends Logging with SQLConfHelper {
namespace: String,
comment: String): Unit = {
val dialect = JdbcDialects.get(options.url)
val schemaCommentQuery = if (comment.isEmpty) {
comment
} else {
dialect.getSchemaCommentQuery(namespace, comment)
}
executeStatement(conn, options, s"CREATE SCHEMA ${dialect.quoteIdentifier(namespace)}")
if (!comment.isEmpty) createNamespaceComment(conn, options, namespace, comment)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the actual change here? Get rid of the try-catch in createNamespaceComment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want keep the atomic of create namespace with comment.

if (comment.nonEmpty) executeStatement(conn, options, schemaCommentQuery)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess

    val dialect = JdbcDialects.get(options.url)
    executeStatement(conn, options, s"CREATE SCHEMA ${dialect.quoteIdentifier(namespace)}")
    if (comment.nonEmpty) {
      executeStatement(conn, options, dialect.getSchemaCommentQuery(namespace, comment))
    }

Copy link
Contributor Author

@beliefer beliefer Feb 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. I forgot it.
I want keep the atomic of create namespace with comment.
If create namespace with comment in MySQL, we should fails all the operations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to @huaxingao 's suggestion, which is a pure code-style comment and not related to the business logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want keep the atomic of create namespace with comment.

}

def namespaceExists(conn: Connection, options: JDBCOptions, namespace: String): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are touching this file, can we rename namespace to schema for all these methods in this file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

val dialect = JdbcDialects.get(options.url)
dialect.namespacesExists(conn, options, namespace)
}

def listNamespaces(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
val dialect = JdbcDialects.get(options.url)
dialect.listNamespaces(conn, options)
}

def createNamespaceComment(
Expand All @@ -989,26 +1004,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
namespace: String,
comment: String): Unit = {
val dialect = JdbcDialects.get(options.url)
try {
executeStatement(
conn, options, dialect.getSchemaCommentQuery(namespace, comment))
} catch {
case e: Exception =>
logWarning("Cannot create JDBC catalog comment. The catalog comment will be ignored.")
}
executeStatement(conn, options, dialect.getSchemaCommentQuery(namespace, comment))
}

def removeNamespaceComment(
conn: Connection,
options: JDBCOptions,
namespace: String): Unit = {
val dialect = JdbcDialects.get(options.url)
try {
executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace))
} catch {
case e: Exception =>
logWarning("Cannot drop JDBC catalog comment.")
}
executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace))
}

/**
Expand Down Expand Up @@ -1148,11 +1152,17 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
def executeQuery(conn: Connection, options: JDBCOptions, sql: String)(
f: ResultSet => Unit): Unit = {
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
statement.executeQuery(sql)
val rs = statement.executeQuery(sql)
try {
f(rs)
} finally {
rs.close()
}
} finally {
statement.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuilder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
Expand Down Expand Up @@ -173,23 +172,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
case Array(db) =>
JdbcUtils.withConnection(options) { conn =>
val rs = conn.getMetaData.getSchemas(null, db)
while (rs.next()) {
if (rs.getString(1) == db) return true;
}
false
JdbcUtils.namespaceExists(conn, options, db)
}
case _ => false
}

override def listNamespaces(): Array[Array[String]] = {
JdbcUtils.withConnection(options) { conn =>
val schemaBuilder = ArrayBuilder.make[Array[String]]
val rs = conn.getMetaData.getSchemas()
while (rs.next()) {
schemaBuilder += Array(rs.getString(1))
}
schemaBuilder.result
JdbcUtils.listNamespaces(conn, options)
}
}

Expand Down Expand Up @@ -254,7 +244,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case set: NamespaceChange.SetProperty =>
if (set.property() == SupportsNamespaces.PROP_COMMENT) {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.createNamespaceComment(conn, options, db, set.value)
JdbcUtils.classifyException(s"Failed create comment on name space: $db", dialect) {
JdbcUtils.createNamespaceComment(conn, options, db, set.value)
}
}
} else {
throw QueryCompilationErrors.cannotSetJDBCNamespaceWithPropertyError(set.property)
Expand All @@ -263,7 +255,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case unset: NamespaceChange.RemoveProperty =>
if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.removeNamespaceComment(conn, options, db)
JdbcUtils.classifyException(s"Failed remove comment on name space: $db", dialect) {
JdbcUtils.removeNamespaceComment(conn, options, db)
}
}
} else {
throw QueryCompilationErrors.cannotUnsetJDBCNamespaceWithPropertyError(unset.property)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,29 @@ abstract class JdbcDialect extends Serializable with Logging{
}
}

/**
* Check namespace exists or not.
*/
def namespacesExists(conn: Connection, options: JDBCOptions, namespace: String): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the JDBC context, we should say schema instead of namespace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

val rs = conn.getMetaData.getSchemas(null, namespace)
while (rs.next()) {
if (rs.getString(1) == namespace) return true;
}
false
}

/**
* Lists all the schemas in this table.
*/
def listNamespaces(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
val schemaBuilder = ArrayBuilder.make[Array[String]]
val rs = conn.getMetaData.getSchemas()
while (rs.next()) {
schemaBuilder += Array(rs.getString(1))
}
schemaBuilder.result
}

/**
* Return Some[true] iff `TRUNCATE TABLE` causes cascading default.
* Some[true] : TRUNCATE TABLE causes cascading.
Expand Down
Loading