From 50a79363dcd17c17149ee727d22aa1243f8ca4b7 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 18:17:15 +0300 Subject: [PATCH 01/14] Assign error classes/subclasses to JdbcUtils.classifyException --- .../main/resources/error/error-classes.json | 78 +++++++++++++++++-- docs/sql-error-conditions.md | 8 ++ .../catalyst/analysis/NonEmptyException.scala | 3 - .../datasources/jdbc/JdbcUtils.scala | 7 +- .../datasources/v2/jdbc/JDBCTable.scala | 16 +++- .../v2/jdbc/JDBCTableCatalog.scala | 56 ++++++++++--- .../apache/spark/sql/jdbc/DB2Dialect.scala | 15 +++- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 33 ++++---- .../apache/spark/sql/jdbc/JdbcDialects.scala | 13 ++-- .../spark/sql/jdbc/MsSqlServerDialect.scala | 15 +++- .../apache/spark/sql/jdbc/MySQLDialect.scala | 28 ++++--- .../spark/sql/jdbc/PostgresDialect.scala | 56 ++++++------- 12 files changed, 230 insertions(+), 98 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 2aa5420eb22c..897e4975be6f 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1096,6 +1096,79 @@ ], "sqlState" : "38000" }, + "FAILED_JDBC" : { + "message" : [ + "Failed the JDBC operation:" + ], + "subClass" : { + "ALTER_TABLE" : { + "message" : [ + "Alter the table ." + ] + }, + "CREATE_INDEX" : { + "message" : [ + "Create the index in the table." + ] + }, + "CREATE_NAMESPACE" : { + "message" : [ + "Create the namespace ." + ] + }, + "CREATE_NAMESPACE_COMMENT" : { + "message" : [ + "Create a comment on the namespace: ." + ] + }, + "CREATE_TABLE" : { + "message" : [ + "Create the table ." + ] + }, + "DROP_INDEX" : { + "message" : [ + "Drop the index in the table." + ] + }, + "DROP_NAMESPACE" : { + "message" : [ + "Drop the namespace ." + ] + }, + "GET_TABLES" : { + "message" : [ + "Get tables from the namespace: ." + ] + }, + "LIST_NAMESPACES" : { + "message" : [ + "List namespaces." + ] + }, + "NAMESPACE_EXISTS" : { + "message" : [ + "Check that the namespace exists." + ] + }, + "REMOVE_NAMESPACE_COMMENT" : { + "message" : [ + "Remove a comment on the namespace: ." + ] + }, + "RENAME_TABLE" : { + "message" : [ + "Rename the table to ." + ] + }, + "TABLE_EXISTS" : { + "message" : [ + "Check that the table exists." + ] + } + }, + "sqlState" : "40000" + }, "FAILED_PARSE_STRUCT_TYPE" : { "message" : [ "Failed parsing struct: ." @@ -6778,11 +6851,6 @@ "pivot is not supported on a streaming DataFrames/Datasets" ] }, - "_LEGACY_ERROR_TEMP_3064" : { - "message" : [ - "" - ] - }, "_LEGACY_ERROR_TEMP_3065" : { "message" : [ ": " diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 82befaae81df..e8296e52f8bb 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -665,6 +665,14 @@ User defined function (``: (``) => ``) failed d Failed preparing of the function `` for call. Please, double check function's arguments. +### [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html) + +SQLSTATE: 40000 + +Failed the JDBC operation: + +For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html) + ### FAILED_PARSE_STRUCT_TYPE [SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala index 6475ac3093fe..9955f1b7bd30 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala @@ -36,7 +36,4 @@ case class NonEmptyNamespaceException( "details" -> details)) { def this(namespace: Array[String]) = this(namespace, "", None) - - def this(details: String, cause: Option[Throwable]) = - this(Array.empty, details, cause) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 9be764e8b07d..e7835514a384 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1180,12 +1180,15 @@ object JdbcUtils extends Logging with SQLConfHelper { } } - def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = { + def classifyException[T]( + errorClass: String, + messageParameters: Map[String, String], + dialect: JdbcDialect)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e - case e: Throwable => throw dialect.classifyException(message, e) + case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 1065d6347476..9a14f05c8b43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -58,8 +58,12 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt columnsProperties: util.Map[NamedReference, util.Map[String, String]], properties: util.Map[String, String]): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => - JdbcUtils.classifyException(s"Failed to create index $indexName in ${name()}", - JdbcDialects.get(jdbcOptions.url)) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_INDEX", + messageParameters = Map( + "indexName" -> indexName, + "tableName" -> name()), + dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -74,8 +78,12 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt override def dropIndex(indexName: String): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => - JdbcUtils.classifyException(s"Failed to drop index $indexName in ${name()}", - JdbcDialects.get(jdbcOptions.url)) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.DROP_INDEX", + messageParameters = Map( + "indexName" -> indexName, + "tableName" -> name()), + dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 6c773d4fb1b0..47c5b198cdc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -66,7 +66,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => val schemaPattern = if (namespace.length == 1) namespace.head else null val rs = JdbcUtils.classifyException( - s"Failed get tables from: ${namespace.mkString(".")}", dialect) { + errorClass = "FAILED_JDBC.GET_TABLES", + messageParameters = Map("namespace" -> namespace.mkString(".")), + dialect) { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -80,7 +82,10 @@ class JDBCTableCatalog extends TableCatalog checkNamespace(ident.namespace()) val writeOptions = new JdbcOptionsInWrite( options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) - JdbcUtils.classifyException(s"Failed table existence check: $ident", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.TABLE_EXISTS", + messageParameters = Map("tableName" -> ident.toString), + dialect) { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -100,7 +105,12 @@ class JDBCTableCatalog extends TableCatalog override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { checkNamespace(oldIdent.namespace()) JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.RENAME_TABLE", + messageParameters = Map( + "oldName" -> oldIdent.toString, + "newName" -> newIdent.toString), + dialect) { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -160,7 +170,10 @@ class JDBCTableCatalog extends TableCatalog val writeOptions = new JdbcOptionsInWrite(tableOptions) val caseSensitive = SQLConf.get.caseSensitiveAnalysis JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_TABLE", + messageParameters = Map("tableName" -> ident.toString), + dialect) { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -171,7 +184,10 @@ class JDBCTableCatalog extends TableCatalog override def alterTable(ident: Identifier, changes: TableChange*): Table = { checkNamespace(ident.namespace()) JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.ALTER_TABLE", + messageParameters = Map("tableName" -> ident.toString), + dialect) { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -181,7 +197,10 @@ class JDBCTableCatalog extends TableCatalog override def namespaceExists(namespace: Array[String]): Boolean = namespace match { case Array(db) => JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed namespace exists: ${namespace.mkString}", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.NAMESPACE_EXISTS", + messageParameters = Map("namespace" -> namespace.mkString(".")), + dialect) { JdbcUtils.schemaExists(conn, options, db) } } @@ -190,7 +209,10 @@ class JDBCTableCatalog extends TableCatalog override def listNamespaces(): Array[Array[String]] = { JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed list namespaces", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.LIST_NAMESPACES", + messageParameters = Map.empty, + dialect) { JdbcUtils.listSchemas(conn, options) } } @@ -238,7 +260,10 @@ class JDBCTableCatalog extends TableCatalog } } JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed create name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_NAMESPACE", + messageParameters = Map("namespace" -> db), + dialect) { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -257,7 +282,10 @@ class JDBCTableCatalog extends TableCatalog case set: NamespaceChange.SetProperty => if (set.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed create comment on name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT", + messageParameters = Map("namespace" -> db), + dialect) { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -268,7 +296,10 @@ class JDBCTableCatalog extends TableCatalog case unset: NamespaceChange.RemoveProperty => if (unset.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed remove comment on name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT", + messageParameters = Map("namespace" -> db), + dialect) { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -290,7 +321,10 @@ class JDBCTableCatalog extends TableCatalog cascade: Boolean): Boolean = namespace match { case Array(db) if namespaceExists(namespace) => JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.DROP_NAMESPACE", + messageParameters = Map("namespace" -> db), + dialect) { JdbcUtils.dropSchema(conn, options, db, cascade) true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 8975a015ee8e..4f81ee031d22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -144,15 +144,22 @@ private object DB2Dialect extends JdbcDialect { s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => throw new NonEmptyNamespaceException(message, cause = Some(e)) - case _ => super.classifyException(message, e) + case "42893" => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index a42fe989b15c..9b8632bceee6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -28,8 +28,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.util.quoteNameParts +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex @@ -180,7 +179,10 @@ private[sql] object H2Dialect extends JdbcDialect { (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".") } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -191,15 +193,16 @@ private[sql] object H2Dialect extends JdbcDialect { val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r val name = regex.findFirstMatchIn(e.getMessage).get.group(1) val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name) - throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + throw new TableAlreadyExistsException( + errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // TABLE_OR_VIEW_NOT_FOUND_1 case 42102 => - val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message)) + val relationName = messageParameters.getOrElse("tableName", "") throw new NoSuchTableException( errorClass = "TABLE_OR_VIEW_NOT_FOUND", - messageParameters = Map("relationName" -> quotedName), + messageParameters = Map("relationName" -> relationName), cause = Some(e)) // SCHEMA_NOT_FOUND_1 case 90079 => @@ -209,25 +212,21 @@ private[sql] object H2Dialect extends JdbcDialect { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => // do nothing } case _ => // do nothing } - super.classifyException(message, e) + super.classifyException(e, errorClass, messageParameters) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 5ba4e39e8ec1..4825568d88eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -630,15 +630,16 @@ abstract class JdbcDialect extends Serializable with Logging { /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. - * @param message The error message to be placed to the returned exception. * @param e The dialect specific exception. + * @param errorClass The error class assigned in the case of an unclassified `e` + * @param messageParameters The message parameters of `errorClass` * @return `AnalysisException` or its sub-class. */ - def classifyException(message: String, e: Throwable): AnalysisException = { - new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_3064", - messageParameters = Map("msg" -> message), - cause = Some(e)) + def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { + new AnalysisException(errorClass, messageParameters, cause = Some(e)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index ee649122ca80..f63a1abdce65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -190,14 +190,21 @@ private object MsSqlServerDialect extends JdbcDialect { if (limit > 0) s"TOP ($limit)" else "" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 3729 => throw new NonEmptyNamespaceException(message, cause = Some(e)) - case _ => super.classifyException(message, e) + case 3729 => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index dd74c93bc2e1..af50a8e3e359 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -270,28 +270,26 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexMap.values.toArray } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1061 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) - throw new IndexAlreadyExistsException( - indexName = indexName, tableName = tableName, cause = Some(e)) - case 1091 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") + throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) + case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index cff7bb5e06f0..a3c432d68f7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -225,40 +225,42 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { s"DROP INDEX ${quoteIdentifier(indexName)}" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + // Message pattern defined by postgres specification + private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html - case "42P07" => - // Message patterns defined at caller sides of spark - val indexRegex = "(?s)Failed to create index (.*) in (.*)".r - val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r - // Message pattern defined by postgres specification - val pgRegex = """(?:.*)relation "(.*)" already exists""".r - - message match { - case indexRegex(index, table) => - throw new IndexAlreadyExistsException( - indexName = index, tableName = table, cause = Some(e)) - case renameRegex(_, newTable) => - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => - val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1) - throw QueryCompilationErrors.tableAlreadyExistsError(tableName) - case _ => super.classifyException(message, e) - } - case "42704" => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case "42P07" if errorClass == "FAILED_JDBC.CREATE_INDEX" => + throw new IndexAlreadyExistsException( + indexName = messageParameters("indexName"), + tableName = messageParameters("tableName"), + cause = Some(e)) + case "42P07" if errorClass == "FAILED_JDBC.RENAME_TABLE" => + val newTable = messageParameters("newTable") + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + case "42P07" if pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => + val tableName = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) + .get.group(1) + throw QueryCompilationErrors.tableAlreadyExistsError(tableName) + case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case "2BP01" => throw new NonEmptyNamespaceException(message, cause = Some(e)) - case _ => super.classifyException(message, e) + case "2BP01" => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } From 8c6158e1ed7d0fabdeea24f1288870f1d1478b02 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 19:02:59 +0300 Subject: [PATCH 02/14] Re-gen sql-error-conditions-failed-jdbc-error-class.md --- ...rror-conditions-failed-jdbc-error-class.md | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 docs/sql-error-conditions-failed-jdbc-error-class.md diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md b/docs/sql-error-conditions-failed-jdbc-error-class.md new file mode 100644 index 000000000000..c31f51972173 --- /dev/null +++ b/docs/sql-error-conditions-failed-jdbc-error-class.md @@ -0,0 +1,80 @@ +--- +layout: global +title: FAILED_JDBC error class +displayTitle: FAILED_JDBC error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +SQLSTATE: 40000 + +Failed the JDBC operation: + +This error class has the following derived error classes: + +## ALTER_TABLE + +Alter the table ``. + +## CREATE_INDEX + +Create the index `` in the `` table. + +## CREATE_NAMESPACE + +Create the namespace ``. + +## CREATE_NAMESPACE_COMMENT + +Create a comment on the namespace: ``. + +## CREATE_TABLE + +Create the table ``. + +## DROP_INDEX + +Drop the index `` in the `` table. + +## DROP_NAMESPACE + +Drop the namespace ``. + +## GET_TABLES + +Get tables from the namespace: ``. + +## LIST_NAMESPACES + +List namespaces. + +## NAMESPACE_EXISTS + +Check that the namespace `` exists. + +## REMOVE_NAMESPACE_COMMENT + +Remove a comment on the namespace: ``. + +## RENAME_TABLE + +Rename the table `` to ``. + +## TABLE_EXISTS + +Check that the table `` exists. + + From d7302f2586e3b08f484cee737b96e12b5252acee Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 19:17:35 +0300 Subject: [PATCH 03/14] Fix PostgresDialect --- .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index a3c432d68f7e..3c0f325b1e59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -242,7 +242,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { tableName = messageParameters("tableName"), cause = Some(e)) case "42P07" if errorClass == "FAILED_JDBC.RENAME_TABLE" => - val newTable = messageParameters("newTable") + val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) case "42P07" if pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => val tableName = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) From d3204c7669a3c2312fc9d143ed42680c0c179a77 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 19:21:47 +0300 Subject: [PATCH 04/14] Exclude classifyException from Mima --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 463212290877..cfda74509720 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -100,6 +100,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.CacheId$"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"), + // SPARK-46410: Assign error classes/subclasses to JdbcUtils.classifyException + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcDialect.classifyException"), + (problem: Problem) => problem match { case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") && !cls.fullName.startsWith("org.sparkproject.dmg.pmml") From f9072aedcd2ab75277da84f6e65731473cb3fa6f Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 20:22:28 +0300 Subject: [PATCH 05/14] Port some tests to checkError --- .../v2/jdbc/JDBCTableCatalogSuite.scala | 37 +++++++++++++++---- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index eed64b873c45..5fc1bc8e76e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -267,10 +267,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Drop not existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName DROP COLUMN bad_column") - }.getMessage - assert(msg.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName DROP COLUMN bad_column" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- C2: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 51)) } // Drop a column to not existing table and namespace Seq( @@ -297,10 +307,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { .add("deptno", DoubleType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update not existing column - val msg1 = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE") - }.getMessage - assert(msg1.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- ID: double (nullable = true) + | |-- deptno: double (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 64)) // Update column to wrong type checkError( exception = intercept[ParseException] { From 901c511551b17e6267bf63cfbad1929a061f1c5e Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 20:29:11 +0300 Subject: [PATCH 06/14] Fix intergration tests --- .../scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index b5f5b0e5f20b..ea1be7ab9b78 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -221,10 +221,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("CREATE TABLE with table property") { withTable(s"$catalogName.new_table") { - val m = intercept[AnalysisException] { + val e = intercept[AnalysisException] { sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") - }.message - assert(m.contains("Failed table creation")) + } + assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE") testCreateTableWithProperty(s"$catalogName.new_table") } } From 9b0090fb6ee7b13d9700478ce1b3f78f764fcc38 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 14 Dec 2023 23:29:23 +0300 Subject: [PATCH 07/14] Use checkError --- .../v2/jdbc/JDBCTableCatalogSuite.scala | 147 +++++++++++++----- 1 file changed, 108 insertions(+), 39 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 5fc1bc8e76e1..0f6358312285 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -356,10 +356,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { .add("deptno", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update nullability of not existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL") - }.getMessage - assert(msg.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- ID: integer (nullable = true) + | |-- deptno: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 66)) } // Update column nullability in not existing table and namespace Seq( @@ -378,17 +389,29 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val tableName = "h2.test.alt_table" withTable(tableName) { sql(s"CREATE TABLE $tableName (ID INTEGER)") - val exp = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'") - } - assert(exp.getErrorClass === "_LEGACY_ERROR_TEMP_1305") - assert("Unsupported TableChange (.*) in JDBC catalog\\.".r.pattern.matcher(exp.getMessage) - .matches()) + checkError( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'") + }, + errorClass = "_LEGACY_ERROR_TEMP_1305", + parameters = Map("change" -> + "org.apache.spark.sql.connector.catalog.TableChange\\$UpdateColumnComment.*"), + matchPVals = true) // Update comment for not existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'") - }.getMessage - assert(msg.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- ID: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 67)) } // Update column comments in not existing table and namespace Seq( @@ -414,10 +437,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { assert(t.schema === expectedSchema) withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3") - }.getMessage - assert(msg.contains("Missing field C2 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C2", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: integer (nullable = true) + | |-- c2: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 51)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -430,10 +464,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName DROP COLUMN C3") - }.getMessage - assert(msg.contains("Missing field C3 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName DROP COLUMN C3" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C3", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: integer (nullable = true) + | |-- c3: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, sqlText.length - 1)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -444,10 +489,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE") - }.getMessage - assert(msg.contains("Missing field C1 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C1", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, sqlText.length - 1)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -458,10 +513,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL") - }.getMessage - assert(msg.contains("Missing field C1 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C1", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: double (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, sqlText.length - 1)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -489,11 +554,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("CREATE TABLE with table property") { withTable("h2.test.new_table") { - val m = intercept[AnalysisException] { - sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + - " TBLPROPERTIES('ENGINE'='tableEngineName')") - }.cause.get.getMessage - assert(m.contains("\"TABLEENGINENAME\" not found")) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + + " TBLPROPERTIES('ENGINE'='tableEngineName')") + }, + errorClass = "FAILED_JDBC.CREATE_TABLE", + parameters = Map("tableName" -> "test.new_table")) } } @@ -505,10 +572,12 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") { - val e = intercept[AnalysisException]{ - sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") - } - assert(e.getCause.getMessage.contains("1000000001")) + checkError( + exception = intercept[AnalysisException]{ + sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") + }, + errorClass = "FAILED_JDBC.CREATE_TABLE", + parameters = Map("tableName" -> "test.new_table")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { From be744307e7c89b9596530dec20199a25546295e1 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 15 Dec 2023 11:48:51 +0300 Subject: [PATCH 08/14] Quoting --- .../datasources/v2/jdbc/JDBCTable.scala | 15 ++++++--- .../v2/jdbc/JDBCTableCatalog.scala | 33 +++++++++++-------- .../v2/jdbc/JDBCTableCatalogSuite.scala | 4 +-- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 9a14f05c8b43..9b0b9b5d3686 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -26,13 +26,18 @@ import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.errors.DataTypeErrorsBase import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils} import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions) - extends Table with SupportsRead with SupportsWrite with SupportsIndex { + extends Table + with SupportsRead + with SupportsWrite + with SupportsIndex + with DataTypeErrorsBase { override def name(): String = ident.toString @@ -61,8 +66,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_INDEX", messageParameters = Map( - "indexName" -> indexName, - "tableName" -> name()), + "indexName" -> toSQLId(indexName), + "tableName" -> toSQLId(name)), dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) @@ -81,8 +86,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt JdbcUtils.classifyException( errorClass = "FAILED_JDBC.DROP_INDEX", messageParameters = Map( - "indexName" -> indexName, - "tableName" -> name()), + "indexName" -> toSQLId(indexName), + "tableName" -> toSQLId(name)), dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 47c5b198cdc2..30baacd669bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JDBCRDD, JdbcUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} @@ -35,7 +35,10 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class JDBCTableCatalog extends TableCatalog - with SupportsNamespaces with FunctionCatalog with Logging { + with SupportsNamespaces + with FunctionCatalog + with DataTypeErrorsBase + with Logging { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ private var catalogName: String = null @@ -67,7 +70,7 @@ class JDBCTableCatalog extends TableCatalog val schemaPattern = if (namespace.length == 1) namespace.head else null val rs = JdbcUtils.classifyException( errorClass = "FAILED_JDBC.GET_TABLES", - messageParameters = Map("namespace" -> namespace.mkString(".")), + messageParameters = Map("namespace" -> toSQLId(namespace.toSeq)), dialect) { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } @@ -84,7 +87,7 @@ class JDBCTableCatalog extends TableCatalog options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) JdbcUtils.classifyException( errorClass = "FAILED_JDBC.TABLE_EXISTS", - messageParameters = Map("tableName" -> ident.toString), + messageParameters = Map("tableName" -> toSQLId(ident)), dialect) { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } @@ -108,8 +111,8 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.classifyException( errorClass = "FAILED_JDBC.RENAME_TABLE", messageParameters = Map( - "oldName" -> oldIdent.toString, - "newName" -> newIdent.toString), + "oldName" -> toSQLId(oldIdent), + "newName" -> toSQLId(newIdent)), dialect) { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } @@ -172,7 +175,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_TABLE", - messageParameters = Map("tableName" -> ident.toString), + messageParameters = Map("tableName" -> toSQLId(ident)), dialect) { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } @@ -186,7 +189,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.ALTER_TABLE", - messageParameters = Map("tableName" -> ident.toString), + messageParameters = Map("tableName" -> toSQLId(ident)), dialect) { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } @@ -199,7 +202,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.NAMESPACE_EXISTS", - messageParameters = Map("namespace" -> namespace.mkString(".")), + messageParameters = Map("namespace" -> toSQLId(namespace.toSeq)), dialect) { JdbcUtils.schemaExists(conn, options, db) } @@ -262,7 +265,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_NAMESPACE", - messageParameters = Map("namespace" -> db), + messageParameters = Map("namespace" -> toSQLId(db)), dialect) { JdbcUtils.createSchema(conn, options, db, comment) } @@ -284,7 +287,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT", - messageParameters = Map("namespace" -> db), + messageParameters = Map("namespace" -> toSQLId(db)), dialect) { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } @@ -298,7 +301,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT", - messageParameters = Map("namespace" -> db), + messageParameters = Map("namespace" -> toSQLId(db)), dialect) { JdbcUtils.removeSchemaComment(conn, options, db) } @@ -323,7 +326,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.DROP_NAMESPACE", - messageParameters = Map("namespace" -> db), + messageParameters = Map("namespace" -> toSQLId(db)), dialect) { JdbcUtils.dropSchema(conn, options, db, cascade) true @@ -364,4 +367,8 @@ class JDBCTableCatalog extends TableCatalog throw new NoSuchFunctionException(ident) } } + + private def toSQLId(ident: Identifier): String = { + toSQLId(ident.namespace.toSeq :+ ident.name) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 0f6358312285..73392de24184 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -560,7 +560,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { " TBLPROPERTIES('ENGINE'='tableEngineName')") }, errorClass = "FAILED_JDBC.CREATE_TABLE", - parameters = Map("tableName" -> "test.new_table")) + parameters = Map("tableName" -> "`test`.`new_table`")) } } @@ -577,7 +577,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") }, errorClass = "FAILED_JDBC.CREATE_TABLE", - parameters = Map("tableName" -> "test.new_table")) + parameters = Map("tableName" -> "`test`.`new_table`")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { From c9a23f51235a7477c898f5bd1ee2145e10368680 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 15 Dec 2023 12:42:45 +0300 Subject: [PATCH 09/14] Fix integration tests --- .../test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index ea1be7ab9b78..76277dbc96b6 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }, errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) sql(s"DROP index i1 ON $catalogName.new_table") @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"DROP index i1 ON $catalogName.new_table") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) } } From 707c2129ece4a3638ab80736fd50535bc7c39444 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 15 Dec 2023 15:47:37 +0300 Subject: [PATCH 10/14] Fix test --- .../test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index a81501127a48..7b275247ee3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2860,8 +2860,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel }, errorClass = "INDEX_ALREADY_EXISTS", parameters = Map( - "indexName" -> "people_index", - "tableName" -> "test.people" + "indexName" -> "`people_index`", + "tableName" -> "`test`.`people`" ) ) assert(jdbcTable.indexExists("people_index")) @@ -2877,7 +2877,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel sql(s"DROP INDEX people_index ON TABLE h2.test.people") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") + parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`") ) assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() From d21fb1ee469272cc450cdb919116d0b076c17175 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 15 Dec 2023 15:47:51 +0300 Subject: [PATCH 11/14] Address a comment --- .../spark/sql/jdbc/PostgresDialect.scala | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 3c0f325b1e59..9cbeb080b0b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -236,22 +236,27 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html - case "42P07" if errorClass == "FAILED_JDBC.CREATE_INDEX" => - throw new IndexAlreadyExistsException( - indexName = messageParameters("indexName"), - tableName = messageParameters("tableName"), - cause = Some(e)) - case "42P07" if errorClass == "FAILED_JDBC.RENAME_TABLE" => - val newTable = messageParameters("newName") - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - case "42P07" if pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => - val tableName = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) - .get.group(1) - throw QueryCompilationErrors.tableAlreadyExistsError(tableName) - case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") - throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) + case "42P07" => + if (errorClass == "FAILED_JDBC.CREATE_INDEX") { + throw new IndexAlreadyExistsException( + indexName = messageParameters("indexName"), + tableName = messageParameters("tableName"), + cause = Some(e)) + } else if (errorClass == "FAILED_JDBC.DROP_INDEX") { + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") + throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) + } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { + val newTable = messageParameters("newName") + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + } else { + val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) + if (tblRegexp.nonEmpty) { + throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) + } else { + super.classifyException(e, errorClass, messageParameters) + } + } case "2BP01" => throw NonEmptyNamespaceException( namespace = messageParameters.get("namespace").toArray, From 2e8eeb0d33ac97a2d4cc76786125638dbf5f9ac2 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 15 Dec 2023 17:05:51 +0300 Subject: [PATCH 12/14] Fix integration tests 2 --- .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 9cbeb080b0b0..4637a96039b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -242,10 +242,6 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { indexName = messageParameters("indexName"), tableName = messageParameters("tableName"), cause = Some(e)) - } else if (errorClass == "FAILED_JDBC.DROP_INDEX") { - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") - throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) @@ -257,6 +253,10 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { super.classifyException(e, errorClass, messageParameters) } } + case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") + throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case "2BP01" => throw NonEmptyNamespaceException( namespace = messageParameters.get("namespace").toArray, From 01a1a9c52e6091981028666525bed0be9a0d8fa4 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 16 Dec 2023 09:38:34 +0300 Subject: [PATCH 13/14] Add JDBC url --- .../main/resources/error/error-classes.json | 4 +- .../datasources/v2/jdbc/JDBCTable.scala | 2 + .../v2/jdbc/JDBCTableCatalog.scala | 39 ++++++++++++++----- .../v2/jdbc/JDBCTableCatalogSuite.scala | 8 +++- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 897e4975be6f..058d76e4ecdb 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1098,7 +1098,7 @@ }, "FAILED_JDBC" : { "message" : [ - "Failed the JDBC operation:" + "Failed JDBC on the operation:" ], "subClass" : { "ALTER_TABLE" : { @@ -1167,7 +1167,7 @@ ] } }, - "sqlState" : "40000" + "sqlState" : "HV000" }, "FAILED_PARSE_STRUCT_TYPE" : { "message" : [ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 9b0b9b5d3686..c251010881f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -66,6 +66,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_INDEX", messageParameters = Map( + "url" -> jdbcOptions.url, "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), dialect = JdbcDialects.get(jdbcOptions.url)) { @@ -86,6 +87,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt JdbcUtils.classifyException( errorClass = "FAILED_JDBC.DROP_INDEX", messageParameters = Map( + "url" -> jdbcOptions.url, "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), dialect = JdbcDialects.get(jdbcOptions.url)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 30baacd669bf..976cd3f6e9aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -70,7 +70,9 @@ class JDBCTableCatalog extends TableCatalog val schemaPattern = if (namespace.length == 1) namespace.head else null val rs = JdbcUtils.classifyException( errorClass = "FAILED_JDBC.GET_TABLES", - messageParameters = Map("namespace" -> toSQLId(namespace.toSeq)), + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(namespace.toSeq)), dialect) { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } @@ -87,7 +89,9 @@ class JDBCTableCatalog extends TableCatalog options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) JdbcUtils.classifyException( errorClass = "FAILED_JDBC.TABLE_EXISTS", - messageParameters = Map("tableName" -> toSQLId(ident)), + messageParameters = Map( + "url" -> options.url, + "tableName" -> toSQLId(ident)), dialect) { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } @@ -111,6 +115,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.classifyException( errorClass = "FAILED_JDBC.RENAME_TABLE", messageParameters = Map( + "url" -> options.url, "oldName" -> toSQLId(oldIdent), "newName" -> toSQLId(newIdent)), dialect) { @@ -175,7 +180,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_TABLE", - messageParameters = Map("tableName" -> toSQLId(ident)), + messageParameters = Map( + "url" -> options.url, + "tableName" -> toSQLId(ident)), dialect) { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } @@ -189,7 +196,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.ALTER_TABLE", - messageParameters = Map("tableName" -> toSQLId(ident)), + messageParameters = Map( + "url" -> options.url, + "tableName" -> toSQLId(ident)), dialect) { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } @@ -202,7 +211,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.NAMESPACE_EXISTS", - messageParameters = Map("namespace" -> toSQLId(namespace.toSeq)), + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(namespace.toSeq)), dialect) { JdbcUtils.schemaExists(conn, options, db) } @@ -214,7 +225,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.LIST_NAMESPACES", - messageParameters = Map.empty, + messageParameters = Map("url" -> options.url), dialect) { JdbcUtils.listSchemas(conn, options) } @@ -265,7 +276,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_NAMESPACE", - messageParameters = Map("namespace" -> toSQLId(db)), + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), dialect) { JdbcUtils.createSchema(conn, options, db, comment) } @@ -287,7 +300,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT", - messageParameters = Map("namespace" -> toSQLId(db)), + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), dialect) { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } @@ -301,7 +316,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT", - messageParameters = Map("namespace" -> toSQLId(db)), + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), dialect) { JdbcUtils.removeSchemaComment(conn, options, db) } @@ -326,7 +343,9 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.DROP_NAMESPACE", - messageParameters = Map("namespace" -> toSQLId(db)), + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), dialect) { JdbcUtils.dropSchema(conn, options, db, cascade) true diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 73392de24184..1cd4077b4ec1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -560,7 +560,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { " TBLPROPERTIES('ENGINE'='tableEngineName')") }, errorClass = "FAILED_JDBC.CREATE_TABLE", - parameters = Map("tableName" -> "`test`.`new_table`")) + parameters = Map( + "url" -> url, + "tableName" -> "`test`.`new_table`")) } } @@ -577,7 +579,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") }, errorClass = "FAILED_JDBC.CREATE_TABLE", - parameters = Map("tableName" -> "`test`.`new_table`")) + parameters = Map( + "url" -> url, + "tableName" -> "`test`.`new_table`")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { From 0338fbcf2274bb21def1d394f27cbf547a05a979 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 16 Dec 2023 09:42:01 +0300 Subject: [PATCH 14/14] Re-gen *.md --- docs/sql-error-conditions-failed-jdbc-error-class.md | 4 ++-- docs/sql-error-conditions.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md b/docs/sql-error-conditions-failed-jdbc-error-class.md index c31f51972173..575441e3f347 100644 --- a/docs/sql-error-conditions-failed-jdbc-error-class.md +++ b/docs/sql-error-conditions-failed-jdbc-error-class.md @@ -19,9 +19,9 @@ license: | limitations under the License. --- -SQLSTATE: 40000 +SQLSTATE: HV000 -Failed the JDBC operation: +Failed JDBC `` on the operation: This error class has the following derived error classes: diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index e8296e52f8bb..5657877971c5 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -667,9 +667,9 @@ Failed preparing of the function `` for call. Please, double check fun ### [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html) -SQLSTATE: 40000 +SQLSTATE: HV000 -Failed the JDBC operation: +Failed JDBC `` on the operation: For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)