@@ -127,7 +127,7 @@ class SparkSession private(
   @Unstable
   @transient
   lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sparkContext))
+    existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions.toMap))
   }
 
   /**
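The one-line change above forwards the first session's options (`initialSessionOptions`) into the freshly built `SharedState`. A minimal sketch of the scenario it targets, assuming a bare local deployment (master and app name are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// A SparkContext already exists before any SparkSession is built.
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))

// enableHiveSupport() only records spark.sql.catalogImplementation=hive as a
// session option; without this patch, a SharedState derived solely from the
// pre-existing context's conf would silently fall back to "in-memory".
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
```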
@@ -36,29 +36,41 @@ import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  */
-private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
+private[sql] class SharedState(val sparkContext: SparkContext, initConfig: Map[String, String])
+  extends Logging {
+  private val conf = sparkContext.conf.clone()
+  private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration)
+
+  // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+  // `SharedState`, the `SparkSession`-level configurations take priority when building the
+  // `SharedState`. This happens only once; the resulting `SharedState` is then shared across
+  // all `SparkSession`s.
+  initConfig.foreach { case (k, v) =>
+    logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+    conf.set(k, v)
+    hadoopConf.set(k, v)
+  }
+  logInfo("Applied all initial SparkSession options to the newly created SharedState")
+
   // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
   val warehousePath: String = {
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
       logInfo(s"loading hive config file: $configFile")
-      sparkContext.hadoopConfiguration.addResource(configFile)
+      hadoopConf.addResource(configFile)
     }
 
     // hive.metastore.warehouse.dir only stays in hadoopConf
-    sparkContext.conf.remove("hive.metastore.warehouse.dir")
+    conf.remove("hive.metastore.warehouse.dir")
     // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
+    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+    if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
       logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
         s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
         s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
@@ -68,16 +80,15 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
       // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
+      val sparkWarehouseDir = conf.get(WAREHOUSE_PATH)
       logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
         s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
+      hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
       sparkWarehouseDir
     }
   }
   logInfo(s"Warehouse path is '$warehousePath'.")
 
-
   /**
    * Class for caching query results reused in future executions.
    */
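The two warehouse branches above (this hunk and the previous one) encode a precedence rule that is easy to lose in diff form; condensed as a sketch (helper name and paths hypothetical), with the winning value then written back to both configs so they agree:

```scala
// spark.sql.warehouse.dir, if explicitly set, beats hive.metastore.warehouse.dir
// (e.g. from hive-site.xml); if neither is set, the Spark default applies.
def resolveWarehouse(
    sparkDir: Option[String],
    hiveDir: Option[String],
    sparkDefault: String): String =
  sparkDir.orElse(hiveDir).getOrElse(sparkDefault)

assert(resolveWarehouse(None, Some("/hive/wh"), "spark-warehouse") == "/hive/wh")
assert(resolveWarehouse(Some("/spark/wh"), Some("/hive/wh"), "spark-warehouse") == "/spark/wh")
```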
@@ -101,9 +112,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   lazy val externalCatalog: ExternalCatalogWithListener = {
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
+      SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
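Why the overlay must happen before `externalCatalog` is first touched: `SharedState.externalCatalogClassName` (not modified by this patch) picks the catalog class from `spark.sql.catalogImplementation` in whatever conf it is handed. Roughly, as a paraphrase for orientation only:

```scala
import org.apache.spark.SparkConf

// Paraphrase of the logic in object SharedState; not code from this diff.
def externalCatalogClassName(conf: SparkConf): String =
  conf.get("spark.sql.catalogImplementation", "in-memory") match {
    case "hive" => "org.apache.spark.sql.hive.HiveExternalCatalog"
    case "in-memory" => "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
  }
```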
@@ -137,7 +146,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
     // System preserved database should not exist in metastore. However it's hard to guarantee it
     // for every session, because case-sensitivity differs. Here we always lowercase it to make our
     // life easier.
-    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
+    val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
     if (externalCatalog.databaseExists(globalTempDB)) {
       throw new SparkException(
         s"$globalTempDB is a system preserved database, please rename your existing database " +
@@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
 private[hive] class TestHiveSharedState(
     sc: SparkContext,
     hiveClient: Option[HiveClient] = None)
-  extends SharedState(sc) {
+  extends SharedState(sc, Map.empty) {
 
   override lazy val externalCatalog: ExternalCatalogWithListener = {
     new ExternalCatalogWithListener(new TestHiveExternalCatalog(
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+
+class SharedStateSuite extends SparkFunSuite {
+
+  test("the external catalog should be determined by the first created session") {
+    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
+    val sc = new SparkContext(conf)
+    val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
+    assert(ss.sharedState.externalCatalog.unwrapped.getClass.getName ===
+      "org.apache.spark.sql.hive.HiveExternalCatalog", "The catalog should be hive")
+
+    val ss2 = SparkSession.builder().getOrCreate()
+    assert(ss2.sharedState.externalCatalog.unwrapped.getClass.getName ===
+      "org.apache.spark.sql.hive.HiveExternalCatalog",
+      "The catalog should be shared across sessions")
+  }
+}
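One hygiene note on the new suite: it never stops the `SparkContext` it creates, which can leak into later suites sharing the same JVM. A possible teardown, hypothetical and not part of the diff:

```scala
// Hypothetical addition to SharedStateSuite: stop whatever session this
// suite started so subsequent suites begin from a clean slate.
override def afterAll(): Unit = {
  try {
    SparkSession.getActiveSession.foreach(_.stop())
  } finally {
    super.afterAll()
  }
}
```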