Skip to content

Conversation

@gaborgsomogyi
Copy link
Contributor

@gaborgsomogyi gaborgsomogyi commented Jul 7, 2020

What changes were proposed in this pull request?

At the moment only the baked in JDBC connection providers can be used but there is a need to support additional databases and use-cases. In this PR I'm proposing a new developer API name JdbcConnectionProvider. To show how an external JDBC connection provider can be implemented I've created an example here.

The PR contains the following changes:

  • Added connection provider developer API
  • Made JDBC connection providers constructor to noarg => needed to load them w/ service loader
  • Connection providers are now loaded w/ service loader
  • Added tests to load providers independently
  • Moved SecurityConfigurationLock into a central place because other areas will change global JVM security config

Why are the changes needed?

No custom authentication possibility.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Existing + additional unit tests
  • Docker integration tests
  • Tested manually the newly created external JDBC connection provider

@gaborgsomogyi
Copy link
Contributor Author

I've created a WIP PR because the MariaDB integration test fails on master as well so I can't tell this change is good or not:

2020-07-07 12:43:13 8 [Warning] Aborted connection 8 to db: 'unconnected' user: 'unauthenticated' host: '172.17.0.1' (This connection closed normally without authentication)

I've started to analyze this issue separately.

@gaborgsomogyi
Copy link
Contributor Author

After some analysis I've filed SPARK-32211 which provides the solution.

@gaborgsomogyi
Copy link
Contributor Author

I've double tested and w/ the mentioned change MariaDBKrbIntegrationSuite passed as well.

@gaborgsomogyi gaborgsomogyi changed the title [WIP][SPARK-32001][SQL]Create JDBC authentication provider developer API [SPARK-32001][SQL]Create JDBC authentication provider developer API Jul 7, 2020
@gaborgsomogyi
Copy link
Contributor Author

cc @dongjoon-hyun @maropu

@SparkQA
Copy link

SparkQA commented Jul 7, 2020

Test build #125218 has finished for PR 29024 at commit 8caa8e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait JdbcConnectionProvider

@gaborgsomogyi gaborgsomogyi changed the title [SPARK-32001][SQL]Create JDBC authentication provider developer API [WIP][SPARK-32001][SQL]Create JDBC authentication provider developer API Jul 7, 2020
@gaborgsomogyi
Copy link
Contributor Author

The failure seems related so checking it...

@gaborgsomogyi
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 7, 2020

Test build #125234 has finished for PR 29024 at commit 8caa8e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait JdbcConnectionProvider

@gaborgsomogyi
Copy link
Contributor Author

Found the issue. JDBCWriteSuite initiated provider load before ConnectionProviderSuite which resulted noop in the second case. All in all ConnectionProviderSuite formed badly as it is now so fixing...

@SparkQA
Copy link

SparkQA commented Jul 8, 2020

Test build #125331 has finished for PR 29024 at commit e919ac4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 8, 2020

Test build #125344 has finished for PR 29024 at commit 226f177.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 8, 2020

Test build #125385 has finished for PR 29024 at commit 226f177.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* ::DeveloperApi::
* Options for the JDBC data source.
*/
@DeveloperApi
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal class, so could you avoid exposing it? How about using Map[String, String] instead in the provider?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, this way the API is less exposed to changes.

Copy link
Contributor Author

@gaborgsomogyi gaborgsomogyi Jul 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done the initial steps but I see bad and worse tradeoffs. The API is less dependent on JDBCOptions changes but the provider side implementation becames more complicated with either duplicated code or duplicated instantiation. Let me explain in examples:

Option1: Re-instantiate JDBCOptions where duplicated instantiation worries me + the general concern down below.

  override def canHandle(driver: Driver, options: Map[String, String]): Boolean = {
    val jdbcOptions = new JDBCOptions(options)
    jdbcOptions.keytab == null || jdbcOptions.principal == null
  }

Option2: Find out the needed parameters on by own where re-inventing the wheel feeling worries me + the general concern down below.

  override def canHandle(driver: Driver, options: Map[String, String]): Boolean = {
    val keytab = {
      val keytabParam = options.getOrElse(JDBC_KEYTAB, null)
      if (keytabParam != null && FilenameUtils.getPath(keytabParam).isEmpty) {
        val result = SparkFiles.get(keytabParam)
        logDebug(s"Keytab path not found, assuming --files, file name used on executor: $result")
        result
      } else {
        logDebug("Keytab path found, assuming manual upload")
        keytabParam
      }
    }
    val principal = options.getOrElse(JDBC_PRINCIPAL, null)
    keytab == null || principal == null
  }

General concern: Both cases Spark can be good because in the first case new JDBCOptions instantiation, in the second case copy paste moved into a base class can fill the gap. However considering myself as a 3rd-party developer I have not much options (since JDBCOptions is not exposed as developer API):

  • Copy and paste the code from JDBCOptions which may change over time
  • Implement the parameter parsing on my own which may contain bugs

Considering these findings I think it's better to keep JDBCOptions. WDYT?

Copy link
Member

@maropu maropu Jul 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 1 looks better. But, 3rd-party developers need to use JDBCOptions there? Or, could we just pass the two params only?

  override def canHandle(driver: Driver, keytab: String, principal: String): Boolean = {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could pass the 2 params but then we limit further implementation possibilities so I would vote on the map.
At the moment there is no need other params other than keytab and principal but later providers may need further things. It's not a strong opinion, just don't want to close later possibilities. If we agree on the way I'll do the changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example I've given in the other open question here can show how parameters other than keytab and principal can be used. Not passing the whole map would close this possibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @maropu here. JDBCOptions is under execution package and meant to be private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon thanks for having a look!

I agree that JDBCOptions mustn't be exposed. Let me change the code to show option 1. As said passing only keytab: String, principal: String is not enough because not all but some of the providers need further configurations. I've started to work on this this change (unless anybody has better option).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed JDBCOptions from the API but keeping this open if further discussion needed.

@gaborgsomogyi
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 10, 2020

Test build #125590 has finished for PR 29024 at commit 226f177.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

Gosh, jenkins still not stable. Sigh.

@SparkQA
Copy link

SparkQA commented Jul 10, 2020

Test build #125621 has finished for PR 29024 at commit 265b26e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 14, 2020

Test build #125820 has finished for PR 29024 at commit 265b26e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 14, 2020

Test build #125830 has finished for PR 29024 at commit 265b26e.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 28, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33807/

@SparkQA
Copy link

SparkQA commented Sep 28, 2020

Test build #129192 has finished for PR 29024 at commit 9dbed59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

Will update the example provider to show the usage tomorrow...

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good otherwise. Big +1 for showing an example.

@gaborgsomogyi
Copy link
Contributor Author

I've just re-executed all the integration tests and seems like SPARK-32211 appeared again because it's not yet exists in this branch. Merging master into this branch and re-executing all integration tests again.

@gaborgsomogyi
Copy link
Contributor Author

I've just re-executed all the integration tests and all has passed.

@gaborgsomogyi
Copy link
Contributor Author

I've just updated the example to show the usage: https://github.com/gaborgsomogyi/spark-jdbc-connection-provider

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33843/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33844/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33843/

@HyukjinKwon
Copy link
Member

I'll merge in few days if there are no more comments.

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33844/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Test build #129228 has finished for PR 29024 at commit 0412490.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Test build #129227 has finished for PR 29024 at commit 69856f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33850/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33850/

@SparkQA
Copy link

SparkQA commented Sep 29, 2020

Test build #129234 has finished for PR 29024 at commit 0412490.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Merged to master.

@gaborgsomogyi
Copy link
Contributor Author

Thanks all of you guys to invest your time!

providers += provider
} catch {
case t: Throwable =>
logError(s"Failed to load built in provider.", t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am getting the following exception on my console permanently while running JDBC tests. Should it be really logged as an error?

14:31:25.070 ERROR org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider: Failed to load built in provider.
java.util.ServiceConfigurationError: org.apache.spark.sql.jdbc.JdbcConnectionProvider: Provider org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider could not be instantiated
	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.loadProviders(ConnectionProvider.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.<init>(ConnectionProvider.scala:31)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.<clinit>(ConnectionProvider.scala)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:66)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.withConnection(JDBCTableCatalog.scala:156)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.listTables(JDBCTableCatalog.scala:58)
	at org.apache.spark.sql.execution.datasources.v2.ShowTablesExec.run(ShowTablesExec.scala:42)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3675)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3673)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:612)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:607)
	at org.apache.spark.sql.test.SQLTestUtilsBase.$anonfun$sql$1(SQLTestUtils.scala:231)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalogSuite.$anonfun$new$2(JDBCTableCatalogSuite.scala:67)
	at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:134)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalogSuite.$anonfun$new$1(JDBCTableCatalogSuite.scala:67)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:61)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.IllegalArgumentException: Intentional Exception
	at org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider.<init>(IntentionallyFaultyConnectionProvider.scala:26)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.lang.Class.newInstance(Class.java:442)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
	... 82 more

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log it as a warning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can decrease it to warning. The main message is to notify the user.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to ignore the error case where it fails to load builtin providers? DataSource throws an exception if it fails to load builtin datasources:

case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
// NoClassDefFoundError's class name uses "/" rather than "." for packages
val className = e.getMessage.replaceAll("/", ".")
if (spark2RemovedClasses.contains(className)) {
throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
"Please check if your library is compatible with Spark 2.0", e)
} else {
throw e
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you mean ignore? Providers must be loaded independently so we need to catch and ignore the exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the suggestion is let the exception fire then I would say it's bad idea. If a provider is not able to be loaded then the rest must go. We've had similar issue and expectation in hadoop delegation token area.

Copy link
Member

@maropu maropu Oct 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, the policy looks okay for secure connections, but how about the basic one, BasicConnectionProvider? At least, until Spark v3.0, creating basic JDBC connections does not fail because of the loading failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least, until Spark v3.0, creating basic JDBC connections does not fail because of the loading failure.

Here it's the same since BasicConnectionProvider is built-in and no pre-load inside. The main issue would come when one adds a new provider which unable to be loaded. That failure would make all the rest workload fail if we don't load them independently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dongjoon-hyun pushed a commit that referenced this pull request Nov 25, 2020
…n't consider case-sensitivity for properties

### What changes were proposed in this pull request?

This PR fixes an issue that `BasicConnectionProvider` doesn't consider case-sensitivity for properties.
For example, the property `oracle.jdbc.mapDateToTimestamp` should be considered case-sensitivity but it is not considered.

### Why are the changes needed?

This is a bug introduced by #29024 .
Caused by this issue, `OracleIntegrationSuite` doesn't pass.

```
[info] - SPARK-16625: General data types to be mapped to Oracle *** FAILED *** (32 seconds, 129 milliseconds)
[info]   types.apply(9).equals(org.apache.spark.sql.types.DateType) was false (OracleIntegrationSuite.scala:238)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.jdbc.OracleIntegrationSuite.$anonfun$new$4(OracleIntegrationSuite.scala:238)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info]   at org.scalatest.Suite.run(Suite.scala:1112)
[info]   at org.scalatest.Suite.run$(Suite.scala:1094)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:61)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

With this change, I confirmed that `OracleIntegrationSuite` passes with the following command.
```
$ git clone https://github.com/oracle/docker-images.git
$ cd docker-images/OracleDatabase/SingleInstance/dockerfiles
$ ./buildDockerImage.sh -v 18.4.0 -x
$ ORACLE_DOCKER_IMAGE_NAME=oracle/database:18.4.0-xe build/sbt  -Pdocker-integration-tests -Phive -Phive-thriftserver "testOnly org.apache.spark.sql.jdbc.OracleIntegrationSuite"
```

Closes #30485 from sarutak/fix-oracle-integration-suite.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
require(filteredProviders.size == 1,
"JDBC connection initiated but not exactly one connection provider found which can handle " +
s"it. Found active providers: ${filteredProviders.mkString(", ")}")
SecurityConfigurationLock.synchronized {
Copy link

@tdg5 tdg5 Nov 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😭

I believe this synchronization introduces a significant performance bottleneck for applications that rely on being able to establish many connections simultaneously for performance reasons. This change forces concurrency to 1 when establishing database connections for a given JDBC driver and that strikes me as a significant user impacting change.

Can anyone propose a workaround for this? I have an app that makes connections to thousands of databases and I can't upgrade to any version >=3.1.x because of this significant bottleneck.

https://issues.apache.org/jira/browse/SPARK-37391

so-much-blocking

That screenshot only shows a handful of blocked threads, but in total I have 98 threads blocked waiting for the SecurityConfigurationLock.

I do not have this issue when running Spark 3.0.1 with no code changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaborgsomogyi is getConnection() not thread-safe?
Or is there any possibility of finer-grained locking on the actual object that getConnection() is called on?
We could move the filteredProviders.head out of the block but doubt that does anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is getConnection() not thread-safe?

Simply no.

This is well thought initially even if it introduces bottlenecks. The main problem is that JVM has a single security context. In order to make a connection one MUST modify the security context, otherwise the JDBC connector is not able to provide the security credentials (TGT, keytab or whatever). Simply saying JDBC connectors are able to get credentials from the single security context.

Since multiple connections are made concurrently (sometimes same DB type w/ different credentials) this must be synchronized not to have a malformed security context (we've made load test and added 10+ DBs and corrupted the security context pretty fast w/o sync and it was horror to find out what's going on).
Workload are coming and going, nobody can tell in advance what kind of security context need to be created at the very beginning of JVM creation. If we would pre-bake the security context (not sure how?!) then the following issues would come:

  • How to handle 2 same type of databases (for example MariaDB) with 2 different credentials?
  • How to handle if a DB login credentials changed over time?

I personally think the first one can't be solved, the second one could be cured w/ all JVM restarts but I think it's just not user friendly.

If somebody has an excellent idea I would like to hear it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I can imagine is to pre-set the JVM JAAS config for databases from file like this: java -Djava.security.auth.login.config=jaas.conf.
And then a new flag can be introduced like: spark.jdbc.doNotSyncronize which is default true.
That case security credentials MUSTN'T be provided by Spark's JDBC properties but only from JAAS file.
However this would be super risky and for advanced users only. That said I've spent at least a month to debug the JVM security context what's going on...

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know anything about the underlying problem, but it had crossed my mind that allowing the synchronization to be optional could be one path forward.

@gaborgsomogyi may I trouble you for some references related to

The main problem is that JVM has a single security context. In order to make a connection one MUST modify the security context, otherwise the JDBC connector is not able to provide the security credentials (TGT, keytab or whatever)

so I can better familiarize myself with the problem that you are describing?

Beyond this particular issue, what you've shared suggests that the concurrency utilized by my app could be causing us to crosswire data which would be a major problem.

I guess I'd also ask, is there more to it than you described? It sounds like I should either have some cross wired data or if that's not the case then there is some missing piece of the puzzle that means the synchronization is not always required.

Thanks in advance!

Copy link
Contributor Author

@gaborgsomogyi gaborgsomogyi Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless somebody has a ground breaking idea providers must be synced where security enabled, we can free up some time when no authentication is in place.

Copy link
Member

@HyukjinKwon HyukjinKwon Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdg5 or @maropu would you be able to find some time for fixing this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proposed solution seems straightforward to me and I'm happy to take a stab at it. Dumb question though, how would I eventually tweak the spark class loader so it would detect a custom JdbcConnectionProvider in my library code?

I tried baking such a class into the uberjar containing the driver app, but I didn't seem to be able to get ConnectionProvider to notice my clone of BasicConnectionProvider. I was guessing this was due to a class loader disagreement.

My spark-submit looks roughly like so:

spark-submit \
  --class theThing \
  --master 'local[*]' \
  --driver-memory 2G \
  --driver-class-path theUberjar.jar
  theUberjar.jar

Copy link
Contributor Author

@gaborgsomogyi gaborgsomogyi Nov 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdg5 Thanks for lending a helping hand! For such scenarios I've created a readme here.
Please don't forget to update this doc accordingly.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaborgsomogyi thanks! The META-INF manifest is what I was missing 👍🏻

Here's my stab at what you suggested: #34745

I marked it a WIP, but it is complete as far as I am concerned and aware. I just want to make sure it matches up with what you had in mind before I open it up to broader review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants