From 8caa8e7735fb49ae8e97ce6e779498113e4d450e Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 6 Jul 2020 14:25:26 +0200 Subject: [PATCH 01/13] [SPARK-32001][SQL]Create JDBC authentication provider developer API --- .../security/SecurityConfigurationLock.scala | 28 +++++++ .../sql/jdbc/DB2KrbIntegrationSuite.scala | 2 +- ...ces.jdbc.connection.JdbcConnectionProvider | 6 ++ .../datasources/jdbc/JDBCOptions.scala | 3 + .../datasources/jdbc/JdbcUtils.scala | 2 +- .../connection/BasicConnectionProvider.scala | 19 ++++- .../jdbc/connection/ConnectionProvider.scala | 77 +++++++------------ .../connection/DB2ConnectionProvider.scala | 28 +++---- .../connection/JdbcConnectionProvider.scala | 54 +++++++++++++ .../connection/MSSQLConnectionProvider.scala | 32 ++++---- .../MariaDBConnectionProvider.scala | 21 +++-- .../connection/OracleConnectionProvider.scala | 28 +++---- .../PostgresConnectionProvider.scala | 20 ++--- .../connection/SecureConnectionProvider.scala | 40 ++++++---- ...ces.jdbc.connection.JdbcConnectionProvider | 1 + .../connection/ConnectionProviderSuite.scala | 39 +++++++--- .../ConnectionProviderSuiteBase.scala | 17 ++-- .../DB2ConnectionProviderSuite.scala | 6 +- ...ntentionallyFaultyConnectionProvider.scala | 33 ++++++++ .../MSSQLConnectionProviderSuite.scala | 42 +++++----- .../MariaDBConnectionProviderSuite.scala | 6 +- .../OracleConnectionProviderSuite.scala | 7 +- .../PostgresConnectionProviderSuite.scala | 18 +++-- 23 files changed, 337 insertions(+), 192 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala create mode 100644 sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala create mode 100644 sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala diff --git a/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala b/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala new file mode 100644 index 0000000000000..17c0d58a52e30 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.security + +import org.apache.spark.annotation.DeveloperApi + +/** + * ::DeveloperApi:: + * There are cases when global JVM security configuration must be modified. 
+ * In order to avoid race the modification must be synchronized with this. + */ +@DeveloperApi +object SecurityConfigurationLock diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala index fc88985cf2ec7..fa5ce2d106a10 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala @@ -54,7 +54,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { JDBCOptions.JDBC_KEYTAB -> keytabFileName, JDBCOptions.JDBC_PRINCIPAL -> principal )) - new DB2ConnectionProvider(null, options).getAdditionalProperties() + new DB2ConnectionProvider().getAdditionalProperties(options) } override def beforeContainerStart( diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider new file mode 100644 index 0000000000000..6e42517a6d40c --- /dev/null +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider @@ -0,0 +1,6 @@ +org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.DB2ConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.MariaDBConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.MSSQLConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.OracleConnectionProvider \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 9e0438c0016bd..3c62e6ba68056 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -23,12 +23,15 @@ import java.util.{Locale, Properties} import org.apache.commons.io.FilenameUtils import org.apache.spark.SparkFiles +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap /** + * ::DeveloperApi:: * Options for the JDBC data source. 
*/ +@DeveloperApi class JDBCOptions( @transient val parameters: CaseInsensitiveMap[String]) extends Serializable with Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index db4715ef068b6..30f45cd81f834 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -62,7 +62,7 @@ object JdbcUtils extends Logging { throw new IllegalStateException( s"Did not find registered driver with class $driverClass") } - val connection = ConnectionProvider.create(driver, options).getConnection() + val connection = ConnectionProvider.create(driver, options) require(connection != null, s"The driver could not open a JDBC connection. Check the URL: ${options.url}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala index 16b244cc617ce..8e460e50088c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala @@ -18,18 +18,29 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} +import java.util.Properties import scala.collection.JavaConverters._ +import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[jdbc] class BasicConnectionProvider(driver: Driver, options: JDBCOptions) - extends ConnectionProvider { - def getConnection(): Connection = { - val properties = getAdditionalProperties() +private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with Logging { + /** + * Additional properties for data connection (Data source property takes precedence). 
+ */ + def getAdditionalProperties(options: JDBCOptions): Properties = new Properties() + + override def canHandle(driver: Driver, options: JDBCOptions): Boolean = { + options.keytab == null || options.principal == null + } + + override def getConnection(driver: Driver, options: JDBCOptions): Connection = { + val properties = getAdditionalProperties(options) options.asConnectionProperties.entrySet().asScala.foreach { e => properties.put(e.getKey(), e.getValue()) } + logDebug(s"JDBC connection initiated with URL: ${options.url} and properties: $properties") driver.connect(options.url, properties) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index ce45be442ccc3..27611a844fcb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -18,60 +18,41 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} -import java.util.Properties +import java.util.ServiceLoader + +import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions - -/** - * Connection provider which opens connection toward various databases (database specific instance - * needed). If kerberos authentication required then it's the provider's responsibility to set all - * the parameters. - */ -private[jdbc] trait ConnectionProvider { - /** - * Additional properties for data connection (Data source property takes precedence). - */ - def getAdditionalProperties(): Properties = new Properties() - - /** - * Opens connection toward the database. 
- */ - def getConnection(): Connection -} +import org.apache.spark.util.Utils private[jdbc] object ConnectionProvider extends Logging { - def create(driver: Driver, options: JDBCOptions): ConnectionProvider = { - if (options.keytab == null || options.principal == null) { - logDebug("No authentication configuration found, using basic connection provider") - new BasicConnectionProvider(driver, options) - } else { - logDebug("Authentication configuration found, using database specific connection provider") - options.driverClass match { - case PostgresConnectionProvider.driverClass => - logDebug("Postgres connection provider found") - new PostgresConnectionProvider(driver, options) - - case MariaDBConnectionProvider.driverClass => - logDebug("MariaDB connection provider found") - new MariaDBConnectionProvider(driver, options) - - case DB2ConnectionProvider.driverClass => - logDebug("DB2 connection provider found") - new DB2ConnectionProvider(driver, options) - - case MSSQLConnectionProvider.driverClass => - logDebug("MS SQL connection provider found") - new MSSQLConnectionProvider(driver, options) - - case OracleConnectionProvider.driverClass => - logDebug("Oracle connection provider found") - new OracleConnectionProvider(driver, options) - - case _ => - throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " + - "Kerberos authentication") + val providers = loadProviders() + + private def loadProviders(): Seq[JdbcConnectionProvider] = { + val loader = ServiceLoader.load(classOf[JdbcConnectionProvider], + Utils.getContextOrSparkClassLoader) + val providers = mutable.ArrayBuffer[JdbcConnectionProvider]() + + val iterator = loader.iterator + while (iterator.hasNext) { + try { + val provider = iterator.next + logDebug(s"Loaded built in provider: $provider") + providers += provider + } catch { + case t: Throwable => + logError(s"Failed to load built in provider.", t) } } + providers + } + + def create(driver: Driver, options: JDBCOptions): Connection = { + val filteredProviders = providers.filter(_.canHandle(driver, options)) + logDebug(s"Filtered providers: $filteredProviders") + require(filteredProviders.size == 1, + "JDBC connection initiated but not exactly one connection provider found which can handle it") + filteredProviders.head.getConnection(driver, options) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala index 095821cf83890..033927edd7021 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala @@ -23,24 +23,26 @@ import java.util.Properties import org.apache.hadoop.security.UserGroupInformation +import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val appEntry: String = "JaasClient" +private[sql] class DB2ConnectionProvider extends SecureConnectionProvider { + override val driverClass = "com.ibm.db2.jcc.DB2Driver" - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() + override def appEntry(driver: Driver, options: JDBCOptions): String = 
"JaasClient" + + override def getConnection(driver: Driver, options: JDBCOptions): Connection = { + setAuthenticationConfigIfNeeded(driver, options) UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( new PrivilegedExceptionAction[Connection]() { override def run(): Connection = { - DB2ConnectionProvider.super.getConnection() + DB2ConnectionProvider.super.getConnection(driver, options) } } ) } - override def getAdditionalProperties(): Properties = { + override def getAdditionalProperties(options: JDBCOptions): Properties = { val result = new Properties() // 11 is the integer value for kerberos result.put("securityMechanism", new String("11")) @@ -48,14 +50,12 @@ private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions) result } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded( + driver: Driver, + options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object DB2ConnectionProvider { - val driverClass = "com.ibm.db2.jcc.DB2Driver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala new file mode 100644 index 0000000000000..4549cb120fd1c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +import java.sql.{Connection, Driver} + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions + +/** + * ::DeveloperApi:: + * Connection provider which opens connection toward various databases (database specific instance + * needed). If any authentication required then it's the provider's responsibility to set all + * the parameters. If global JVM security configuration is changed then + * SecurityConfigurationLock must be used as lock to avoid race. + * Important to mention connection providers within a JVM used from multiple threads so adding + * internal state is not advised. If any state added then it must be synchronized properly. 
+ */
+@DeveloperApi
+trait JdbcConnectionProvider {
+  /**
+   * Checks if this connection provider instance can handle the connection initiated by the driver.
+   * There must be exactly one active connection provider which can handle the connection for a
+   * specific driver. If this requirement isn't met then an IllegalArgumentException
+   * will be thrown by the provider framework.
+   * @param driver Java driver which initiates the connection
+   * @param options Driver options which initiates the connection
+   * @return True if the connection provider can handle the driver with the given options.
+   */
+  def canHandle(driver: Driver, options: JDBCOptions): Boolean
+
+  /**
+   * Opens connection toward the database.
+   * @param driver Java driver which initiates the connection
+   * @param options Driver options which initiates the connection
+   * @return a Connection object that represents a connection to the URL
+   */
+  def getConnection(driver: Driver, options: JDBCOptions): Connection
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala
index 2950aa9b4db94..291c30f73b418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala
@@ -23,14 +23,14 @@ import java.util.Properties
 
 import org.apache.hadoop.security.UserGroupInformation
 
+import org.apache.spark.security.SecurityConfigurationLock
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 
-private[sql] class MSSQLConnectionProvider(
-    driver: Driver,
-    options: JDBCOptions,
-    parserMethod: String = "parseAndMergeProperties"
-  ) extends SecureConnectionProvider(driver, options) {
-  override val appEntry: String = {
+private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider {
+  override val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+  val parserMethod: String = "parseAndMergeProperties"
+
+  override def appEntry(driver: Driver, options: JDBCOptions): String = {
     val configName = "jaasConfigurationName"
     val appEntryDefault = "SQLJDBCDriver"
 
@@ -58,18 +58,18 @@ private[sql] class MSSQLConnectionProvider(
     }
   }
 
-  override def getConnection(): Connection = {
-    setAuthenticationConfigIfNeeded()
+  override def getConnection(driver: Driver, options: JDBCOptions): Connection = {
+    setAuthenticationConfigIfNeeded(driver, options)
     UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
       new PrivilegedExceptionAction[Connection]() {
         override def run(): Connection = {
-          MSSQLConnectionProvider.super.getConnection()
+          MSSQLConnectionProvider.super.getConnection(driver, options)
         }
       }
     )
   }
 
-  override def getAdditionalProperties(): Properties = {
+  override def getAdditionalProperties(options: JDBCOptions): Properties = {
     val result = new Properties()
     // These props needed to reach internal kerberos authentication in the JDBC driver
     result.put("integratedSecurity", "true")
@@ -77,8 +77,10 @@ private[sql] class MSSQLConnectionProvider(
     result
   }
 
-  override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
-    val (parent, configEntry) = getConfigWithAppEntry()
+  override def setAuthenticationConfigIfNeeded(
+      driver: Driver,
+      options: JDBCOptions): Unit =
SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) /** * Couple of things to mention here (v8.2.2 client): * 1. MS SQL supports JAAS application name configuration @@ -87,11 +89,7 @@ private[sql] class MSSQLConnectionProvider( val entryUsesKeytab = configEntry != null && configEntry.exists(_.getOptions().get("useKeyTab") == "true") if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object MSSQLConnectionProvider { - val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala index 3c0286654a8ec..19fd201f6afa7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala @@ -19,16 +19,19 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.Driver +import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val appEntry: String = { +private[jdbc] class MariaDBConnectionProvider extends SecureConnectionProvider { + override val driverClass = "org.mariadb.jdbc.Driver" + + override def appEntry(driver: Driver, options: JDBCOptions): String = "Krb5ConnectorContext" - } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded( + driver: Driver, + options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) /** * Couple of things to mention here (v2.5.4 client): * 1. 
MariaDB doesn't support JAAS application name configuration @@ -37,11 +40,7 @@ private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptio val entryUsesKeytab = configEntry != null && configEntry.exists(_.getOptions().get("useKeyTab") == "true") if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object MariaDBConnectionProvider { - val driverClass = "org.mariadb.jdbc.Driver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala index c2b71b35b8128..a940cf66aa90a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala @@ -23,24 +23,26 @@ import java.util.Properties import org.apache.hadoop.security.UserGroupInformation +import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[sql] class OracleConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val appEntry: String = "kprb5module" +private[sql] class OracleConnectionProvider extends SecureConnectionProvider { + override val driverClass = "oracle.jdbc.OracleDriver" - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() + override def appEntry(driver: Driver, options: JDBCOptions): String = "kprb5module" + + override def getConnection(driver: Driver, options: JDBCOptions): Connection = { + setAuthenticationConfigIfNeeded(driver, options) UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( new PrivilegedExceptionAction[Connection]() { override def run(): Connection = { - OracleConnectionProvider.super.getConnection() + OracleConnectionProvider.super.getConnection(driver, options) } } ) } - override def getAdditionalProperties(): Properties = { + override def getAdditionalProperties(options: JDBCOptions): Properties = { val result = new Properties() // This prop is needed to turn on kerberos authentication in the JDBC driver. 
// The possible values can be found in AnoServices public interface @@ -49,14 +51,12 @@ private[sql] class OracleConnectionProvider(driver: Driver, options: JDBCOptions result } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded( + driver: Driver, + options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object OracleConnectionProvider { - val driverClass = "oracle.jdbc.OracleDriver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala index fa9232e00bd88..d936d48d7bece 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala @@ -20,24 +20,24 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.Driver import java.util.Properties +import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[jdbc] class PostgresConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val appEntry: String = { +private[jdbc] class PostgresConnectionProvider extends SecureConnectionProvider { + override val driverClass = "org.postgresql.Driver" + + override def appEntry(driver: Driver, options: JDBCOptions): String = { val parseURL = driver.getClass.getMethod("parseURL", classOf[String], classOf[Properties]) val properties = parseURL.invoke(driver, options.url, null).asInstanceOf[Properties] properties.getProperty("jaasApplicationName", "pgjdbc") } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded( + driver: Driver, + options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object PostgresConnectionProvider { - val driverClass = "org.postgresql.Driver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala index 24eec63a7244f..76b1fb287e66b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala @@ -26,39 +26,47 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.util.SecurityUtils -/** - * Some of the secure connection providers modify 
global JVM security configuration. - * In order to avoid race the modification must be synchronized with this. - */ -private[connection] object SecurityConfigurationLock +private[jdbc] abstract class SecureConnectionProvider extends BasicConnectionProvider with Logging { + /** + * Returns the driver canonical class name which the connection provider supports. + */ + protected val driverClass: String + + override def canHandle(driver: Driver, options: JDBCOptions): Boolean = { + options.keytab != null && options.principal != null && + driverClass.equalsIgnoreCase(options.driverClass) + } -private[jdbc] abstract class SecureConnectionProvider(driver: Driver, options: JDBCOptions) - extends BasicConnectionProvider(driver, options) with Logging { - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() - super.getConnection() + override def getConnection(driver: Driver, options: JDBCOptions): Connection = { + setAuthenticationConfigIfNeeded(driver, options) + super.getConnection(driver: Driver, options: JDBCOptions) } /** * Returns JAAS application name. This is sometimes configurable on the JDBC driver level. */ - val appEntry: String + def appEntry(driver: Driver, options: JDBCOptions): String /** * Sets database specific authentication configuration when needed. If configuration already set * then later calls must be no op. When the global JVM security configuration changed then the * related code parts must be synchronized properly. */ - def setAuthenticationConfigIfNeeded(): Unit + def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit - protected def getConfigWithAppEntry(): (Configuration, Array[AppConfigurationEntry]) = { + protected def getConfigWithAppEntry( + driver: Driver, + options: JDBCOptions): (Configuration, Array[AppConfigurationEntry]) = { val parent = Configuration.getConfiguration - (parent, parent.getAppConfigurationEntry(appEntry)) + (parent, parent.getAppConfigurationEntry(appEntry(driver, options))) } - protected def setAuthenticationConfig(parent: Configuration) = { + protected def setAuthenticationConfig( + parent: Configuration, + driver: Driver, + options: JDBCOptions) = { val config = new SecureConnectionProvider.JDBCConfiguration( - parent, appEntry, options.keytab, options.principal) + parent, appEntry(driver, options), options.keytab, options.principal) logDebug("Adding database specific security configuration") Configuration.setConfiguration(config) } diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider new file mode 100644 index 0000000000000..afb48e1a3511f --- /dev/null +++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider @@ -0,0 +1 @@ +org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala index ff5fe4f620a1d..a5b16d565b884 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala @@ -20,26 +20,43 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import javax.security.auth.login.Configuration class ConnectionProviderSuite extends ConnectionProviderSuiteBase { + test("All built-in provides must be loaded") { + IntentionallyFaultyConnectionProvider.constructed = false + assert(ConnectionProvider.providers.exists(_.isInstanceOf[BasicConnectionProvider])) + assert(ConnectionProvider.providers.exists(_.isInstanceOf[DB2ConnectionProvider])) + assert(ConnectionProvider.providers.exists(_.isInstanceOf[MariaDBConnectionProvider])) + assert(ConnectionProvider.providers.exists(_.isInstanceOf[MSSQLConnectionProvider])) + assert(ConnectionProvider.providers.exists(_.isInstanceOf[PostgresConnectionProvider])) + assert(ConnectionProvider.providers.exists(_.isInstanceOf[OracleConnectionProvider])) + assert(IntentionallyFaultyConnectionProvider.constructed) + assert(!ConnectionProvider.providers. + exists(_.isInstanceOf[IntentionallyFaultyConnectionProvider])) + assert(ConnectionProvider.providers.size === 6) + } + test("Multiple security configs must be reachable") { Configuration.setConfiguration(null) - val postgresDriver = registerDriver(PostgresConnectionProvider.driverClass) - val postgresProvider = new PostgresConnectionProvider( - postgresDriver, options("jdbc:postgresql://localhost/postgres")) - val db2Driver = registerDriver(DB2ConnectionProvider.driverClass) - val db2Provider = new DB2ConnectionProvider(db2Driver, options("jdbc:db2://localhost/db2")) + val postgresProvider = new PostgresConnectionProvider() + val postgresDriver = registerDriver(postgresProvider.driverClass) + val postgresOptions = options("jdbc:postgresql://localhost/postgres") + val postgresAppEntry = postgresProvider.appEntry(postgresDriver, postgresOptions) + val db2Provider = new DB2ConnectionProvider() + val db2Driver = registerDriver(db2Provider.driverClass) + val db2Options = options("jdbc:db2://localhost/db2") + val db2AppEntry = db2Provider.appEntry(db2Driver, db2Options) // Make sure no authentication for the databases are set val oldConfig = Configuration.getConfiguration - assert(oldConfig.getAppConfigurationEntry(postgresProvider.appEntry) == null) - assert(oldConfig.getAppConfigurationEntry(db2Provider.appEntry) == null) + assert(oldConfig.getAppConfigurationEntry(postgresAppEntry) == null) + assert(oldConfig.getAppConfigurationEntry(db2AppEntry) == null) - postgresProvider.setAuthenticationConfigIfNeeded() - db2Provider.setAuthenticationConfigIfNeeded() + postgresProvider.setAuthenticationConfigIfNeeded(postgresDriver, postgresOptions) + db2Provider.setAuthenticationConfigIfNeeded(db2Driver, db2Options) // Make sure authentication for the databases are set val newConfig = Configuration.getConfiguration assert(oldConfig != newConfig) - assert(newConfig.getAppConfigurationEntry(postgresProvider.appEntry) != null) - assert(newConfig.getAppConfigurationEntry(db2Provider.appEntry) != null) + assert(newConfig.getAppConfigurationEntry(postgresAppEntry) != null) + assert(newConfig.getAppConfigurationEntry(db2AppEntry) != null) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala index d18a3088c4f2f..be08a3c2f7367 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala @@ -50,20 +50,25 @@ abstract class ConnectionProviderSuiteBase extends SparkFunSuite with BeforeAndA } } - protected def testSecureConnectionProvider(provider: SecureConnectionProvider): Unit = { + protected def testSecureConnectionProvider( + provider: SecureConnectionProvider, + driver: Driver, + options: JDBCOptions): Unit = { + val providerAppEntry = provider.appEntry(driver, options) + // Make sure no authentication for the database is set - assert(Configuration.getConfiguration.getAppConfigurationEntry(provider.appEntry) == null) + assert(Configuration.getConfiguration.getAppConfigurationEntry(providerAppEntry) == null) // Make sure the first call sets authentication properly val savedConfig = Configuration.getConfiguration - provider.setAuthenticationConfigIfNeeded() + provider.setAuthenticationConfigIfNeeded(driver, options) val config = Configuration.getConfiguration assert(savedConfig != config) - val appEntry = config.getAppConfigurationEntry(provider.appEntry) + val appEntry = config.getAppConfigurationEntry(providerAppEntry) assert(appEntry != null) // Make sure a second call is not modifying the existing authentication - provider.setAuthenticationConfigIfNeeded() - assert(config.getAppConfigurationEntry(provider.appEntry) === appEntry) + provider.setAuthenticationConfigIfNeeded(driver, options) + assert(config.getAppConfigurationEntry(providerAppEntry) === appEntry) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala index d656f83e2ebb9..5885af82532d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class DB2ConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(DB2ConnectionProvider.driverClass) - val provider = new DB2ConnectionProvider(driver, options("jdbc:db2://localhost/db2")) + val provider = new DB2ConnectionProvider() + val driver = registerDriver(provider.driverClass) - testSecureConnectionProvider(provider) + testSecureConnectionProvider(provider, driver, options("jdbc:db2://localhost/db2")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala new file mode 100644 index 0000000000000..c7ea8468477c1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +import java.sql.{Connection, Driver} + +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions + +private[sql] class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvider { + IntentionallyFaultyConnectionProvider.constructed = true + throw new IllegalArgumentException("Intentional Exception") + override def canHandle(driver: Driver, options: JDBCOptions): Boolean = true + override def getConnection(driver: Driver, options: JDBCOptions): Connection = null +} + +private object IntentionallyFaultyConnectionProvider { + var constructed = false +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala index 249f1e36347ed..a5704e842e018 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala @@ -17,35 +17,35 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection +import java.sql.Driver + +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions + class MSSQLConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded default parser must set authentication if not set") { - val driver = registerDriver(MSSQLConnectionProvider.driverClass) - val defaultProvider = new MSSQLConnectionProvider( - driver, options("jdbc:sqlserver://localhost/mssql")) - val customProvider = new MSSQLConnectionProvider( - driver, options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) + val provider = new MSSQLConnectionProvider() + val driver = registerDriver(provider.driverClass) - testProviders(defaultProvider, customProvider) + testProviders(driver, provider, options("jdbc:sqlserver://localhost/mssql"), + options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) } test("setAuthenticationConfigIfNeeded custom parser must set authentication if not set") { - val parserMethod = "IntentionallyNotExistingMethod" - val driver = registerDriver(MSSQLConnectionProvider.driverClass) - val defaultProvider = new MSSQLConnectionProvider( - driver, options("jdbc:sqlserver://localhost/mssql"), parserMethod) - val customProvider = new MSSQLConnectionProvider( - driver, - options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql"), - parserMethod) - - testProviders(defaultProvider, customProvider) + val provider = new MSSQLConnectionProvider() { + override val parserMethod: String = "IntentionallyNotExistingMethod" + } + val driver = registerDriver(provider.driverClass) + + testProviders(driver, provider, 
options("jdbc:sqlserver://localhost/mssql"), + options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) } private def testProviders( - defaultProvider: SecureConnectionProvider, - customProvider: SecureConnectionProvider) = { - assert(defaultProvider.appEntry !== customProvider.appEntry) - testSecureConnectionProvider(defaultProvider) - testSecureConnectionProvider(customProvider) + driver: Driver, + provider: SecureConnectionProvider, + defaultOptions: JDBCOptions, + customOptions: JDBCOptions) = { + testSecureConnectionProvider(provider, driver, defaultOptions) + testSecureConnectionProvider(provider, driver, customOptions) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala index 70cad2097eb43..f450662fcbe74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class MariaDBConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(MariaDBConnectionProvider.driverClass) - val provider = new MariaDBConnectionProvider(driver, options("jdbc:mysql://localhost/mysql")) + val provider = new MariaDBConnectionProvider() + val driver = registerDriver(provider.driverClass) - testSecureConnectionProvider(provider) + testSecureConnectionProvider(provider, driver, options("jdbc:mysql://localhost/mysql")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala index 13cde32ddbe4e..40e7f1191dccc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class OracleConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(OracleConnectionProvider.driverClass) - val provider = new OracleConnectionProvider(driver, - options("jdbc:oracle:thin:@//localhost/xe")) + val provider = new OracleConnectionProvider() + val driver = registerDriver(provider.driverClass) - testSecureConnectionProvider(provider) + testSecureConnectionProvider(provider, driver, options("jdbc:oracle:thin:@//localhost/xe")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala index 8cef7652f9c54..4b81b6d145fa4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala @@ -19,14 +19,16 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class PostgresConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(PostgresConnectionProvider.driverClass) - val defaultProvider = new PostgresConnectionProvider( - driver, options("jdbc:postgresql://localhost/postgres")) - val customProvider = new PostgresConnectionProvider( - driver, options(s"jdbc:postgresql://localhost/postgres?jaasApplicationName=custompgjdbc")) + val defaultProvider = new PostgresConnectionProvider() + val defaultOptions = options("jdbc:postgresql://localhost/postgres") + val customProvider = new PostgresConnectionProvider() + val customOptions = + options(s"jdbc:postgresql://localhost/postgres?jaasApplicationName=custompgjdbc") + val driver = registerDriver(defaultProvider.driverClass) - assert(defaultProvider.appEntry !== customProvider.appEntry) - testSecureConnectionProvider(defaultProvider) - testSecureConnectionProvider(customProvider) + assert(defaultProvider.appEntry(driver, defaultOptions) !== + customProvider.appEntry(driver, customOptions)) + testSecureConnectionProvider(defaultProvider, driver, defaultOptions) + testSecureConnectionProvider(customProvider, driver, customOptions) } } From e919ac47513085eaac500627a1791eb1d62c781e Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 8 Jul 2020 10:01:10 +0200 Subject: [PATCH 02/13] ConnectionProviderSuite fix --- .../jdbc/connection/ConnectionProvider.scala | 4 ++-- .../connection/ConnectionProviderSuite.scala | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index 27611a844fcb8..5c167cc7c7692 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -27,9 +27,9 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.util.Utils private[jdbc] object ConnectionProvider extends Logging { - val providers = loadProviders() + private val providers = loadProviders() - private def loadProviders(): Seq[JdbcConnectionProvider] = { + def loadProviders(): Seq[JdbcConnectionProvider] = { val loader = ServiceLoader.load(classOf[JdbcConnectionProvider], Utils.getContextOrSparkClassLoader) val providers = mutable.ArrayBuffer[JdbcConnectionProvider]() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala index a5b16d565b884..a48dbdebea7e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala @@ -22,16 +22,16 @@ import javax.security.auth.login.Configuration class ConnectionProviderSuite extends ConnectionProviderSuiteBase { test("All built-in provides must be loaded") 
{ IntentionallyFaultyConnectionProvider.constructed = false - assert(ConnectionProvider.providers.exists(_.isInstanceOf[BasicConnectionProvider])) - assert(ConnectionProvider.providers.exists(_.isInstanceOf[DB2ConnectionProvider])) - assert(ConnectionProvider.providers.exists(_.isInstanceOf[MariaDBConnectionProvider])) - assert(ConnectionProvider.providers.exists(_.isInstanceOf[MSSQLConnectionProvider])) - assert(ConnectionProvider.providers.exists(_.isInstanceOf[PostgresConnectionProvider])) - assert(ConnectionProvider.providers.exists(_.isInstanceOf[OracleConnectionProvider])) + val providers = ConnectionProvider.loadProviders() + assert(providers.exists(_.isInstanceOf[BasicConnectionProvider])) + assert(providers.exists(_.isInstanceOf[DB2ConnectionProvider])) + assert(providers.exists(_.isInstanceOf[MariaDBConnectionProvider])) + assert(providers.exists(_.isInstanceOf[MSSQLConnectionProvider])) + assert(providers.exists(_.isInstanceOf[PostgresConnectionProvider])) + assert(providers.exists(_.isInstanceOf[OracleConnectionProvider])) assert(IntentionallyFaultyConnectionProvider.constructed) - assert(!ConnectionProvider.providers. - exists(_.isInstanceOf[IntentionallyFaultyConnectionProvider])) - assert(ConnectionProvider.providers.size === 6) + assert(!providers.exists(_.isInstanceOf[IntentionallyFaultyConnectionProvider])) + assert(providers.size === 6) } test("Multiple security configs must be reachable") { From 226f1774803873b8416d20ece79b98cb27079fb1 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 8 Jul 2020 11:24:08 +0200 Subject: [PATCH 03/13] Visibility fix --- .../jdbc/connection/IntentionallyFaultyConnectionProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala index c7ea8468477c1..5467d20a24e54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala @@ -21,7 +21,7 @@ import java.sql.{Connection, Driver} import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[sql] class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvider { +private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvider { IntentionallyFaultyConnectionProvider.constructed = true throw new IllegalArgumentException("Intentional Exception") override def canHandle(driver: Driver, options: JDBCOptions): Boolean = true From 265b26e7a283896cbd6b9fb9c6e1f43074952957 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 10 Jul 2020 16:58:26 +0200 Subject: [PATCH 04/13] Minor simplification --- .../connection/PostgresConnectionProviderSuite.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala index 4b81b6d145fa4..ee43a7d9708c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala @@ -19,16 +19,14 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class PostgresConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val defaultProvider = new PostgresConnectionProvider() + val provider = new PostgresConnectionProvider() val defaultOptions = options("jdbc:postgresql://localhost/postgres") - val customProvider = new PostgresConnectionProvider() val customOptions = options(s"jdbc:postgresql://localhost/postgres?jaasApplicationName=custompgjdbc") - val driver = registerDriver(defaultProvider.driverClass) + val driver = registerDriver(provider.driverClass) - assert(defaultProvider.appEntry(driver, defaultOptions) !== - customProvider.appEntry(driver, customOptions)) - testSecureConnectionProvider(defaultProvider, driver, defaultOptions) - testSecureConnectionProvider(customProvider, driver, customOptions) + assert(provider.appEntry(driver, defaultOptions) !== provider.appEntry(driver, customOptions)) + testSecureConnectionProvider(provider, driver, defaultOptions) + testSecureConnectionProvider(provider, driver, customOptions) } } From 50e33ea317ef0d9ee6babba3d7e83145f1c86ec2 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 28 Sep 2020 11:04:40 +0200 Subject: [PATCH 05/13] Remove JDBCOptions from the API --- .../datasources/jdbc/JDBCOptions.scala | 3 --- .../datasources/jdbc/JdbcUtils.scala | 2 +- .../connection/BasicConnectionProvider.scala | 20 +++++++++---------- .../jdbc/connection/ConnectionProvider.scala | 3 +-- .../connection/DB2ConnectionProvider.scala | 18 +++++++++-------- .../connection/JdbcConnectionProvider.scala | 5 ++--- .../connection/MSSQLConnectionProvider.scala | 18 +++++++++-------- .../connection/OracleConnectionProvider.scala | 18 +++++++++-------- .../connection/SecureConnectionProvider.scala | 14 +++++++------ ...ntentionallyFaultyConnectionProvider.scala | 6 ++---- 10 files changed, 54 insertions(+), 53 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 3c62e6ba68056..9e0438c0016bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -23,15 +23,12 @@ import java.util.{Locale, Properties} import org.apache.commons.io.FilenameUtils import org.apache.spark.SparkFiles -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap /** - * ::DeveloperApi:: * Options for the JDBC data source. 
*/ -@DeveloperApi class JDBCOptions( @transient val parameters: CaseInsensitiveMap[String]) extends Serializable with Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 30f45cd81f834..59257c3db6105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -62,7 +62,7 @@ object JdbcUtils extends Logging { throw new IllegalStateException( s"Did not find registered driver with class $driverClass") } - val connection = ConnectionProvider.create(driver, options) + val connection = ConnectionProvider.create(driver, options.parameters) require(connection != null, s"The driver could not open a JDBC connection. Check the URL: ${options.url}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala index 8e460e50088c8..eade06f3e289b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} import java.util.Properties -import scala.collection.JavaConverters._ - import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions @@ -31,16 +29,18 @@ private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with */ def getAdditionalProperties(options: JDBCOptions): Properties = new Properties() - override def canHandle(driver: Driver, options: JDBCOptions): Boolean = { - options.keytab == null || options.principal == null + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = { + val jdbcOptions = new JDBCOptions(options) + jdbcOptions.keytab == null || jdbcOptions.principal == null } - override def getConnection(driver: Driver, options: JDBCOptions): Connection = { - val properties = getAdditionalProperties(options) - options.asConnectionProperties.entrySet().asScala.foreach { e => - properties.put(e.getKey(), e.getValue()) + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + val properties = getAdditionalProperties(jdbcOptions) + options.foreach { case(k, v) => + properties.put(k, v) } - logDebug(s"JDBC connection initiated with URL: ${options.url} and properties: $properties") - driver.connect(options.url, properties) + logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.url} and properties: $properties") + driver.connect(jdbcOptions.url, properties) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index 5c167cc7c7692..6bc734aa6ef7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ 
-23,7 +23,6 @@ import java.util.ServiceLoader import scala.collection.mutable import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.util.Utils private[jdbc] object ConnectionProvider extends Logging { @@ -48,7 +47,7 @@ private[jdbc] object ConnectionProvider extends Logging { providers } - def create(driver: Driver, options: JDBCOptions): Connection = { + def create(driver: Driver, options: Map[String, String]): Connection = { val filteredProviders = providers.filter(_.canHandle(driver, options)) logDebug(s"Filtered providers: $filteredProviders") require(filteredProviders.size == 1, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala index 033927edd7021..8f6c28c7a729a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala @@ -31,15 +31,17 @@ private[sql] class DB2ConnectionProvider extends SecureConnectionProvider { override def appEntry(driver: Driver, options: JDBCOptions): String = "JaasClient" - override def getConnection(driver: Driver, options: JDBCOptions): Connection = { - setAuthenticationConfigIfNeeded(driver, options) - UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( - new PrivilegedExceptionAction[Connection]() { - override def run(): Connection = { - DB2ConnectionProvider.super.getConnection(driver, options) + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab) + .doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + DB2ConnectionProvider.super.getConnection(driver, options) + } } - } - ) + ) } override def getAdditionalProperties(options: JDBCOptions): Properties = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala index 4549cb120fd1c..37c3fd64ebd80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions /** * ::DeveloperApi:: @@ -42,7 +41,7 @@ trait JdbcConnectionProvider { * @param options Driver options which initiates the connection * @return True if the connection provider can handle the driver with the given options. */ - def canHandle(driver: Driver, options: JDBCOptions): Boolean + def canHandle(driver: Driver, options: Map[String, String]): Boolean /** * Opens connection toward the database. 
@@ -50,5 +49,5 @@ trait JdbcConnectionProvider { * @param options Driver options which initiates the connection * @return a Connection object that represents a connection to the URL */ - def getConnection(driver: Driver, options: JDBCOptions): Connection + def getConnection(driver: Driver, options: Map[String, String]): Connection } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala index 291c30f73b418..b026958ef2a4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala @@ -58,15 +58,17 @@ private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider { } } - override def getConnection(driver: Driver, options: JDBCOptions): Connection = { - setAuthenticationConfigIfNeeded(driver, options) - UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( - new PrivilegedExceptionAction[Connection]() { - override def run(): Connection = { - MSSQLConnectionProvider.super.getConnection(driver, options) + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab) + .doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + MSSQLConnectionProvider.super.getConnection(driver, options) + } } - } - ) + ) } override def getAdditionalProperties(options: JDBCOptions): Properties = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala index a940cf66aa90a..0f91219e08ea6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala @@ -31,15 +31,17 @@ private[sql] class OracleConnectionProvider extends SecureConnectionProvider { override def appEntry(driver: Driver, options: JDBCOptions): String = "kprb5module" - override def getConnection(driver: Driver, options: JDBCOptions): Connection = { - setAuthenticationConfigIfNeeded(driver, options) - UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( - new PrivilegedExceptionAction[Connection]() { - override def run(): Connection = { - OracleConnectionProvider.super.getConnection(driver, options) + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab) + .doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + OracleConnectionProvider.super.getConnection(driver, options) + } } - } - ) + ) } override def getAdditionalProperties(options: JDBCOptions): Properties = { diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala index 76b1fb287e66b..80c795957dac8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala @@ -32,14 +32,16 @@ private[jdbc] abstract class SecureConnectionProvider extends BasicConnectionPro */ protected val driverClass: String - override def canHandle(driver: Driver, options: JDBCOptions): Boolean = { - options.keytab != null && options.principal != null && - driverClass.equalsIgnoreCase(options.driverClass) + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = { + val jdbcOptions = new JDBCOptions(options) + jdbcOptions.keytab != null && jdbcOptions.principal != null && + driverClass.equalsIgnoreCase(jdbcOptions.driverClass) } - override def getConnection(driver: Driver, options: JDBCOptions): Connection = { - setAuthenticationConfigIfNeeded(driver, options) - super.getConnection(driver: Driver, options: JDBCOptions) + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + super.getConnection(driver: Driver, options: Map[String, String]) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala index 5467d20a24e54..bf561b7959454 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala @@ -19,13 +19,11 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} -import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions - private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvider { IntentionallyFaultyConnectionProvider.constructed = true throw new IllegalArgumentException("Intentional Exception") - override def canHandle(driver: Driver, options: JDBCOptions): Boolean = true - override def getConnection(driver: Driver, options: JDBCOptions): Connection = null + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true + override def getConnection(driver: Driver, options: Map[String, String]): Connection = null } private object IntentionallyFaultyConnectionProvider { From 054535e3fb91c3bd8d5c66da53204620fe6385ad Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 28 Sep 2020 11:07:43 +0200 Subject: [PATCH 06/13] Add unstable --- .../datasources/jdbc/connection/JdbcConnectionProvider.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala index 37c3fd64ebd80..dc0859cf04176 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection

 import java.sql.{Connection, Driver}

-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Unstable}

 /**
  * ::DeveloperApi::
@@ -31,6 +31,7 @@ import org.apache.spark.annotation.DeveloperApi
  * internal state is not advised. If any state added then it must be synchronized properly.
  */
 @DeveloperApi
+@Unstable
 trait JdbcConnectionProvider {
   /**
    * Checks if this connection provider instance can handle the connection initiated by the driver.

From d54596f074bcbd2d0559e38255f693ce548117c3 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 28 Sep 2020 11:12:45 +0200
Subject: [PATCH 07/13] Add since + change to backquotes

---
 .../jdbc/connection/JdbcConnectionProvider.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
index dc0859cf04176..716b8d2a34758 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
@@ -19,24 +19,25 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection

 import java.sql.{Connection, Driver}

-import org.apache.spark.annotation.{DeveloperApi, Unstable}
+import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}

 /**
  * ::DeveloperApi::
  * Connection provider which opens connection toward various databases (database specific instance
  * needed). If any authentication required then it's the provider's responsibility to set all
  * the parameters. If global JVM security configuration is changed then
- * SecurityConfigurationLock must be used as lock to avoid race.
+ * `SecurityConfigurationLock` must be used as a lock to avoid races.
 * Important to mention connection providers within a JVM used from multiple threads so adding
 * internal state is not advised. If any state added then it must be synchronized properly.
 */
 @DeveloperApi
 @Unstable
+@Since("3.1.0")
 trait JdbcConnectionProvider {
   /**
    * Checks if this connection provider instance can handle the connection initiated by the driver.
    * There must be exactly one active connection provider which can handle the connection for a
-   * specific driver. If this requirement doesn't met then IllegalArgumentException
+   * specific driver. If this requirement isn't met then `IllegalArgumentException`
    * will be thrown by the provider framework.
    * @param driver Java driver which initiates the connection
    * @param options Driver options which initiates the connection
@@ -48,7 +49,7 @@ trait JdbcConnectionProvider {
   /**
    * Opens connection toward the database.
* @param driver Java driver which initiates the connection * @param options Driver options which initiates the connection - * @return a Connection object that represents a connection to the URL + * @return a `Connection` object that represents a connection to the URL */ def getConnection(driver: Driver, options: Map[String, String]): Connection } From 184fd2655bcac950a63db5a3294aaa1d9a27781b Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 28 Sep 2020 11:35:57 +0200 Subject: [PATCH 08/13] Move JdbcConnectionProvider API into another package and change to abstract class --- ...> org.apache.spark.sql.jdbc.JdbcConnectionProvider} | 0 .../jdbc/connection/BasicConnectionProvider.scala | 1 + .../jdbc/connection/ConnectionProvider.scala | 1 + .../connection => jdbc}/JdbcConnectionProvider.scala | 10 ++++++---- ...> org.apache.spark.sql.jdbc.JdbcConnectionProvider} | 0 .../IntentionallyFaultyConnectionProvider.scala | 2 ++ 6 files changed, 10 insertions(+), 4 deletions(-) rename sql/core/src/main/resources/META-INF/services/{org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider => org.apache.spark.sql.jdbc.JdbcConnectionProvider} (100%) rename sql/core/src/main/scala/org/apache/spark/sql/{execution/datasources/jdbc/connection => jdbc}/JdbcConnectionProvider.scala (91%) rename sql/core/src/test/resources/META-INF/services/{org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider => org.apache.spark.sql.jdbc.JdbcConnectionProvider} (100%) diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider similarity index 100% rename from sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider rename to sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala index eade06f3e289b..a5f04649e6628 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala @@ -22,6 +22,7 @@ import java.util.Properties import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions +import org.apache.spark.sql.jdbc.JdbcConnectionProvider private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with Logging { /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index 6bc734aa6ef7b..bec693fd396a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -23,6 +23,7 @@ import java.util.ServiceLoader import scala.collection.mutable import org.apache.spark.internal.Logging +import org.apache.spark.sql.jdbc.JdbcConnectionProvider import 
org.apache.spark.util.Utils

 private[jdbc] object ConnectionProvider extends Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
similarity index 91%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
index 716b8d2a34758..8fd81da1ffbe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/JdbcConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.execution.datasources.jdbc.connection
+package org.apache.spark.sql.jdbc

 import java.sql.{Connection, Driver}

@@ -33,13 +33,14 @@ import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
 @DeveloperApi
 @Unstable
 @Since("3.1.0")
-trait JdbcConnectionProvider {
+abstract class JdbcConnectionProvider {
   /**
    * Checks if this connection provider instance can handle the connection initiated by the driver.
    * There must be exactly one active connection provider which can handle the connection for a
    * specific driver. If this requirement isn't met then `IllegalArgumentException`
    * will be thrown by the provider framework.
-   * @param driver Java driver which initiates the connection
+   *
+   * @param driver Java driver which initiates the connection
   * @param options Driver options which initiates the connection
    * @return True if the connection provider can handle the driver with the given options.
    */
@@ -47,7 +48,8 @@ trait JdbcConnectionProvider {
   /**
    * Opens connection toward the database.
- * @param driver Java driver which initiates the connection + * + * @param driver Java driver which initiates the connection * @param options Driver options which initiates the connection * @return a `Connection` object that represents a connection to the URL */ diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider similarity index 100% rename from sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider rename to sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala index bf561b7959454..fbefcb91cccde 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} +import org.apache.spark.sql.jdbc.JdbcConnectionProvider + private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvider { IntentionallyFaultyConnectionProvider.constructed = true throw new IllegalArgumentException("Intentional Exception") From 1f562f8b53d3b30bd049bf53e9e6d6019360c966 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 28 Sep 2020 11:48:00 +0200 Subject: [PATCH 09/13] Print out active providers in error --- .../datasources/jdbc/connection/ConnectionProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index bec693fd396a3..60be721c1462a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -50,9 +50,9 @@ private[jdbc] object ConnectionProvider extends Logging { def create(driver: Driver, options: Map[String, String]): Connection = { val filteredProviders = providers.filter(_.canHandle(driver, options)) - logDebug(s"Filtered providers: $filteredProviders") require(filteredProviders.size == 1, - "JDBC connection initiated but not exactly one connection provider found which can handle it") + "JDBC connection initiated but not exactly one connection provider found which can handle " + + s"it. 
Found active providers: ${filteredProviders.mkString(", ")}")
     filteredProviders.head.getConnection(driver, options)
   }
 }

From b2fd5d827bcbbf46a484d5b0ba867935d48bbaa3 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 28 Sep 2020 13:37:48 +0200
Subject: [PATCH 10/13] Scala 2.13 compile fix

---
 .../datasources/jdbc/connection/ConnectionProvider.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
index 60be721c1462a..e55f59049ccb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
@@ -45,7 +45,8 @@ private[jdbc] object ConnectionProvider extends Logging {
         logError(s"Failed to load built in provider.", t)
       }
     }
-    providers
+    // Seems redundant, but it's needed for Scala 2.13
+    providers.toSeq
   }

   def create(driver: Driver, options: Map[String, String]): Connection = {

From 12973045a0ade8465427783e14eef59115c69f5c Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 28 Sep 2020 14:45:08 +0200
Subject: [PATCH 11/13] Remove SecurityConfigurationLock from API

---
 .../apache/spark/security/SecurityConfigurationLock.scala | 4 ----
 .../datasources/jdbc/connection/ConnectionProvider.scala | 5 ++++-
 .../datasources/jdbc/connection/DB2ConnectionProvider.scala | 5 +----
 .../jdbc/connection/MSSQLConnectionProvider.scala | 5 +----
 .../jdbc/connection/MariaDBConnectionProvider.scala | 5 +----
 .../jdbc/connection/OracleConnectionProvider.scala | 5 +----
 .../jdbc/connection/PostgresConnectionProvider.scala | 5 +----
 .../org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala | 6 +++---
 8 files changed, 12 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala b/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala
index 17c0d58a52e30..0741a8c1580df 100644
--- a/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala
+++ b/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala
@@ -17,12 +17,8 @@

 package org.apache.spark.security

-import org.apache.spark.annotation.DeveloperApi
-
 /**
- * ::DeveloperApi::
 * There are cases when global JVM security configuration must be modified.
 * In order to avoid race the modification must be synchronized with this.
*/ -@DeveloperApi object SecurityConfigurationLock diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index e55f59049ccb5..546756677edce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -23,6 +23,7 @@ import java.util.ServiceLoader import scala.collection.mutable import org.apache.spark.internal.Logging +import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.jdbc.JdbcConnectionProvider import org.apache.spark.util.Utils @@ -54,6 +55,8 @@ private[jdbc] object ConnectionProvider extends Logging { require(filteredProviders.size == 1, "JDBC connection initiated but not exactly one connection provider found which can handle " + s"it. Found active providers: ${filteredProviders.mkString(", ")}") - filteredProviders.head.getConnection(driver, options) + SecurityConfigurationLock.synchronized { + filteredProviders.head.getConnection(driver, options) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala index 8f6c28c7a729a..ca82cdc561bef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala @@ -23,7 +23,6 @@ import java.util.Properties import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions private[sql] class DB2ConnectionProvider extends SecureConnectionProvider { @@ -52,9 +51,7 @@ private[sql] class DB2ConnectionProvider extends SecureConnectionProvider { result } - override def setAuthenticationConfigIfNeeded( - driver: Driver, - options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { setAuthenticationConfig(parent, driver, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala index b026958ef2a4b..4e405b2187e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala @@ -23,7 +23,6 @@ import java.util.Properties import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider { @@ -79,9 +78,7 @@ private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider { result } - override def 
setAuthenticationConfigIfNeeded( - driver: Driver, - options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { val (parent, configEntry) = getConfigWithAppEntry(driver, options) /** * Couple of things to mention here (v8.2.2 client): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala index 19fd201f6afa7..d5fe13bf0ca19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.Driver -import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions private[jdbc] class MariaDBConnectionProvider extends SecureConnectionProvider { @@ -28,9 +27,7 @@ private[jdbc] class MariaDBConnectionProvider extends SecureConnectionProvider { override def appEntry(driver: Driver, options: JDBCOptions): String = "Krb5ConnectorContext" - override def setAuthenticationConfigIfNeeded( - driver: Driver, - options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { val (parent, configEntry) = getConfigWithAppEntry(driver, options) /** * Couple of things to mention here (v2.5.4 client): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala index 0f91219e08ea6..3defda3871765 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala @@ -23,7 +23,6 @@ import java.util.Properties import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions private[sql] class OracleConnectionProvider extends SecureConnectionProvider { @@ -53,9 +52,7 @@ private[sql] class OracleConnectionProvider extends SecureConnectionProvider { result } - override def setAuthenticationConfigIfNeeded( - driver: Driver, - options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized { + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { setAuthenticationConfig(parent, driver, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala index d936d48d7bece..dae8aea81f20a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection
 import java.sql.Driver
 import java.util.Properties

-import org.apache.spark.security.SecurityConfigurationLock
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

 private[jdbc] class PostgresConnectionProvider extends SecureConnectionProvider {
@@ -32,9 +31,7 @@ private[jdbc] class PostgresConnectionProvider extends SecureConnectionProvider
     properties.getProperty("jaasApplicationName", "pgjdbc")
   }

-  override def setAuthenticationConfigIfNeeded(
-      driver: Driver,
-      options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized {
+  override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = {
     val (parent, configEntry) = getConfigWithAppEntry(driver, options)
     if (configEntry == null || configEntry.isEmpty) {
       setAuthenticationConfig(parent, driver, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
index 8fd81da1ffbe8..afa98a8381e8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
@@ -25,8 +25,7 @@ import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
  * ::DeveloperApi::
 * Connection provider which opens connection toward various databases (database specific instance
 * needed). If any authentication required then it's the provider's responsibility to set all
- * the parameters. If global JVM security configuration is changed then
- * `SecurityConfigurationLock` must be used as a lock to avoid races.
+ * the parameters.
 * Important to mention connection providers within a JVM used from multiple threads so adding
 * internal state is not advised. If any state added then it must be synchronized properly.
 */
@@ -47,7 +46,8 @@ abstract class JdbcConnectionProvider {
   def canHandle(driver: Driver, options: Map[String, String]): Boolean

   /**
-   * Opens connection toward the database.
+   * Opens connection toward the database. Since a global JVM security configuration change may be
+   * needed, this API is called synchronized on `SecurityConfigurationLock` to avoid races.
    *
    * @param driver Java driver which initiates the connection
    * @param options Driver options which initiates the connection

From 9dbed59b758f4a532d457ee772d2c7a4b37ee52c Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 28 Sep 2020 18:04:29 +0200
Subject: [PATCH 12/13] Make JDBCOptions parameters not transient

---
 .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 9e0438c0016bd..e6fff8dbdbd7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 * Options for the JDBC data source.
*/ class JDBCOptions( - @transient val parameters: CaseInsensitiveMap[String]) + val parameters: CaseInsensitiveMap[String]) extends Serializable with Logging { import JDBCOptions._ @@ -209,7 +209,7 @@ class JDBCOptions( } class JdbcOptionsInWrite( - @transient override val parameters: CaseInsensitiveMap[String]) + override val parameters: CaseInsensitiveMap[String]) extends JDBCOptions(parameters) { import JDBCOptions._ From 04124905187fcab366c79adc587cfe2495bff5b3 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 29 Sep 2020 10:37:39 +0200 Subject: [PATCH 13/13] Move since into doc --- .../org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala index afa98a8381e8c..caf574b0c2284 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc import java.sql.{Connection, Driver} -import org.apache.spark.annotation.{DeveloperApi, Since, Unstable} +import org.apache.spark.annotation.{DeveloperApi, Unstable} /** * ::DeveloperApi:: @@ -28,10 +28,11 @@ import org.apache.spark.annotation.{DeveloperApi, Since, Unstable} * the parameters. * Important to mention connection providers within a JVM used from multiple threads so adding * internal state is not advised. If any state added then it must be synchronized properly. + * + * @since 3.1.0 */ @DeveloperApi @Unstable -@Since("3.1.0") abstract class JdbcConnectionProvider { /** * Checks if this connection provider instance can handle the connection initiated by the driver.
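
Taken together, the series leaves `JdbcConnectionProvider` as a `@DeveloperApi`/`@Unstable` abstract class in `org.apache.spark.sql.jdbc`, loaded through `ServiceLoader` and driven by plain `Map[String, String]` options. As a rough illustration of that contract only (the package, class name, and `jdbc:example:` URL prefix below are hypothetical, not part of the patches), a minimal third-party provider might look like this sketch:

    package com.example.jdbc // hypothetical third-party package

    import java.sql.{Connection, Driver}
    import java.util.Properties

    import org.apache.spark.sql.jdbc.JdbcConnectionProvider

    class ExampleConnectionProvider extends JdbcConnectionProvider {
      // Claim only URLs this provider understands: exactly one active provider
      // may return true for any given driver/options pair.
      override def canHandle(driver: Driver, options: Map[String, String]): Boolean =
        options.get("url").exists(_.startsWith("jdbc:example:"))

      // Invoked synchronized on SecurityConfigurationLock (patch 11), so global
      // JVM security configuration such as JAAS entries may be adjusted here.
      override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
        val properties = new Properties()
        options.foreach { case (k, v) => properties.put(k, v) }
        driver.connect(options("url"), properties)
      }
    }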
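
Since providers are discovered via `ServiceLoader` (patch 08 moves the service file to the public package name), such a provider would also need a registration entry in its own jar, along the lines of:

    # META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider
    com.example.jdbc.ExampleConnectionProvider

On the caller side nothing changes: `JdbcUtils` still routes connection creation through `ConnectionProvider.create` (patch 05), which filters the loaded providers with `canHandle`, requires exactly one match, and invokes `getConnection` while holding `SecurityConfigurationLock` (patch 11). A Kerberos-backed read keeps using the existing `keytab` and `principal` options; as a sketch with a made-up URL and table, assuming a `SparkSession` named `spark`:

    // Illustrative only; the URL, table, keytab path, and principal are made up.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost/postgres")
      .option("dbtable", "mytable")
      .option("keytab", "/path/to/client.keytab")
      .option("principal", "client/localhost@EXAMPLE.COM")
      .load()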