[SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn to core #17723
`core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala` (new file, 119 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
 * Manages all the registered HadoopDelegationTokenProviders and offers APIs for other modules to
 * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
 * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded if not
 * explicitly disabled.
 *
 * Also, each HadoopDelegationTokenProvider is controlled by
 * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
 * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
 * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
 *
 * @param sparkConf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
 */
private[spark] class HadoopDelegationTokenManager(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    fileSystems: Set[FileSystem])
  extends Logging {

  private val deprecatedProviderEnabledConfigs = List(
    "spark.yarn.security.tokens.%s.enabled",
    "spark.yarn.security.credentials.%s.enabled")
  private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

  // Maintain all the registered delegation token providers.
  private val delegationTokenProviders = getDelegationTokenProviders
  logDebug("Using the following delegation token providers: " +
    s"${delegationTokenProviders.keys.mkString(", ")}.")

  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
      new HiveDelegationTokenProvider,
      new HBaseDelegationTokenProvider)

    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    providers
      .filter { p => isServiceEnabled(p.serviceName) }
      .map { p => (p.serviceName, p) }
      .toMap
  }

  def isServiceEnabled(serviceName: String): Boolean = {
    val key = providerEnabledConfig.format(serviceName)

    deprecatedProviderEnabledConfigs.foreach { pattern =>
      val deprecatedKey = pattern.format(serviceName)
      if (sparkConf.contains(deprecatedKey)) {
        logWarning(s"${deprecatedKey} is deprecated. Please use ${key} instead.")
      }
    }

    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
      sparkConf
        .getOption(pattern.format(serviceName))
        .map(_.toBoolean)
        .getOrElse(true)
    }

    sparkConf
      .getOption(key)
      .map(_.toBoolean)
      .getOrElse(isEnabledDeprecated)
  }

  /**
   * Get the delegation token provider for the specified service.
   */
  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
    delegationTokenProviders.get(service)
  }

  /**
   * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
   * providers.
   *
   * @return Time after which the fetched delegation tokens should be renewed.
   */
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Long = {
    delegationTokenProviders.values.flatMap { provider =>
      if (provider.delegationTokensRequired(hadoopConf)) {
        provider.obtainDelegationTokens(hadoopConf, creds)
      } else {
        logDebug(s"Service ${provider.serviceName} does not require a token." +
          s" Check your configuration to see if security is disabled or not.")
        None
      }
    }.foldLeft(Long.MaxValue)(math.min)
  }
}
```
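For orientation, here is a minimal usage sketch of the manager, not taken from this PR: it assumes a Kerberos-enabled cluster, and since the class is `private[spark]`, the caller is placed in the same package purely so the sketch compiles. The `spark.security.credentials.hbase.enabled` setting shows how an individual provider is switched off.

```scala
// Hypothetical caller, placed in this package only so the private[spark] class is visible.
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

object TokenManagerSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      // Disable one provider via the per-service flag described in the scaladoc above.
      .set("spark.security.credentials.hbase.enabled", "false")
    val hadoopConf = new Configuration()

    val manager = new HadoopDelegationTokenManager(
      sparkConf,
      hadoopConf,
      Set(FileSystem.get(hadoopConf)))

    val creds = new Credentials()
    // Fetches tokens from every enabled provider; returns the earliest renewal time.
    val nextRenewal = manager.obtainDelegationTokens(hadoopConf, creds)
    println(s"Fetched ${creds.numberOfTokens()} token(s); next renewal at $nextRenewal ms")
  }
}
```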
`core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala` (new file, 50 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

/**
 * Hadoop delegation token provider.
 */
private[spark] trait HadoopDelegationTokenProvider {

  /**
   * Name of the service to provide delegation tokens. This name should be unique. Spark will
   * internally use this name to differentiate delegation token providers.
   */
  def serviceName: String

  /**
   * Returns true if delegation tokens are required for this service. By default, it is based on
   * whether Hadoop security is enabled.
   */
  def delegationTokensRequired(hadoopConf: Configuration): Boolean

  /**
   * Obtain delegation tokens for this service and get the time of the next renewal.
   * @param hadoopConf Configuration of the current Hadoop compatible system.
   * @param creds Credentials to add tokens and security keys to.
   * @return If the returned tokens are renewable and can be renewed, return the time of the next
   *         renewal, otherwise None should be returned.
   */
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long]
}
```

Inline review comments on this trait:

Author: Now that this is private, I've reverted.

Contributor: You need to update the PR description since it mentions deprecating the YARN interface still.

Author: Fixed.
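To make the trait's contract concrete, here is a hedged sketch of a provider implementation; the "example" service name and the no-op token logic are illustrative only and not part of this PR.

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Illustrative only: a do-nothing provider that satisfies the trait's contract.
private[spark] class ExampleDelegationTokenProvider extends HadoopDelegationTokenProvider {

  // This name drives the spark.security.credentials.example.enabled flag in the manager.
  override def serviceName: String = "example"

  // Mirror the built-in providers: only fetch tokens when Hadoop security is on.
  override def delegationTokensRequired(hadoopConf: Configuration): Boolean =
    UserGroupInformation.isSecurityEnabled

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long] = {
    // A real provider would contact its service, call creds.addToken(alias, token),
    // and return Some(nextRenewalTime) if the fetched tokens are renewable.
    None
  }
}
```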
`core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala` (new file, 126 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging

private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
  extends HadoopDelegationTokenProvider with Logging {

  // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
  // If None, no token renewer is specified or no token can be renewed,
  // so we cannot get the token renewal interval.
  private var tokenRenewalInterval: Option[Long] = null

  override val serviceName: String = "hadoopfs"

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long] = {

    val newCreds = fetchDelegationTokens(
      getTokenRenewer(hadoopConf),
      fileSystems)

    // Get the token renewal interval if it is not set. It will be called only once.
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
    }

    // Get the time of the next renewal.
    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
      val nextRenewalDates = newCreds.getAllTokens.asScala
        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
        .map { token =>
          val identifier = token
            .decodeIdentifier()
            .asInstanceOf[AbstractDelegationTokenIdentifier]
          identifier.getIssueDate + interval
        }
      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
    }

    creds.addAll(newCreds)
    nextRenewalDate
  }

  def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
    UserGroupInformation.isSecurityEnabled
  }

  private def getTokenRenewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    logDebug("Delegation token renewer is: " + tokenRenewer)

    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      logError(errorMessage)
      throw new SparkException(errorMessage)
    }

    tokenRenewer
  }

  private def fetchDelegationTokens(
      renewer: String,
      filesystems: Set[FileSystem]): Credentials = {

    val creds = new Credentials()

    filesystems.foreach { fs =>
      logInfo("getting token for: " + fs)
      fs.addDelegationTokens(renewer, creds)
    }

    creds
  }

  private def getTokenRenewalInterval(
      hadoopConf: Configuration,
      filesystems: Set[FileSystem]): Option[Long] = {
    // We cannot use the tokens generated with renewer yarn. Trying to renew
    // those will fail with an access control issue. So create new tokens with the logged in
    // user as renewer.
    val creds = fetchDelegationTokens(
      UserGroupInformation.getCurrentUser.getUserName,
      filesystems)

    val renewIntervals = creds.getAllTokens.asScala.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
      Try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
        val interval = newExpiration - identifier.getIssueDate
        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
        interval
      }.toOption
    }
    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
  }
}
```
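The renewal arithmetic above reduces to: learn the interval once by renewing a freshly fetched token, then report the earliest `issueDate + interval` across all tokens. A tiny self-contained illustration with made-up timestamps:

```scala
// Made-up numbers; only the min(issueDate + interval) logic is the point here.
object RenewalDateSketch {
  def main(args: Array[String]): Unit = {
    val renewalInterval = 24L * 60 * 60 * 1000                 // e.g. tokens renew daily
    val issueDates = Seq(1500000000000L, 1500000300000L)       // per-token issue times (ms)
    val nextRenewal = issueDates.map(_ + renewalInterval).min  // earliest renewal wins
    println(s"Next renewal due at $nextRenewal")
  }
}
```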
I still don't know how to place these in the `test` scope, which is where they belong. See my comment here: https://github.com/apache/spark/pull/17665/files#r112337820
So I tried a few things, and the one that got me further was just having this:

And nix the others. Adding the others in `test` scope caused some weird error in sbt, even with all dependencies (we have the dependencies you had problems with cached locally).

My comment was going to be to add that, then rewrite the code to use the metastore API instead of the `Hive` class from hive-exec... but then I noticed that test is not doing much, because there are no metastore servers to talk to. It's even there, hardcoded in the test:

All it seems to be doing is making sure the reflection-based code is not completely broken. That is something already, though.
So I have two suggestions, in order of preference:

1. Add the dependencies in `provided` scope, and change the code to use actual types and not reflection. Because the classes may not exist at runtime, that means having to handle `NoClassDefFoundError` in creative ways.
2. Keep the reflection code, and remove this test. Or maybe move it to a separate module as others have suggested.

I kinda like the first because it's always good to avoid reflection, and this is a particularly ugly use of it.
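As a rough illustration of the first suggestion (not code from this PR): compile against real Hive types in `provided` scope and catch `NoClassDefFoundError` when Hive is absent at runtime. The helper name and the commented-out Hive call below are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Hypothetical helper showing the "handle NoClassDefFoundError" pattern.
object OptionalHiveTokens {
  def tryObtainToken(hadoopConf: Configuration, creds: Credentials): Option[Long] = {
    try {
      // With hive-exec in provided scope this could call Hive directly, e.g.
      // Hive.get(hiveConf).getDelegationToken(user, principal) -- elided here
      // so the sketch compiles without Hive on the classpath.
      None
    } catch {
      case _: NoClassDefFoundError =>
        // Hive is optional: if its classes were not shipped, skip the token quietly.
        None
    }
  }
}
```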
Thanks for looking into it.

Do you know why reflection was used in the first place? Why not just add the Hive dependencies to compile scope? I'm thinking that's what we should do now, and drop reflection.

So I'm agreeing with your first bullet point, but proposing that we add the Hive deps to `compile` rather than `provided`.
Because technically Hive is an optional dependency for Spark, and moving it to compile scope would break that.
(Whether that should change or not is a separate discussion, but probably better not to have it as part of this change.)
Alright, I added hive-exec to `provided` scope, and removed the reflection.
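For readers unfamiliar with dependency scopes: in sbt notation (hypothetical coordinates; the actual change edits Spark's Maven poms), `provided` scope looks like the line below, meaning hive-exec is visible at compile time but is expected to be supplied, or absent, at runtime.

```scala
// Hypothetical sbt equivalent of the Maven scope change described above.
libraryDependencies += "org.apache.hive" % "hive-exec" % "1.2.1" % "provided"
```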