[SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState #18648
Changes from 7 commits:
394b471, 62049f5, d127f96, d18bcc0, 341964e, 6c0bf70, 51fac11, c50a32f, c6ed2d7
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala (new file)

@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.thriftserver

import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveUtils

class HiveCliSessionStateSuite extends SparkFunSuite {

  def withSessionClear(f: () => Unit): Unit = {
    try f() finally SessionState.detachSession()
  }

  test("CliSessionState will be reused") {
    withSessionClear { () =>
      val hiveConf = new HiveConf(classOf[SessionState])
      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
        case (key, value) => hiveConf.set(key, value)
      }
      val sessionState: SessionState = new CliSessionState(hiveConf)
      SessionState.start(sessionState)
      val s1 = SessionState.get
      val sparkConf = new SparkConf()
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
      val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
      assert(s1 === s2)
      assert(s2.isInstanceOf[CliSessionState])
    }
  }

  test("SessionState will not be reused") {
    withSessionClear { () =>
      val sparkConf = new SparkConf()
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
        case (key, value) => hadoopConf.set(key, value)
      }
      val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
      val s1 = hiveClient.getState
      val s2 = hiveClient.newSession().getState
      assert(s1 !== s2)
    }
  }
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
@@ -229,6 +230,17 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }

+  /**
+   * Check whether the current thread's SessionState is a Hive CliSessionState.
+   * The class name is matched while walking up the superclass chain rather than
+   * via isInstanceOf, so this module needs no compile-time dependency on hive-cli.
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }

Contributor: nit: Should add a comment for this method.
   /**
    * Create a [[HiveClient]] used for execution.
    *
| hadoopConf = hadoopConf, | ||
| execJars = jars.toSeq, | ||
| config = configurations, | ||
| isolationOn = true, | ||
| isolationOn = !isCliSessionState(), | ||
| barrierPrefixes = hiveMetastoreBarrierPrefixes, | ||
| sharedPrefixes = hiveMetastoreSharedPrefixes) | ||
| } else if (hiveMetastoreJars == "maven") { | ||
|
|
||
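Condensing the new HiveCliSessionStateSuite above: with builtin Hive jars, the observable effect of this one-line change is that a CliSessionState already attached to the current thread is handed back by the new client instead of being replaced. A sketch (the package line matters because HiveUtils is private[spark]; newTemporaryConfiguration provides a throwaway Derby-backed metastore, as in the tests):

```scala
package org.apache.spark.sql.hive.sketch // hypothetical package; HiveUtils is private[spark]

import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveUtils

object CliSessionStateReuseSketch extends App {
  // Attach a CliSessionState first, as SparkSQLCLIDriver does when
  // spark-sql starts up.
  val hiveConf = new HiveConf(classOf[SessionState])
  HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
    case (k, v) => hiveConf.set(k, v)
  }
  SessionState.start(new CliSessionState(hiveConf))

  // isolationOn is now !isCliSessionState(), i.e. false here, so the client
  // reuses the attached CliSessionState instead of building a new state.
  val sparkConf = new SparkConf()
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
  val state = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
  assert(state eq SessionState.get)
  assert(state.isInstanceOf[CliSessionState])
}
```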
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
@@ -105,107 +105,69 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    // Switch to the initClassLoader.
-    Thread.currentThread().setContextClassLoader(initClassLoader)
-
-    // Set up kerberos credentials for UserGroupInformation.loginUser within
-    // current class loader
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      val principalName = sparkConf.get("spark.yarn.principal")
-      val keytabFileName = sparkConf.get("spark.yarn.keytab")
-      if (!new File(keytabFileName).exists()) {
-        throw new SparkException(s"Keytab file: ${keytabFileName}" +
-          " specified in spark.yarn.keytab does not exist")
-      } else {
-        logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
-        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
-      }
-    }
-
-    def isCliSessionState(state: SessionState): Boolean = {
-      var temp: Class[_] = if (state != null) state.getClass else null
-      var found = false
-      while (temp != null && !found) {
-        found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-        temp = temp.getSuperclass
-      }
-      found
-    }
+    if (clientLoader.isolationOn) {
+      // Switch to the initClassLoader.
+      Thread.currentThread().setContextClassLoader(initClassLoader)
Contributor: Is the behavior change safe here? Previously, we switched the context ClassLoader under both conditions, while in this PR we only do that when `clientLoader.isolationOn` is true.

Member (Author): When isolation is off, we are just switching a class loader to itself.

Contributor: If …

Member (Author): If …

Member (Author): A user app may create a `CliSessionState` instance with built-in Hive jars, which turns isolation off; it can then detach this state and create a Hive client again, in which case isolation is off and …
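To unpack the author's reply (a sketch of the claim, not code from the PR): with isolation off there is no separate isolated loader, so the loader the removed code would have installed is the one already on the thread, and the save/switch/restore dance degenerates to a no-op:

```scala
object NoOpClassLoaderSwitchSketch extends App {
  val original = Thread.currentThread().getContextClassLoader
  // Premise under discussion: with isolation off, initClassLoader is the
  // loader already installed on this thread.
  val initClassLoader = original

  Thread.currentThread().setContextClassLoader(initClassLoader) // changes nothing
  try {
    // ... build or reuse the SessionState here ...
  } finally {
    Thread.currentThread().setContextClassLoader(original) // restores the same loader
  }
  assert(Thread.currentThread().getContextClassLoader eq original)
}
```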
+      // Set up kerberos credentials for UserGroupInformation.loginUser within
+      // current class loader
+      if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+        val principal = sparkConf.get("spark.yarn.principal")
+        val keytab = sparkConf.get("spark.yarn.keytab")
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
+      }
+      try {
+        newState()
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
+      }
+    } else {
+      Option(SessionState.get()).getOrElse(newState())
+    }
-    val ret = try {
-      // originState will be created if not exists, will never be null
-      val originalState = SessionState.get()
-      if (isCliSessionState(originalState)) {
-        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
-        // which contains information like configurations from command line. Later
-        // we call `SparkSQLEnv.init()` there, which would run into this part again.
-        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
-        originalState
-      } else {
-        val hiveConf = new HiveConf(classOf[SessionState])
-        // 1: we set all confs in the hadoopConf to this hiveConf.
-        // This hadoopConf contains user settings in Hadoop's core-site.xml file
-        // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-        // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-        // to load user settings. Otherwise, HiveConf's initialize method will override
-        // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-        // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-        // has hive-site.xml. So, HiveConf will use that to override its default values.
-        hadoopConf.iterator().asScala.foreach { entry =>
-          val key = entry.getKey
-          val value = entry.getValue
-          if (key.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
-          } else {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
-          }
-          hiveConf.set(key, value)
-        }
-        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-        // the initial value will be the current thread's context class loader
-        // (i.e. initClassLoader at here).
-        // We call initialConf.setClassLoader(initClassLoader) at here to make
-        // this action explicit.
-        hiveConf.setClassLoader(initClassLoader)
-        // 2: we set all spark confs to this hiveConf.
-        sparkConf.getAll.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
-          } else {
-            logDebug(s"Applying Spark config to Hive Conf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        // 3: we set all entries in config to this hiveConf.
-        extraConfig.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying extra config to HiveConf: $k=xxx")
-          } else {
-            logDebug(s"Applying extra config to HiveConf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        val state = new SessionState(hiveConf)
-        if (clientLoader.cachedHive != null) {
-          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
-        }
-        SessionState.start(state)
-        state.out = new PrintStream(outputBuffer, true, "UTF-8")
-        state.err = new PrintStream(outputBuffer, true, "UTF-8")
-        state
-      }
-    } finally {
-      Thread.currentThread().setContextClassLoader(original)
-    }
-    ret
   }
   // Log the default warehouse location.
   logInfo(
     s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+  private def newState(): SessionState = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    hiveConf.setClassLoader(initClassLoader)
+
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf.
+    // 3: we set all entries in config to this hiveConf.
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+      if (k.toLowerCase(Locale.ROOT).contains("password")) {
+        logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
+      } else {
+        logDebug(s"Applying Spark config to Hive Conf: $k=$v")
+      }
+      hiveConf.set(k, v)
+    }
+    val state = new SessionState(hiveConf)
+    if (clientLoader.cachedHive != null) {
+      Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+    }
+    SessionState.start(state)
+    state.out = new PrintStream(outputBuffer, true, "UTF-8")
+    state.err = new PrintStream(outputBuffer, true, "UTF-8")
+    state
+  }
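A note on the consolidated merge in newState() above: because `hiveConf.set` is applied in traversal order, later sources win, preserving the old three-step precedence (hadoopConf, then Spark conf, then extraConfig). A minimal sketch of that behavior (illustrative keys and values only):

```scala
object ConfPrecedenceSketch extends App {
  // Mirrors newState()'s merge: hadoopConf entries first, then Spark conf,
  // then extraConfig; the last write for a key wins, as with hiveConf.set.
  val hadoopSide  = Map("k" -> "from-hadoopConf")
  val sparkSide   = Map("k" -> "from-sparkConf")
  val extraConfig = Map("k" -> "from-extraConfig")

  val merged = scala.collection.mutable.Map.empty[String, String]
  (hadoopSide.iterator ++ sparkSide ++ extraConfig).foreach { case (k, v) =>
    merged(k) = v // analogous to hiveConf.set(k, v)
  }
  assert(merged("k") == "from-extraConfig")
}
```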
   /** Returns the configuration for the current session. */
   def conf: HiveConf = state.getConf
@@ -269,6 +231,9 @@ private[hive] class HiveClientImpl(
     }
   }

+  /** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]] */
+  override def getState: SessionState = withHiveState(state)
+
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
    */
Contributor: nit: To be general, let's not mention the config name `spark.yarn.keytab` here.

Member (Author): OK, noted.