[SPARK-32001][SQL] Create JDBC authentication provider developer API #29024
Changes from all commits: 8caa8e7, e919ac4, 226f177, 265b26e, 50e33ea, 054535e, d54596f, 184fd26, 1f562f8, b2fd5d8, 1297304, 9dbed59, 69856f6, 0412490
New file (+24 lines): `SecurityConfigurationLock.scala`, package `org.apache.spark.security`

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.security

/**
 * There are cases when global JVM security configuration must be modified.
 * In order to avoid races, such modifications must be synchronized on this object.
 */
object SecurityConfigurationLock
```
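To make the intent concrete, here is a minimal sketch of how connection-opening code might use this lock; the helper and the save/restore pattern are illustrative assumptions, not part of this diff:

```scala
import javax.security.auth.login.Configuration

import org.apache.spark.security.SecurityConfigurationLock

object SecurityConfigExample {
  // Illustrative helper: apply a JVM-global JAAS configuration only while
  // holding the shared lock, so concurrent connection attempts never observe
  // each other's half-applied security settings.
  def withJaasConfig[T](config: Configuration)(body: => T): T =
    SecurityConfigurationLock.synchronized {
      val previous = Configuration.getConfiguration   // save the JVM-wide state
      Configuration.setConfiguration(config)          // install per-connection config
      try body
      finally Configuration.setConfiguration(previous) // restore for everyone else
    }
}
```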
New file (+6 lines): the `ServiceLoader` registration of the built-in providers (by the `ServiceLoader` convention this resource lives at `META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider` on the classpath)

```
org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.DB2ConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.MariaDBConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.MSSQLConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.OracleConnectionProvider
```
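Since the point of this PR is a developer API, a third party can plug in its own provider the same way: implement `JdbcConnectionProvider` and ship its own services file. A minimal sketch — the package, class name, URL prefix, and option keys are invented; the `canHandle`/`getConnection` signatures follow their usage in `ConnectionProvider.create` below, and later Spark versions add further members to the API:

```scala
package com.example.jdbc // hypothetical third-party package

import java.sql.{Connection, Driver}
import java.util.Properties

import org.apache.spark.sql.jdbc.JdbcConnectionProvider

// Hypothetical provider for a fictional "exampledb" database.
class ExampleConnectionProvider extends JdbcConnectionProvider {
  // Claim only URLs this provider understands, so that exactly one provider
  // matches in ConnectionProvider.create.
  override def canHandle(driver: Driver, options: Map[String, String]): Boolean =
    options.get("url").exists(_.startsWith("jdbc:exampledb:"))

  override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
    val properties = new Properties()
    options.foreach { case (k, v) => properties.setProperty(k, v) }
    // Any JVM-global security setup would happen here; the caller already
    // holds SecurityConfigurationLock when this method runs.
    driver.connect(options("url"), properties)
  }
}
```

The accompanying `META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider` resource would then contain the single line `com.example.jdbc.ExampleConnectionProvider`.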
Modified file: `ConnectionProvider.scala`, package `org.apache.spark.sql.execution.datasources.jdbc.connection` (@@ -18,60 +18,45 @@)

Before (hard-coded dispatch on driver class):

```scala
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
import java.util.Properties

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

/**
 * Connection provider which opens connection toward various databases (database specific instance
 * needed). If kerberos authentication required then it's the provider's responsibility to set all
 * the parameters.
 */
private[jdbc] trait ConnectionProvider {
  /**
   * Additional properties for data connection (Data source property takes precedence).
   */
  def getAdditionalProperties(): Properties = new Properties()

  /**
   * Opens connection toward the database.
   */
  def getConnection(): Connection
}

private[jdbc] object ConnectionProvider extends Logging {
  def create(driver: Driver, options: JDBCOptions): ConnectionProvider = {
    if (options.keytab == null || options.principal == null) {
      logDebug("No authentication configuration found, using basic connection provider")
      new BasicConnectionProvider(driver, options)
    } else {
      logDebug("Authentication configuration found, using database specific connection provider")
      options.driverClass match {
        case PostgresConnectionProvider.driverClass =>
          logDebug("Postgres connection provider found")
          new PostgresConnectionProvider(driver, options)

        case MariaDBConnectionProvider.driverClass =>
          logDebug("MariaDB connection provider found")
          new MariaDBConnectionProvider(driver, options)

        case DB2ConnectionProvider.driverClass =>
          logDebug("DB2 connection provider found")
          new DB2ConnectionProvider(driver, options)

        case MSSQLConnectionProvider.driverClass =>
          logDebug("MS SQL connection provider found")
          new MSSQLConnectionProvider(driver, options)

        case OracleConnectionProvider.driverClass =>
          logDebug("Oracle connection provider found")
          new OracleConnectionProvider(driver, options)

        case _ =>
          throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
            "Kerberos authentication")
      }
    }
  }
}
```

After (`ServiceLoader`-based discovery of `JdbcConnectionProvider` implementations):

```scala
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
import java.util.ServiceLoader

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.security.SecurityConfigurationLock
import org.apache.spark.sql.jdbc.JdbcConnectionProvider
import org.apache.spark.util.Utils

private[jdbc] object ConnectionProvider extends Logging {
  private val providers = loadProviders()

  def loadProviders(): Seq[JdbcConnectionProvider] = {
    val loader = ServiceLoader.load(classOf[JdbcConnectionProvider],
      Utils.getContextOrSparkClassLoader)
    val providers = mutable.ArrayBuffer[JdbcConnectionProvider]()

    val iterator = loader.iterator
    while (iterator.hasNext) {
      try {
        val provider = iterator.next
        logDebug(s"Loaded built in provider: $provider")
        providers += provider
      } catch {
        case t: Throwable =>
          logError("Failed to load built in provider.", t)
      }
    }
    // Seems duplicate but it's needed for Scala 2.13
    providers.toSeq
  }

  def create(driver: Driver, options: Map[String, String]): Connection = {
    val filteredProviders = providers.filter(_.canHandle(driver, options))
    require(filteredProviders.size == 1,
      "JDBC connection initiated but not exactly one connection provider found which can handle " +
        s"it. Found active providers: ${filteredProviders.mkString(", ")}")
    SecurityConfigurationLock.synchronized {
      filteredProviders.head.getConnection(driver, options)
    }
  }
}
```
|
Review thread on the `logError` call in `loadProviders`:

Member: I am getting the following exception on my console permanently while running JDBC tests. Should it really be logged as an error?

Member: Maybe log it as a warning?

Contributor (Author): We can decrease it to warning. The main message is to notify the user.

Member: Is it okay to ignore the error case where it fails to load built-in providers? (spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala, lines 694 to 702 in 94d648d)

Contributor (Author): How do you mean ignore? Providers must be loaded independently, so we need to catch and ignore the exception.

Contributor (Author): If the suggestion is to let the exception fire, then I would say that's a bad idea. If a provider is not able to be loaded, the rest must still go. We've had a similar issue and expectation in the Hadoop delegation token area.

Member: Yea, the policy looks okay for secure connections, but how about the basic one?

Contributor (Author): Here it's the same since …
Review thread on the `SecurityConfigurationLock.synchronized` block in `create`:
Commenter: 😭 I believe this synchronization introduces a significant performance bottleneck for applications that rely on being able to establish many connections simultaneously for performance reasons. This change forces concurrency to 1 when establishing database connections for a given JDBC driver, and that strikes me as a significant user-impacting change. Can anyone propose a workaround for this? I have an app that makes connections to thousands of databases and I can't upgrade to any version >= 3.1.x because of this significant bottleneck. https://issues.apache.org/jira/browse/SPARK-37391 That screenshot only shows a handful of blocked threads, but in total I have 98 threads blocked waiting for the `SecurityConfigurationLock`. I do not have this issue when running Spark 3.0.1 with no code changes.

Member: @gaborgsomogyi is getConnection() not thread-safe?

Contributor (Author): Simply no. This was well thought out initially, even if it introduces bottlenecks. The main problem is that the JVM has a single security context. In order to make a connection, one MUST modify the security context; otherwise the JDBC connector is not able to provide the security credentials (TGT, keytab or whatever). Simply put, JDBC connectors are only able to get credentials from the single security context. Since multiple connections are made concurrently (sometimes the same DB type w/ different credentials), this must be synchronized so as not to end up with a malformed security context (we made a load test, added 10+ DBs, and corrupted the security context pretty fast w/o sync, and it was horror to find out what was going on). I personally think the first one can't be solved; the second one could be cured w/ JVM restarts, but I think that's just not user friendly. If somebody has an excellent idea, I would like to hear it.

Contributor (Author): What I can imagine is to pre-set the JVM JAAS config for databases from file like this: …
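The file referenced above was not captured on this page. As a rough, hypothetical illustration of the idea only (entry name, login module options, and paths are all invented), a JVM-wide JAAS file installed once at startup via `-Djava.security.auth.login.config=` might look like:

```
/* Hypothetical jaas.conf: one login entry per database, installed once at
   JVM startup instead of being swapped at connection time. */
ExampleDBJDBCDriver {
  com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/user.keytab"
    principal="user@EXAMPLE.COM"
    doNotPrompt=true;
};
```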
Commenter: I don't know anything about the underlying problem, but it had crossed my mind that allowing the synchronization to be optional could be one path forward. @gaborgsomogyi, may I trouble you for some references related to … so I can better familiarize myself with the problem that you are describing? Beyond this particular issue, what you've shared suggests that the concurrency utilized by my app could be causing us to crosswire data, which would be a major problem. I guess I'd also ask: is there more to it than you described? It sounds like I should either have some crosswired data, or, if that's not the case, there is some missing piece of the puzzle that means the synchronization is not always required. Thanks in advance!

Contributor (Author): Unless somebody has a groundbreaking idea, providers must be synced where security is enabled; we can free up some time when no authentication is in place.

Commenter: The proposed solution seems straightforward to me and I'm happy to take a stab at it. Dumb question though: how would I eventually tweak the Spark class loader so it would detect a custom … I tried baking such a class into the uberjar containing the driver app, but I didn't seem to be able to get … My spark-submit looks roughly like so: …

Commenter: @gaborgsomogyi thanks! The … Here's my stab at what you suggested: #34745. I marked it a WIP, but it is complete as far as I am concerned and aware. I just want to make sure it matches up with what you had in mind before I open it up to broader review.
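For readers who land here with the same bottleneck: #34745 (SPARK-37391) addressed it by letting a provider declare whether it touches the JVM-global security context at all, so the lock can be skipped otherwise. A sketch of a provider opting out under that later API follows; the class and option checks are hypothetical, and the exact API surface should be verified against the Spark version in use:

```scala
import java.sql.{Connection, Driver}
import java.util.Properties

import org.apache.spark.sql.jdbc.JdbcConnectionProvider

// Hypothetical provider that authenticates with per-connection credentials
// only, so it never mutates JVM-global security state.
class PasswordOnlyProvider extends JdbcConnectionProvider {
  override val name: String = "password-only" // `name` exists in later versions of the API

  override def canHandle(driver: Driver, options: Map[String, String]): Boolean =
    !options.contains("keytab") && !options.contains("principal")

  override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
    val properties = new Properties()
    options.foreach { case (k, v) => properties.setProperty(k, v) }
    driver.connect(options("url"), properties)
  }

  // Added by #34745: returning false tells Spark it may call getConnection
  // without taking SecurityConfigurationLock.
  override def modifiesSecurityContext(driver: Driver, options: Map[String, String]): Boolean =
    false
}
```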
||||||||||||||||||||
| filteredProviders.head.getConnection(driver, options) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
