-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32001][SQL]Create JDBC authentication provider developer API #29024
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
8caa8e7
e919ac4
226f177
265b26e
50e33ea
054535e
d54596f
184fd26
1f562f8
b2fd5d8
1297304
9dbed59
69856f6
0412490
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.security | ||
|
|
||
| import org.apache.spark.annotation.DeveloperApi | ||
|
|
||
| /** | ||
| * ::DeveloperApi:: | ||
| * There are cases when global JVM security configuration must be modified. | ||
| * In order to avoid race the modification must be synchronized with this. | ||
| */ | ||
| @DeveloperApi | ||
| object SecurityConfigurationLock | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider | ||
| org.apache.spark.sql.execution.datasources.jdbc.connection.DB2ConnectionProvider | ||
| org.apache.spark.sql.execution.datasources.jdbc.connection.MariaDBConnectionProvider | ||
| org.apache.spark.sql.execution.datasources.jdbc.connection.MSSQLConnectionProvider | ||
| org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider | ||
| org.apache.spark.sql.execution.datasources.jdbc.connection.OracleConnectionProvider |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,12 +23,15 @@ import java.util.{Locale, Properties} | |
| import org.apache.commons.io.FilenameUtils | ||
|
|
||
| import org.apache.spark.SparkFiles | ||
| import org.apache.spark.annotation.DeveloperApi | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | ||
|
|
||
| /** | ||
| * ::DeveloperApi:: | ||
| * Options for the JDBC data source. | ||
| */ | ||
| @DeveloperApi | ||
|
||
| class JDBCOptions( | ||
| @transient val parameters: CaseInsensitiveMap[String]) | ||
| extends Serializable with Logging { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,60 +18,41 @@ | |||||||||||||||||||
| package org.apache.spark.sql.execution.datasources.jdbc.connection | ||||||||||||||||||||
|
|
||||||||||||||||||||
| import java.sql.{Connection, Driver} | ||||||||||||||||||||
| import java.util.Properties | ||||||||||||||||||||
| import java.util.ServiceLoader | ||||||||||||||||||||
|
|
||||||||||||||||||||
| import scala.collection.mutable | ||||||||||||||||||||
|
|
||||||||||||||||||||
| import org.apache.spark.internal.Logging | ||||||||||||||||||||
| import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Connection provider which opens connection toward various databases (database specific instance | ||||||||||||||||||||
| * needed). If kerberos authentication required then it's the provider's responsibility to set all | ||||||||||||||||||||
| * the parameters. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| private[jdbc] trait ConnectionProvider { | ||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Additional properties for data connection (Data source property takes precedence). | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| def getAdditionalProperties(): Properties = new Properties() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /** | ||||||||||||||||||||
| * Opens connection toward the database. | ||||||||||||||||||||
| */ | ||||||||||||||||||||
| def getConnection(): Connection | ||||||||||||||||||||
| } | ||||||||||||||||||||
| import org.apache.spark.util.Utils | ||||||||||||||||||||
|
|
||||||||||||||||||||
| private[jdbc] object ConnectionProvider extends Logging { | ||||||||||||||||||||
| def create(driver: Driver, options: JDBCOptions): ConnectionProvider = { | ||||||||||||||||||||
| if (options.keytab == null || options.principal == null) { | ||||||||||||||||||||
| logDebug("No authentication configuration found, using basic connection provider") | ||||||||||||||||||||
| new BasicConnectionProvider(driver, options) | ||||||||||||||||||||
| } else { | ||||||||||||||||||||
| logDebug("Authentication configuration found, using database specific connection provider") | ||||||||||||||||||||
| options.driverClass match { | ||||||||||||||||||||
| case PostgresConnectionProvider.driverClass => | ||||||||||||||||||||
| logDebug("Postgres connection provider found") | ||||||||||||||||||||
| new PostgresConnectionProvider(driver, options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| case MariaDBConnectionProvider.driverClass => | ||||||||||||||||||||
| logDebug("MariaDB connection provider found") | ||||||||||||||||||||
| new MariaDBConnectionProvider(driver, options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| case DB2ConnectionProvider.driverClass => | ||||||||||||||||||||
| logDebug("DB2 connection provider found") | ||||||||||||||||||||
| new DB2ConnectionProvider(driver, options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| case MSSQLConnectionProvider.driverClass => | ||||||||||||||||||||
| logDebug("MS SQL connection provider found") | ||||||||||||||||||||
| new MSSQLConnectionProvider(driver, options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| case OracleConnectionProvider.driverClass => | ||||||||||||||||||||
| logDebug("Oracle connection provider found") | ||||||||||||||||||||
| new OracleConnectionProvider(driver, options) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| case _ => | ||||||||||||||||||||
| throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " + | ||||||||||||||||||||
| "Kerberos authentication") | ||||||||||||||||||||
| private val providers = loadProviders() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def loadProviders(): Seq[JdbcConnectionProvider] = { | ||||||||||||||||||||
| val loader = ServiceLoader.load(classOf[JdbcConnectionProvider], | ||||||||||||||||||||
| Utils.getContextOrSparkClassLoader) | ||||||||||||||||||||
| val providers = mutable.ArrayBuffer[JdbcConnectionProvider]() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| val iterator = loader.iterator | ||||||||||||||||||||
| while (iterator.hasNext) { | ||||||||||||||||||||
| try { | ||||||||||||||||||||
| val provider = iterator.next | ||||||||||||||||||||
| logDebug(s"Loaded built in provider: $provider") | ||||||||||||||||||||
| providers += provider | ||||||||||||||||||||
| } catch { | ||||||||||||||||||||
| case t: Throwable => | ||||||||||||||||||||
| logError(s"Failed to load built in provider.", t) | ||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am getting the following exception on my console permanently while running JDBC tests. Should it be really logged as an error?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe log it as a warning?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can decrease it to warning. The main message is to notify the user.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it okay to ignore the error case where it fails to load builtin providers? spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala Lines 694 to 702 in 94d648d
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you mean ignore? Providers must be loaded independently so we need to catch and ignore the exception.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the suggestion is let the exception fire then I would say it's bad idea. If a provider is not able to be loaded then the rest must go. We've had similar issue and expectation in hadoop delegation token area.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, the policy looks okay for secure connections, but how about the basic one,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Here it's the same since
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| providers | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def create(driver: Driver, options: JDBCOptions): Connection = { | ||||||||||||||||||||
| val filteredProviders = providers.filter(_.canHandle(driver, options)) | ||||||||||||||||||||
| logDebug(s"Filtered providers: $filteredProviders") | ||||||||||||||||||||
| require(filteredProviders.size == 1, | ||||||||||||||||||||
| "JDBC connection initiated but not exactly one connection provider found which can handle it") | ||||||||||||||||||||
gaborgsomogyi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||
| filteredProviders.head.getConnection(driver, options) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.datasources.jdbc.connection | ||
gaborgsomogyi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| import java.sql.{Connection, Driver} | ||
|
|
||
| import org.apache.spark.annotation.DeveloperApi | ||
| import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions | ||
|
|
||
| /** | ||
| * ::DeveloperApi:: | ||
| * Connection provider which opens connection toward various databases (database specific instance | ||
| * needed). If any authentication required then it's the provider's responsibility to set all | ||
| * the parameters. If global JVM security configuration is changed then | ||
| * <code>SecurityConfigurationLock</code> must be used as lock to avoid race. | ||
gaborgsomogyi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| * Important to mention connection providers within a JVM used from multiple threads so adding | ||
| * internal state is not advised. If any state added then it must be synchronized properly. | ||
gaborgsomogyi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| */ | ||
| @DeveloperApi | ||
gaborgsomogyi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| trait JdbcConnectionProvider { | ||
| /** | ||
| * Checks if this connection provider instance can handle the connection initiated by the driver. | ||
| * There must be exactly one active connection provider which can handle the connection for a | ||
| * specific driver. If this requirement doesn't met then <code>IllegalArgumentException</code> | ||
| * will be thrown by the provider framework. | ||
| * @param driver Java driver which initiates the connection | ||
| * @param options Driver options which initiates the connection | ||
| * @return True if the connection provider can handle the driver with the given options. | ||
| */ | ||
| def canHandle(driver: Driver, options: JDBCOptions): Boolean | ||
|
|
||
| /** | ||
| * Opens connection toward the database. | ||
| * @param driver Java driver which initiates the connection | ||
| * @param options Driver options which initiates the connection | ||
| * @return a <code>Connection</code> object that represents a connection to the URL | ||
| */ | ||
| def getConnection(driver: Driver, options: JDBCOptions): Connection | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.