diff --git a/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala b/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala new file mode 100644 index 0000000000000..0741a8c1580df --- /dev/null +++ b/core/src/main/scala/org/apache/spark/security/SecurityConfigurationLock.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.security + +/** + * There are cases when global JVM security configuration must be modified. + * In order to avoid race the modification must be synchronized with this. 
+ */ +object SecurityConfigurationLock diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala index fc88985cf2ec7..fa5ce2d106a10 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala @@ -54,7 +54,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { JDBCOptions.JDBC_KEYTAB -> keytabFileName, JDBCOptions.JDBC_PRINCIPAL -> principal )) - new DB2ConnectionProvider(null, options).getAdditionalProperties() + new DB2ConnectionProvider().getAdditionalProperties(options) } override def beforeContainerStart( diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider new file mode 100644 index 0000000000000..6e42517a6d40c --- /dev/null +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider @@ -0,0 +1,6 @@ +org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.DB2ConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.MariaDBConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.MSSQLConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider +org.apache.spark.sql.execution.datasources.jdbc.connection.OracleConnectionProvider \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 
9e0438c0016bd..e6fff8dbdbd7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap * Options for the JDBC data source. */ class JDBCOptions( - @transient val parameters: CaseInsensitiveMap[String]) + val parameters: CaseInsensitiveMap[String]) extends Serializable with Logging { import JDBCOptions._ @@ -209,7 +209,7 @@ class JDBCOptions( } class JdbcOptionsInWrite( - @transient override val parameters: CaseInsensitiveMap[String]) + override val parameters: CaseInsensitiveMap[String]) extends JDBCOptions(parameters) { import JDBCOptions._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5831c35c7e301..202f2e03b68d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -63,7 +63,7 @@ object JdbcUtils extends Logging { throw new IllegalStateException( s"Did not find registered driver with class $driverClass") } - val connection = ConnectionProvider.create(driver, options).getConnection() + val connection = ConnectionProvider.create(driver, options.parameters) require(connection != null, s"The driver could not open a JDBC connection. 
Check the URL: ${options.url}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala index 16b244cc617ce..a5f04649e6628 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala @@ -18,18 +18,30 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} +import java.util.Properties -import scala.collection.JavaConverters._ - +import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions +import org.apache.spark.sql.jdbc.JdbcConnectionProvider + +private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with Logging { + /** + * Additional properties for data connection (Data source property takes precedence). 
+ */ + def getAdditionalProperties(options: JDBCOptions): Properties = new Properties() + + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = { + val jdbcOptions = new JDBCOptions(options) + jdbcOptions.keytab == null || jdbcOptions.principal == null + } -private[jdbc] class BasicConnectionProvider(driver: Driver, options: JDBCOptions) - extends ConnectionProvider { - def getConnection(): Connection = { - val properties = getAdditionalProperties() - options.asConnectionProperties.entrySet().asScala.foreach { e => - properties.put(e.getKey(), e.getValue()) + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + val properties = getAdditionalProperties(jdbcOptions) + options.foreach { case(k, v) => + properties.put(k, v) } - driver.connect(options.url, properties) + logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.url} and properties: $properties") + driver.connect(jdbcOptions.url, properties) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index ce45be442ccc3..546756677edce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -18,60 +18,45 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import java.sql.{Connection, Driver} -import java.util.Properties +import java.util.ServiceLoader -import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions - -/** - * Connection provider which opens connection toward various databases (database specific instance - * needed). 
If kerberos authentication required then it's the provider's responsibility to set all - * the parameters. - */ -private[jdbc] trait ConnectionProvider { - /** - * Additional properties for data connection (Data source property takes precedence). - */ - def getAdditionalProperties(): Properties = new Properties() +import scala.collection.mutable - /** - * Opens connection toward the database. - */ - def getConnection(): Connection -} +import org.apache.spark.internal.Logging +import org.apache.spark.security.SecurityConfigurationLock +import org.apache.spark.sql.jdbc.JdbcConnectionProvider +import org.apache.spark.util.Utils private[jdbc] object ConnectionProvider extends Logging { - def create(driver: Driver, options: JDBCOptions): ConnectionProvider = { - if (options.keytab == null || options.principal == null) { - logDebug("No authentication configuration found, using basic connection provider") - new BasicConnectionProvider(driver, options) - } else { - logDebug("Authentication configuration found, using database specific connection provider") - options.driverClass match { - case PostgresConnectionProvider.driverClass => - logDebug("Postgres connection provider found") - new PostgresConnectionProvider(driver, options) - - case MariaDBConnectionProvider.driverClass => - logDebug("MariaDB connection provider found") - new MariaDBConnectionProvider(driver, options) - - case DB2ConnectionProvider.driverClass => - logDebug("DB2 connection provider found") - new DB2ConnectionProvider(driver, options) - - case MSSQLConnectionProvider.driverClass => - logDebug("MS SQL connection provider found") - new MSSQLConnectionProvider(driver, options) - - case OracleConnectionProvider.driverClass => - logDebug("Oracle connection provider found") - new OracleConnectionProvider(driver, options) - - case _ => - throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " + - "Kerberos authentication") + private val providers = loadProviders() + + def 
loadProviders(): Seq[JdbcConnectionProvider] = { + val loader = ServiceLoader.load(classOf[JdbcConnectionProvider], + Utils.getContextOrSparkClassLoader) + val providers = mutable.ArrayBuffer[JdbcConnectionProvider]() + + val iterator = loader.iterator + while (iterator.hasNext) { + try { + val provider = iterator.next + logDebug(s"Loaded built in provider: $provider") + providers += provider + } catch { + case t: Throwable => + logError(s"Failed to load built in provider.", t) } } + // Seems duplicate but it's needed for Scala 2.13 + providers.toSeq + } + + def create(driver: Driver, options: Map[String, String]): Connection = { + val filteredProviders = providers.filter(_.canHandle(driver, options)) + require(filteredProviders.size == 1, + "JDBC connection initiated but not exactly one connection provider found which can handle " + + s"it. Found active providers: ${filteredProviders.mkString(", ")}") + SecurityConfigurationLock.synchronized { + filteredProviders.head.getConnection(driver, options) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala index 095821cf83890..ca82cdc561bef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProvider.scala @@ -25,22 +25,25 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val appEntry: String = "JaasClient" - - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() - 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( - new PrivilegedExceptionAction[Connection]() { - override def run(): Connection = { - DB2ConnectionProvider.super.getConnection() +private[sql] class DB2ConnectionProvider extends SecureConnectionProvider { + override val driverClass = "com.ibm.db2.jcc.DB2Driver" + + override def appEntry(driver: Driver, options: JDBCOptions): String = "JaasClient" + + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab) + .doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + DB2ConnectionProvider.super.getConnection(driver, options) + } } - } - ) + ) } - override def getAdditionalProperties(): Properties = { + override def getAdditionalProperties(options: JDBCOptions): Properties = { val result = new Properties() // 11 is the integer value for kerberos result.put("securityMechanism", new String("11")) @@ -48,14 +51,10 @@ private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions) result } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object DB2ConnectionProvider { - val driverClass = "com.ibm.db2.jcc.DB2Driver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala index 2950aa9b4db94..4e405b2187e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala @@ -25,12 +25,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[sql] class MSSQLConnectionProvider( - driver: Driver, - options: JDBCOptions, - parserMethod: String = "parseAndMergeProperties" - ) extends SecureConnectionProvider(driver, options) { - override val appEntry: String = { +private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider { + override val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver" + val parserMethod: String = "parseAndMergeProperties" + + override def appEntry(driver: Driver, options: JDBCOptions): String = { val configName = "jaasConfigurationName" val appEntryDefault = "SQLJDBCDriver" @@ -58,18 +57,20 @@ private[sql] class MSSQLConnectionProvider( } } - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() - UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( - new PrivilegedExceptionAction[Connection]() { - override def run(): Connection = { - MSSQLConnectionProvider.super.getConnection() + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab) + .doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + MSSQLConnectionProvider.super.getConnection(driver, options) + } } - } - ) + ) } - override def 
getAdditionalProperties(): Properties = { + override def getAdditionalProperties(options: JDBCOptions): Properties = { val result = new Properties() // These props needed to reach internal kerberos authentication in the JDBC driver result.put("integratedSecurity", "true") @@ -77,8 +78,8 @@ private[sql] class MSSQLConnectionProvider( result } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) /** * Couple of things to mention here (v8.2.2 client): * 1. MS SQL supports JAAS application name configuration @@ -87,11 +88,7 @@ private[sql] class MSSQLConnectionProvider( val entryUsesKeytab = configEntry != null && configEntry.exists(_.getOptions().get("useKeyTab") == "true") if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object MSSQLConnectionProvider { - val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala index 3c0286654a8ec..d5fe13bf0ca19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala @@ -21,14 +21,14 @@ import java.sql.Driver import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) 
{ - override val appEntry: String = { +private[jdbc] class MariaDBConnectionProvider extends SecureConnectionProvider { + override val driverClass = "org.mariadb.jdbc.Driver" + + override def appEntry(driver: Driver, options: JDBCOptions): String = "Krb5ConnectorContext" - } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) /** * Couple of things to mention here (v2.5.4 client): * 1. MariaDB doesn't support JAAS application name configuration @@ -37,11 +37,7 @@ private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptio val entryUsesKeytab = configEntry != null && configEntry.exists(_.getOptions().get("useKeyTab") == "true") if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object MariaDBConnectionProvider { - val driverClass = "org.mariadb.jdbc.Driver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala index c2b71b35b8128..3defda3871765 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProvider.scala @@ -25,22 +25,25 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[sql] class OracleConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val 
appEntry: String = "kprb5module" - - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() - UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs( - new PrivilegedExceptionAction[Connection]() { - override def run(): Connection = { - OracleConnectionProvider.super.getConnection() +private[sql] class OracleConnectionProvider extends SecureConnectionProvider { + override val driverClass = "oracle.jdbc.OracleDriver" + + override def appEntry(driver: Driver, options: JDBCOptions): String = "kprb5module" + + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab) + .doAs( + new PrivilegedExceptionAction[Connection]() { + override def run(): Connection = { + OracleConnectionProvider.super.getConnection(driver, options) + } } - } - ) + ) } - override def getAdditionalProperties(): Properties = { + override def getAdditionalProperties(options: JDBCOptions): Properties = { val result = new Properties() // This prop is needed to turn on kerberos authentication in the JDBC driver. 
// The possible values can be found in AnoServices public interface @@ -49,14 +52,10 @@ private[sql] class OracleConnectionProvider(driver: Driver, options: JDBCOptions result } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object OracleConnectionProvider { - val driverClass = "oracle.jdbc.OracleDriver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala index fa9232e00bd88..dae8aea81f20a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProvider.scala @@ -22,22 +22,19 @@ import java.util.Properties import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions -private[jdbc] class PostgresConnectionProvider(driver: Driver, options: JDBCOptions) - extends SecureConnectionProvider(driver, options) { - override val appEntry: String = { +private[jdbc] class PostgresConnectionProvider extends SecureConnectionProvider { + override val driverClass = "org.postgresql.Driver" + + override def appEntry(driver: Driver, options: JDBCOptions): String = { val parseURL = driver.getClass.getMethod("parseURL", classOf[String], classOf[Properties]) val properties = parseURL.invoke(driver, options.url, null).asInstanceOf[Properties] properties.getProperty("jaasApplicationName", 
"pgjdbc") } - override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized { - val (parent, configEntry) = getConfigWithAppEntry() + override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = { + val (parent, configEntry) = getConfigWithAppEntry(driver, options) if (configEntry == null || configEntry.isEmpty) { - setAuthenticationConfig(parent) + setAuthenticationConfig(parent, driver, options) } } } - -private[sql] object PostgresConnectionProvider { - val driverClass = "org.postgresql.Driver" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala index 24eec63a7244f..80c795957dac8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala @@ -26,39 +26,49 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.util.SecurityUtils -/** - * Some of the secure connection providers modify global JVM security configuration. - * In order to avoid race the modification must be synchronized with this. - */ -private[connection] object SecurityConfigurationLock +private[jdbc] abstract class SecureConnectionProvider extends BasicConnectionProvider with Logging { + /** + * Returns the driver canonical class name which the connection provider supports. 
+ */ + protected val driverClass: String + + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = { + val jdbcOptions = new JDBCOptions(options) + jdbcOptions.keytab != null && jdbcOptions.principal != null && + driverClass.equalsIgnoreCase(jdbcOptions.driverClass) + } -private[jdbc] abstract class SecureConnectionProvider(driver: Driver, options: JDBCOptions) - extends BasicConnectionProvider(driver, options) with Logging { - override def getConnection(): Connection = { - setAuthenticationConfigIfNeeded() - super.getConnection() + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + val jdbcOptions = new JDBCOptions(options) + setAuthenticationConfigIfNeeded(driver, jdbcOptions) + super.getConnection(driver: Driver, options: Map[String, String]) } /** * Returns JAAS application name. This is sometimes configurable on the JDBC driver level. */ - val appEntry: String + def appEntry(driver: Driver, options: JDBCOptions): String /** * Sets database specific authentication configuration when needed. If configuration already set * then later calls must be no op. When the global JVM security configuration changed then the * related code parts must be synchronized properly. 
*/ - def setAuthenticationConfigIfNeeded(): Unit + def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit - protected def getConfigWithAppEntry(): (Configuration, Array[AppConfigurationEntry]) = { + protected def getConfigWithAppEntry( + driver: Driver, + options: JDBCOptions): (Configuration, Array[AppConfigurationEntry]) = { val parent = Configuration.getConfiguration - (parent, parent.getAppConfigurationEntry(appEntry)) + (parent, parent.getAppConfigurationEntry(appEntry(driver, options))) } - protected def setAuthenticationConfig(parent: Configuration) = { + protected def setAuthenticationConfig( + parent: Configuration, + driver: Driver, + options: JDBCOptions) = { val config = new SecureConnectionProvider.JDBCConfiguration( - parent, appEntry, options.keytab, options.principal) + parent, appEntry(driver, options), options.keytab, options.principal) logDebug("Adding database specific security configuration") Configuration.setConfiguration(config) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala new file mode 100644 index 0000000000000..caf574b0c2284 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc + +import java.sql.{Connection, Driver} + +import org.apache.spark.annotation.{DeveloperApi, Unstable} + +/** + * ::DeveloperApi:: + * Connection provider which opens connection toward various databases (database specific instance + * needed). If any authentication is required then it's the provider's responsibility to set all + * the parameters. + * Note that connection providers within a JVM are used from multiple threads, so adding + * internal state is not advised. If any state is added then it must be synchronized properly. + * + * @since 3.1.0 + */ +@DeveloperApi +@Unstable +abstract class JdbcConnectionProvider { + /** + * Checks if this connection provider instance can handle the connection initiated by the driver. + * There must be exactly one active connection provider which can handle the connection for a + * specific driver. If this requirement isn't met then `IllegalArgumentException` + * will be thrown by the provider framework. + * + * @param driver Java driver which initiates the connection + * @param options Driver options which initiate the connection + * @return True if the connection provider can handle the driver with the given options. + */ + def canHandle(driver: Driver, options: Map[String, String]): Boolean + + /** + * Opens connection toward the database. Since a global JVM security configuration change may + * be needed, this API is called synchronized by `SecurityConfigurationLock` to avoid race. 
+ * + * @param driver Java driver which initiates the connection + * @param options Driver options which initiate the connection + * @return a `Connection` object that represents a connection to the URL + */ + def getConnection(driver: Driver, options: Map[String, String]): Connection +} diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider new file mode 100644 index 0000000000000..afb48e1a3511f --- /dev/null +++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider @@ -0,0 +1 @@ +org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala index ff5fe4f620a1d..a48dbdebea7e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala @@ -20,26 +20,43 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection import javax.security.auth.login.Configuration class ConnectionProviderSuite extends ConnectionProviderSuiteBase { + test("All built-in provides must be loaded") { + IntentionallyFaultyConnectionProvider.constructed = false + val providers = ConnectionProvider.loadProviders() + assert(providers.exists(_.isInstanceOf[BasicConnectionProvider])) + assert(providers.exists(_.isInstanceOf[DB2ConnectionProvider])) + assert(providers.exists(_.isInstanceOf[MariaDBConnectionProvider])) + assert(providers.exists(_.isInstanceOf[MSSQLConnectionProvider])) + 
assert(providers.exists(_.isInstanceOf[PostgresConnectionProvider])) + assert(providers.exists(_.isInstanceOf[OracleConnectionProvider])) + assert(IntentionallyFaultyConnectionProvider.constructed) + assert(!providers.exists(_.isInstanceOf[IntentionallyFaultyConnectionProvider])) + assert(providers.size === 6) + } + test("Multiple security configs must be reachable") { Configuration.setConfiguration(null) - val postgresDriver = registerDriver(PostgresConnectionProvider.driverClass) - val postgresProvider = new PostgresConnectionProvider( - postgresDriver, options("jdbc:postgresql://localhost/postgres")) - val db2Driver = registerDriver(DB2ConnectionProvider.driverClass) - val db2Provider = new DB2ConnectionProvider(db2Driver, options("jdbc:db2://localhost/db2")) + val postgresProvider = new PostgresConnectionProvider() + val postgresDriver = registerDriver(postgresProvider.driverClass) + val postgresOptions = options("jdbc:postgresql://localhost/postgres") + val postgresAppEntry = postgresProvider.appEntry(postgresDriver, postgresOptions) + val db2Provider = new DB2ConnectionProvider() + val db2Driver = registerDriver(db2Provider.driverClass) + val db2Options = options("jdbc:db2://localhost/db2") + val db2AppEntry = db2Provider.appEntry(db2Driver, db2Options) // Make sure no authentication for the databases are set val oldConfig = Configuration.getConfiguration - assert(oldConfig.getAppConfigurationEntry(postgresProvider.appEntry) == null) - assert(oldConfig.getAppConfigurationEntry(db2Provider.appEntry) == null) + assert(oldConfig.getAppConfigurationEntry(postgresAppEntry) == null) + assert(oldConfig.getAppConfigurationEntry(db2AppEntry) == null) - postgresProvider.setAuthenticationConfigIfNeeded() - db2Provider.setAuthenticationConfigIfNeeded() + postgresProvider.setAuthenticationConfigIfNeeded(postgresDriver, postgresOptions) + db2Provider.setAuthenticationConfigIfNeeded(db2Driver, db2Options) // Make sure authentication for the databases are set val newConfig = 
Configuration.getConfiguration assert(oldConfig != newConfig) - assert(newConfig.getAppConfigurationEntry(postgresProvider.appEntry) != null) - assert(newConfig.getAppConfigurationEntry(db2Provider.appEntry) != null) + assert(newConfig.getAppConfigurationEntry(postgresAppEntry) != null) + assert(newConfig.getAppConfigurationEntry(db2AppEntry) != null) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala index d18a3088c4f2f..be08a3c2f7367 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala @@ -50,20 +50,25 @@ abstract class ConnectionProviderSuiteBase extends SparkFunSuite with BeforeAndA } } - protected def testSecureConnectionProvider(provider: SecureConnectionProvider): Unit = { + protected def testSecureConnectionProvider( + provider: SecureConnectionProvider, + driver: Driver, + options: JDBCOptions): Unit = { + val providerAppEntry = provider.appEntry(driver, options) + // Make sure no authentication for the database is set - assert(Configuration.getConfiguration.getAppConfigurationEntry(provider.appEntry) == null) + assert(Configuration.getConfiguration.getAppConfigurationEntry(providerAppEntry) == null) // Make sure the first call sets authentication properly val savedConfig = Configuration.getConfiguration - provider.setAuthenticationConfigIfNeeded() + provider.setAuthenticationConfigIfNeeded(driver, options) val config = Configuration.getConfiguration assert(savedConfig != config) - val appEntry = config.getAppConfigurationEntry(provider.appEntry) + val appEntry = config.getAppConfigurationEntry(providerAppEntry) assert(appEntry != null) // Make sure a second 
call is not modifying the existing authentication - provider.setAuthenticationConfigIfNeeded() - assert(config.getAppConfigurationEntry(provider.appEntry) === appEntry) + provider.setAuthenticationConfigIfNeeded(driver, options) + assert(config.getAppConfigurationEntry(providerAppEntry) === appEntry) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala index d656f83e2ebb9..5885af82532d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/DB2ConnectionProviderSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class DB2ConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(DB2ConnectionProvider.driverClass) - val provider = new DB2ConnectionProvider(driver, options("jdbc:db2://localhost/db2")) + val provider = new DB2ConnectionProvider() + val driver = registerDriver(provider.driverClass) - testSecureConnectionProvider(provider) + testSecureConnectionProvider(provider, driver, options("jdbc:db2://localhost/db2")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala new file mode 100644 index 0000000000000..fbefcb91cccde --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc.connection + +import java.sql.{Connection, Driver} + +import org.apache.spark.sql.jdbc.JdbcConnectionProvider + +private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvider { + IntentionallyFaultyConnectionProvider.constructed = true + throw new IllegalArgumentException("Intentional Exception") + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true + override def getConnection(driver: Driver, options: Map[String, String]): Connection = null +} + +private object IntentionallyFaultyConnectionProvider { + var constructed = false +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala index 249f1e36347ed..a5704e842e018 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala @@ -17,35 +17,35 @@ package 
org.apache.spark.sql.execution.datasources.jdbc.connection +import java.sql.Driver + +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions + class MSSQLConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded default parser must set authentication if not set") { - val driver = registerDriver(MSSQLConnectionProvider.driverClass) - val defaultProvider = new MSSQLConnectionProvider( - driver, options("jdbc:sqlserver://localhost/mssql")) - val customProvider = new MSSQLConnectionProvider( - driver, options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) + val provider = new MSSQLConnectionProvider() + val driver = registerDriver(provider.driverClass) - testProviders(defaultProvider, customProvider) + testProviders(driver, provider, options("jdbc:sqlserver://localhost/mssql"), + options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) } test("setAuthenticationConfigIfNeeded custom parser must set authentication if not set") { - val parserMethod = "IntentionallyNotExistingMethod" - val driver = registerDriver(MSSQLConnectionProvider.driverClass) - val defaultProvider = new MSSQLConnectionProvider( - driver, options("jdbc:sqlserver://localhost/mssql"), parserMethod) - val customProvider = new MSSQLConnectionProvider( - driver, - options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql"), - parserMethod) - - testProviders(defaultProvider, customProvider) + val provider = new MSSQLConnectionProvider() { + override val parserMethod: String = "IntentionallyNotExistingMethod" + } + val driver = registerDriver(provider.driverClass) + + testProviders(driver, provider, options("jdbc:sqlserver://localhost/mssql"), + options("jdbc:sqlserver://localhost/mssql;jaasConfigurationName=custommssql")) } private def testProviders( - defaultProvider: SecureConnectionProvider, - customProvider: SecureConnectionProvider) = { - assert(defaultProvider.appEntry !== 
customProvider.appEntry) - testSecureConnectionProvider(defaultProvider) - testSecureConnectionProvider(customProvider) + driver: Driver, + provider: SecureConnectionProvider, + defaultOptions: JDBCOptions, + customOptions: JDBCOptions) = { + testSecureConnectionProvider(provider, driver, defaultOptions) + testSecureConnectionProvider(provider, driver, customOptions) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala index 70cad2097eb43..f450662fcbe74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class MariaDBConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(MariaDBConnectionProvider.driverClass) - val provider = new MariaDBConnectionProvider(driver, options("jdbc:mysql://localhost/mysql")) + val provider = new MariaDBConnectionProvider() + val driver = registerDriver(provider.driverClass) - testSecureConnectionProvider(provider) + testSecureConnectionProvider(provider, driver, options("jdbc:mysql://localhost/mysql")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala index 13cde32ddbe4e..40e7f1191dccc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/OracleConnectionProviderSuite.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class OracleConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(OracleConnectionProvider.driverClass) - val provider = new OracleConnectionProvider(driver, - options("jdbc:oracle:thin:@//localhost/xe")) + val provider = new OracleConnectionProvider() + val driver = registerDriver(provider.driverClass) - testSecureConnectionProvider(provider) + testSecureConnectionProvider(provider, driver, options("jdbc:oracle:thin:@//localhost/xe")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala index 8cef7652f9c54..ee43a7d9708c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/PostgresConnectionProviderSuite.scala @@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.jdbc.connection class PostgresConnectionProviderSuite extends ConnectionProviderSuiteBase { test("setAuthenticationConfigIfNeeded must set authentication if not set") { - val driver = registerDriver(PostgresConnectionProvider.driverClass) - val defaultProvider = new PostgresConnectionProvider( - driver, options("jdbc:postgresql://localhost/postgres")) - val customProvider = new PostgresConnectionProvider( - driver, options(s"jdbc:postgresql://localhost/postgres?jaasApplicationName=custompgjdbc")) + val provider = new PostgresConnectionProvider() + val defaultOptions = 
options("jdbc:postgresql://localhost/postgres") + val customOptions = + options(s"jdbc:postgresql://localhost/postgres?jaasApplicationName=custompgjdbc") + val driver = registerDriver(provider.driverClass) - assert(defaultProvider.appEntry !== customProvider.appEntry) - testSecureConnectionProvider(defaultProvider) - testSecureConnectionProvider(customProvider) + assert(provider.appEntry(driver, defaultOptions) !== provider.appEntry(driver, customOptions)) + testSecureConnectionProvider(provider, driver, defaultOptions) + testSecureConnectionProvider(provider, driver, customOptions) } }