[SPARK-26794][SQL] SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists #23709
Changes from 13 commits:
275e998, e1eafe7, 4a1a18b, 62d2aa3, 8004c19, 10ebff7, f5c15e0, d57226a, 0c07ff1, 35b3f17, 0cac5e6, 92fec4d, 391e14c, eff569b, 748821c
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala:

```diff
@@ -39,8 +39,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
+ *
+ * @param sparkContext The Spark context associated with this SharedState
+ * @param initialConfigs The configs from the very first created SparkSession
  */
-private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
+private[sql] class SharedState(
+    val sparkContext: SparkContext,
+    initialConfigs: scala.collection.Map[String, String])
+  extends Logging {
 
   // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
```
```diff
@@ -77,6 +83,27 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   }
   logInfo(s"Warehouse path is '$warehousePath'.")
 
+  // This variable should be initialized after `warehousePath`, because we first need to load
+  // hive-site.xml into hadoopConf and determine the warehouse path, which is then set in both
+  // the Spark conf and the Hadoop conf to avoid being affected by SparkSession-level options.
+  private val (conf, hadoopConf) = {
+    val confClone = sparkContext.conf.clone()
+    val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
+    // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+    // `SharedState`, all `SparkSession`-level configurations have higher priority to generate a
+    // `SharedState` instance. This is done only once, then shared across `SparkSession`s.
+    initialConfigs.foreach {
+      case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
+        logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " +
+          s"in SparkSession's options, it should be set statically for cross-session usages")
+      case (k, v) =>
+        logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+        confClone.set(k, v)
+        hadoopConfClone.set(k, v)
+    }
+    (confClone, hadoopConfClone)
+  }
+
   /**
    * Class for caching query results reused in future executions.
```
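In effect, the options passed to the very first `SparkSession` are promoted into the cloned shared confs, except for the two warehouse keys, which are warned about and dropped. A minimal sketch of what a caller would observe under this patch (the config values below are illustrative assumptions, not part of the diff):

```scala
import org.apache.spark.sql.SparkSession

// Assuming no SparkSession/SharedState exists yet in this JVM:
val spark = SparkSession.builder()
  .master("local")
  // Warned about and ignored: warehouse paths must be set statically.
  .config("spark.sql.warehouse.dir", "/tmp/elsewhere")
  // Applied to the cloned SparkConf/HadoopConf of the SharedState.
  .config("spark.sql.globalTempDatabase", "my_global_temp")
  // Sets spark.sql.catalogImplementation=hive, now honored by SharedState.
  .enableHiveSupport()
  .getOrCreate()
```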
```diff
@@ -89,7 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   val statusStore: SQLAppStatusStore = {
     val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
-    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
+    val listener = new SQLAppStatusListener(conf, kvStore, live = true)
     sparkContext.listenerBus.addToStatusQueue(listener)
     val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
     sparkContext.ui.foreach(new SQLTab(statusStore, _))
```
```diff
@@ -101,9 +128,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   lazy val externalCatalog: ExternalCatalogWithListener = {
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
+      SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
```
```diff
@@ -137,7 +162,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   // System preserved database should not exists in metastore. However it's hard to guarantee it
   // for every session, because case-sensitivity differs. Here we always lowercase it to make our
   // life easier.
-  val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
+  val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
   if (externalCatalog.databaseExists(globalTempDB)) {
     throw new SparkException(
       s"$globalTempDB is a system preserved database, please rename your existing database " +
```
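As the review thread below requests, the bug this patch fixes is reproducible whenever a bare SparkContext exists before the first SparkSession is built. A minimal sketch mirroring the suite's first test (it must live under a subpackage of `org.apache.spark.sql` to reach the `private[sql]` member `sharedState`; the object name is an assumption):

```scala
package org.apache.spark.sql.hive // gives access to private[sql] members

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object CatalogRepro {
  def main(args: Array[String]): Unit = {
    // A plain SparkContext is created first, as many applications do.
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local").setAppName("repro"))

    // The builder reuses the existing SparkContext. Before this fix, the
    // SharedState was built from sparkContext.conf alone, so the session-level
    // option set by enableHiveSupport() was ignored and the in-memory catalog
    // was chosen.
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Prints an in-memory catalog class before the fix,
    // HiveExternalCatalog after it.
    println(spark.sharedState.externalCatalog.unwrapped.getClass.getName)
  }
}
```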
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala (new file, @@ -0,0 +1,69 @@):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SharedState
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.util.Utils

class HiveSharedStateSuite extends SparkFunSuite {

  test("the catalog should be determined at the very first") {
    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
    val sc = SparkContext.getOrCreate(conf)
    val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
    assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName
      .contains("HiveExternalCatalog"), "The catalog should be hive")

    val ss2 = SparkSession.builder().getOrCreate()
    assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName
      .contains("HiveExternalCatalog"), "The catalog should be shared across sessions")
  }

  test("using initial configs to generate SharedState") {
    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
    val sc = SparkContext.getOrCreate(conf)
    val invalidPath = "invalid/path"
    val metastorePath = Utils.createTempDir()
    val tmpDb = "tmp_db"
    val initialConfigs = Map("spark.foo" -> "bar",
      WAREHOUSE_PATH.key -> invalidPath,
      ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
      CATALOG_IMPLEMENTATION.key -> "hive",
      ConfVars.METASTORECONNECTURLKEY.varname ->
        s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
      GLOBAL_TEMP_DATABASE.key -> tmpDb)
    val state = new SharedState(sc, initialConfigs)
```
Review thread on `val state = new SharedState(sc, initialConfigs)`:

- **Contributor:** Can we be more real-world and create a SparkSession here?
- **Member (author):** In the hive module tests there is a long-lived cached TestHiveSession, so I construct the SharedState explicitly here.
- **Contributor:** Then why can we build a SparkSession in the test case above?
- **Member (author):** Run individually, the test above passes, but when the whole hive module runs it picks up an existing SparkSession with Hive support. So I added this second test to ensure all configurations are passed through correctly.
- **Contributor:** Shall we add …
- **Member (author):** Without this fix, test 1 fails when run individually but passes in the full module run.
- **Contributor:** I'd like to move test 1 to the sql/core module; we can catch the …
- **Member (author):** An IllegalArgumentException does not tell us whether the catalog is Hive or not. That test would pass with or without this fix, and in fact it never reaches the point where the catalog is initialized.
- **Contributor:** Then let's just remove test 1, but post the code in the PR description so people know how to reproduce the bug.
- **Member (author):** OK, both done.
(file continued)

```scala
    assert(state.warehousePath !== invalidPath,
      "the warehouse path can't be determined by session options")
    assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
      "a warehouse conf in session options can't affect the application-wide Spark conf")
    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
      "a warehouse conf in session options can't affect the application-wide Hadoop conf")

    assert(!state.sparkContext.conf.contains("spark.foo"),
      "static Spark conf should not be affected by the session")
    assert(state.globalTempViewManager.database === tmpDb)
    assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
      "initial SparkSession options can determine the catalog")
  }
}
```
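The caller-side wiring is not included in this diff. Presumably SparkSession forwards the first builder's options when it instantiates the shared state, along the lines of the sketch below; the member names (`existingSharedState`, `initialSessionOptions`) are assumptions, not confirmed by the diff shown here:

```scala
// Sketch only; not part of the diff above. The SharedState is created at most
// once per SparkContext, seeded with the first session's builder options.
private[sql] lazy val sharedState: SharedState = {
  existingSharedState.getOrElse(
    new SharedState(sparkContext, initialSessionOptions))
}
```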