diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 000000000000..6556f2bb61f3 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1,2 @@ +org.apache.spark.deploy.security.HadoopFSCredentialProvider +org.apache.spark.deploy.security.HBaseCredentialProvider diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index f475ce87540a..5f217b3060f6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,26 +17,28 @@ package org.apache.spark.deploy -import java.io.IOException +import java.io.{File, FileOutputStream, IOException} import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal - import com.google.common.primitives.Longs +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier - -import org.apache.spark.{SparkConf, SparkException} +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater} import org.apache.spark.internal.Logging import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkException} + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal /** * :: DeveloperApi :: @@ -48,6 +50,9 @@ class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration(sparkConf) UserGroupInformation.setConfiguration(conf) + private var credentialUpdater: CredentialUpdater = _ + + /** * Runs the given function with a Hadoop UserGroupInformation as a thread local variable * (distributed to child threads), used for authenticating HDFS and YARN calls. @@ -290,12 +295,21 @@ class SparkHadoopUtil extends Logging { * Start a thread to periodically update the current user's credentials with new credentials so * that access to secured service does not fail. */ - private[spark] def startCredentialUpdater(conf: SparkConf) {} + private[spark] def startCredentialUpdater(sparkConf: SparkConf): Unit = { + credentialUpdater = + new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() + credentialUpdater.start() + } /** * Stop the thread that does the credential updates. */ - private[spark] def stopCredentialUpdater() {} + private[spark] def stopCredentialUpdater(): Unit = { + if (credentialUpdater != null) { + credentialUpdater.stop() + credentialUpdater = null + } + } /** * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism. 
@@ -353,6 +367,17 @@ class SparkHadoopUtil extends Logging { } buffer.toString } + + private[spark] def decodeAndWriteToFile(env: collection.Map[String, String], + key: String, where: File): Unit = { + if (env.contains(key)) { + val creds = new FileOutputStream(where) + val base64 = env.get(key).get + val raw = Base64.decodeBase64(base64) + IOUtils.write(raw, creds) + creds.close() + } + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 77005aa9040b..e8460f3e5e86 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -552,7 +552,7 @@ object SparkSubmit extends CommandLineUtils { } // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL) { + if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == STANDALONE) { if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when principal is specified") if (!new File(args.keytab).exists()) { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 21cb94142b15..dead7867ca1a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -17,24 +17,29 @@ package org.apache.spark.deploy.rest -import java.io.{DataOutputStream, FileNotFoundException} +import java.io.{DataOutputStream, File, FileInputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import java.nio.charset.StandardCharsets +import java.util.UUID import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse +import com.fasterxml.jackson.core.JsonProcessingException +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS +import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion} + import scala.collection.mutable -import scala.concurrent.{Await, Future} import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import scala.io.Source import scala.util.control.NonFatal -import com.fasterxml.jackson.core.JsonProcessingException - -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException} -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - /** * A client that submits applications to a [[RestSubmissionServer]]. 
* @@ -182,6 +187,38 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { message.appArgs = appArgs message.sparkProperties = sparkProperties message.environmentVariables = environmentVariables + + def uti(fn: DataOutputBuffer => Unit): String = { + val dob = new DataOutputBuffer + fn(dob) + dob.close() + new String(Base64.encodeBase64(dob.getData)) + } + + // Propagate kerberos credentials if necessary + if (sparkProperties.contains(PRINCIPAL.key)) { + val principal = sparkProperties.get(PRINCIPAL.key).get + val keytab = sparkProperties.get(KEYTAB.key).orNull + require(keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to log in to Kerberos" + + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) + // Generate a file name that can be used for the keytab file and does not conflict + // with any user file. + val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + + logInfo("To enable the driver to log in from keytab, credentials are being copied" + + " to the Master inside the CreateSubmissionRequest") + + val keytabContent = Utils.base64EncodedValue { dob => + io.IOUtils.copy(new FileInputStream(f), dob) + } + + message.environmentVariables += BOOTSTRAP_TOKENS -> keytabContent + // overwrite with localized version + message.sparkProperties += KEYTAB.key -> keytabFileName + } + message.validate() message } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/core/src/main/scala/org/apache/spark/deploy/security/AMCredentialRenewer.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala rename to core/src/main/scala/org/apache/spark/deploy/security/AMCredentialRenewer.scala index 7e76f402db24..c422f3550f3c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/AMCredentialRenewer.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.security.PrivilegedExceptionAction import java.util.concurrent.{Executors, TimeUnit} @@ -22,11 +22,8 @@ import java.util.concurrent.{Executors, TimeUnit} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation - import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.ThreadUtils @@ -51,7 +48,7 @@ import org.apache.spark.util.ThreadUtils * appeared, it will read the credentials and update the currently running UGI with it. This * process happens again once 80% of the validity of this has expired. 
*/ -private[yarn] class AMCredentialRenewer( +private[spark] class AMCredentialRenewer( sparkConf: SparkConf, hadoopConf: Configuration, credentialManager: ConfigurableCredentialManager) extends Logging { @@ -62,7 +59,7 @@ private[yarn] class AMCredentialRenewer( Executors.newSingleThreadScheduledExecutor( ThreadUtils.namedThreadFactory("Credential Refresh Thread")) - private val hadoopUtil = YarnSparkHadoopUtil.get + private val hadoopUtil = SparkHadoopUtil.get private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala rename to core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala index 4f4be52a0d69..904584a9a60a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.util.ServiceLoader -import scala.collection.JavaConverters._ - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils +import scala.collection.JavaConverters._ + /** * A ConfigurableCredentialManager to manage all the registered credential providers and offer * APIs for other modules to obtain credentials as well as renewal time. By default @@ -41,7 +40,7 @@ import org.apache.spark.util.Utils * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by * the configuration spark.yarn.security.credentials.hive.enabled. */ -private[yarn] final class ConfigurableCredentialManager( +private[spark] final class ConfigurableCredentialManager( sparkConf: SparkConf, hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled" private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled" diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala rename to core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala index 41b7b5d60b03..64febe632a64 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala @@ -15,22 +15,21 @@ * limitations under the License. 
*/ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.util.concurrent.{Executors, TimeUnit} -import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.{Credentials, UserGroupInformation} - import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.util.{ThreadUtils, Utils} +import scala.util.control.NonFatal + private[spark] class CredentialUpdater( sparkConf: SparkConf, hadoopConf: Configuration, @@ -38,6 +37,7 @@ private[spark] class CredentialUpdater( @volatile private var lastCredentialsFileSuffix = 0 + // TODO move to ConfigBuilder private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) private val freshHadoopConf = SparkHadoopUtil.get.getConfBypassingFSCache( diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala index 5adeb8e605ff..fa2a1b44dc2e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security - -import scala.reflect.runtime.universe -import scala.util.control.NonFatal +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.{Token, TokenIdentifier} - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging { override def serviceName: String = "hbase" diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala index f65c886db944..9c048fdb9384 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala @@ -15,21 +15,19 @@ * limitations under the License. 
*/ -package org.apache.spark.deploy.yarn.security - -import scala.collection.JavaConverters._ -import scala.util.Try +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier - -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.{SparkConf, SparkException} + +import scala.collection.JavaConverters._ +import scala.util.Try private[security] class HadoopFSCredentialProvider extends ServiceCredentialProvider with Logging { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala index 4e3fcce8dbb1..8dfcc4b7ab60 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.{Credentials, UserGroupInformation} - import org.apache.spark.SparkConf /** diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index e878c10183f6..290a4c5488e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -21,18 +21,19 @@ import java.io._ import java.net.URI import java.nio.charset.StandardCharsets -import scala.collection.JavaConverters._ - import com.google.common.io.Files - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState +import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.KEYTAB +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} +import org.apache.spark.{SecurityManager, SparkConf} + +import scala.collection.JavaConverters._ /** * Manages the execution of one driver, including automatically restarting the driver on failure. 
@@ -181,6 +182,15 @@ private[deploy] class DriverRunner( val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) + // We only support propagation of credentials if a keytab is passed, + // since we can't renew them indefinitely without a keytab + if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS) && conf.contains(KEYTAB)) { + val keytab = conf.get(KEYTAB).get + val keytabFile = new File(driverDir, keytab) + SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, + BOOTSTRAP_TOKENS, keytabFile) + } + runDriver(builder, driverDir, driverDesc.supervise) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 6799f78ec0c1..f8e8b1551698 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -19,7 +19,9 @@ package org.apache.spark.deploy.worker import java.io.File -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.hadoop.security.UserGroupInformation +import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -52,6 +54,22 @@ object DriverWrapper { } Thread.currentThread.setContextClassLoader(loader) + + // If a principal was provided, log in from the keytab before running the user class, + // so that the driver can access secured services as that principal. + val loginFromKeytab = conf.contains(PRINCIPAL.key) + if (loginFromKeytab) { + val principal = conf.get(PRINCIPAL).get + val keytab = conf.get(KEYTAB).orNull + require(keytab != null, "Keytab must be specified when principal is specified") + if (!new File(keytab).exists()) { + throw new SparkException(s"Keytab file: ${keytab} does not exist") + } else { + UserGroupInformation.loginUserFromKeytab(principal, keytab) + } + } + + // Delegate to supplied main class val clazz = Utils.classForName(mainClass) val mainMethod = clazz.getMethod("main", classOf[Array[String]]) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index d4d8521cc820..7681d5620f8a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -20,17 +20,17 @@ package org.apache.spark.deploy.worker import java.io._ import java.nio.charset.StandardCharsets -import scala.collection.JavaConverters._ - import com.google.common.io.Files - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.deploy.{ApplicationDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.util.logging.FileAppender +import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS + +import scala.collection.JavaConverters._ /** * Manages the execution of one executor process. 
@@ -154,6 +154,16 @@ private[deploy] class ExecutorRunner( // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") + // Ensure delegation tokens are propagated if necessary + if (appDesc.command.environment.contains(BOOTSTRAP_TOKENS)) { + val tokenFile = new File(executorDir, "executor-credentials-" + appId) + SparkHadoopUtil.get.decodeAndWriteToFile(appDesc.command.environment, + BOOTSTRAP_TOKENS, tokenFile) + builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) + + logInfo("Wrote HADOOP_TOKEN_FILE_LOCATION to " + tokenFile) + } + // Add webUI log urls val baseUrl = if (conf.getBoolean("spark.ui.reverseProxy", false)) { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ba0096d87456..422287d33642 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -24,12 +24,12 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal - import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.CREDENTIALS_FILE_PATH import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -213,12 +213,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { driverConf.set(key, value) } } - if (driverConf.contains("spark.yarn.credentials.file")) { + + // Starting the credential updater is only useful if a principal/keytab were specified + if (driverConf.contains(CREDENTIALS_FILE_PATH.key)) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) SparkHadoopUtil.get.startCredentialUpdater(driverConf) } + val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 89aeea493908..fc89dd8f56eb 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -272,4 +272,45 @@ package object config { .booleanConf .createWithDefault(false) + private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") + .internal() + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(Long.MaxValue) + + private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") + .internal() + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(Long.MaxValue) + + private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") + .internal() + .stringConf + .createWithDefault(null) + + private[spark] val CREDENTIAL_FILE_MAX_COUNT = + ConfigBuilder("spark.yarn.credentials.file.retention.count") + .intConf + .createWithDefault(5) + + private[spark] val CREDENTIALS_FILE_MAX_RETENTION = 
ConfigBuilder("spark.yarn.credentials.file.retention.days") + .intConf + .createWithDefault(5) + + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .createOptional + + private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") + .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + + "fs.defaultFS does not need to be listed here.") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") + .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + + "that hosts fs.defaultFS does not need to be listed here.") + .fallbackConf(NAMENODES_TO_ACCESS) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4eedaaea6119..11365f50a208 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -639,4 +639,5 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private[spark] object CoarseGrainedSchedulerBackend { val ENDPOINT_NAME = "CoarseGrainedScheduler" + val BOOTSTRAP_TOKENS = "SPARK_BOOTSTRAP_TOKENS" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 7befdb0c1f64..bf4c42081c16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -19,16 +19,20 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.Semaphore -import scala.concurrent.Future - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.deploy.{ApplicationDescription, Command} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} +import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} +import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext} + +import scala.concurrent.Future /** * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager. 
@@ -55,7 +59,71 @@ private[spark] class StandaloneSchedulerBackend( private val maxCores = conf.getOption("spark.cores.max").map(_.toInt) private val totalExpectedCores = maxCores.getOrElse(0) + + private var loginFromKeytab = false + private var principal: String = null + private var keytab: String = null + private var credentials: Credentials = null + + private var credentialRenewer: AMCredentialRenewer = _ + + + def setupCredentials(): Unit = { + loginFromKeytab = conf.contains(PRINCIPAL.key) + if (loginFromKeytab) { + principal = conf.get(PRINCIPAL).get + keytab = conf.get(KEYTAB).orNull + } + // Defensive copy of the credentials + credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) + } + + override def start() { + + setupCredentials() + + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val credentialManager = new ConfigurableCredentialManager(conf, hadoopConf) + + // Merge credentials obtained from registered providers + val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials) + + if (credentials != null) { + logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) + } + + // If we logged in from a principal and keytab and the credentials can be renewed at some + // point after the current time, pass the next renewal and update times on to the + // credential renewer and updater. + + if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() && + nearestTimeOfNextRenewal != Long.MaxValue) { + + // The renewal is scheduled at 75% of the time remaining until the next renewal, and the + // update slightly later (80% of the time remaining). This makes sure credentials are + // renewed and updated before they expire. + val currTime = System.currentTimeMillis() + val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime + val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime + + logInfo(s"Setting credential renewal time: ${renewalTime.toLong} ms," + + s" update time ${updateTime.toLong} ms") + + conf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong) + conf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong) + } + + // If the credentials file config is present, we must periodically renew tokens. 
So create + // a new AMCredentialRenewer + if (conf.contains(CREDENTIALS_FILE_PATH.key)) { + // If a principal and keytab have been set, use that to create new credentials for executors + // periodically + credentialRenewer = credentialManager.credentialRenewer() + credentialRenewer.scheduleLoginFromKeytab() + } + // NOTE we don't need an updater since the above accomplishes it already + super.start() launcherBackend.connect() @@ -88,11 +156,23 @@ private[spark] class StandaloneSchedulerBackend( Nil } + // Propagate security tokens to executors + val bootstrap = if (credentials.getAllTokens.size() > 0) { + val bootstrapCredentials = Utils.base64EncodedValue { dob => + credentials.writeTokenStorageToStream(dob) + } + + logInfo("Security tokens will be sent to executors") + Map(BOOTSTRAP_TOKENS -> bootstrapCredentials) + } else Map.empty[String, String] + + val executorEnv = sc.executorEnvs.toMap ++ bootstrap + // Start executors with a few necessary configs for registering with the scheduler val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", - args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) + args, executorEnv, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val webUrl = sc.ui.map(_.webUrl).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) // If we're using dynamic allocation, set our initial executor limit to 0 for now. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 943dde072327..1a1e1b4040c6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -40,20 +40,20 @@ import scala.reflect.ClassTag import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} import scala.util.matching.Regex - import _root_.io.netty.channel.unix.Errors.NativeIoException import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses +import org.apache.commons.codec.binary.Base64 import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ import org.slf4j.Logger - import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging @@ -2627,6 +2627,15 @@ private[spark] object Utils extends Logging { redact(redactionPattern, kvs.toArray) } + /** + * Base64 encode the data in a DataOutputBuffer + */ + def base64EncodedValue(fn: DataOutputBuffer => Unit): String = { + val dob = new DataOutputBuffer + fn(dob) + dob.close() + new String(Base64.encodeBase64(dob.getData)) + } } private[util] object CallerContext extends Logging { diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 000000000000..2676a0ad589f --- /dev/null +++ 
b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1 @@ +org.apache.spark.deploy.security.TestCredentialProvider diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala similarity index 89% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala rename to core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala index b0067aa4517c..7804ed8ba7c7 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala @@ -15,16 +15,14 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.Token -import org.scalatest.{BeforeAndAfter, Matchers} - import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ +import org.scalatest.{BeforeAndAfter, Matchers} class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { private var credentialManager: ConfigurableCredentialManager = null @@ -50,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) credentialManager.getServiceCredentialProvider("hbase") should not be (None) - credentialManager.getServiceCredentialProvider("hive") should not be (None) + credentialManager.getServiceCredentialProvider("hive") should be (None) } test("disable hive credential provider") { @@ -96,19 +94,6 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal) } - test("obtain tokens For HiveMetastore") { - val hadoopConf = new Configuration() - hadoopConf.set("hive.metastore.kerberos.principal", "bob") - // thrift picks up on port 0 and bails out, without trying to talk to endpoint - hadoopConf.set("hive.metastore.uris", "http://localhost:0") - - val hiveCredentialProvider = new HiveCredentialProvider() - val credentials = new Credentials() - hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials) - - credentials.getAllTokens.size() should be (0) - } - test("Obtain tokens For HBase") { val hadoopConf = new Configuration() hadoopConf.set("hbase.security.authentication", "kerberos") diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala similarity index 98% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala rename to core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala index f50ee193c258..c551498f6d53 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration -import org.scalatest.{Matchers, PrivateMethodTester} - import org.apache.spark.{SparkException, SparkFunSuite} +import org.scalatest.{Matchers, PrivateMethodTester} class HadoopFSCredentialProviderSuite extends SparkFunSuite diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 2355d40d1e6f..4ae5b0b08324 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -102,7 +102,7 @@ spark-deps-.* org.apache.spark.scheduler.ExternalClusterManager .*\.sql .Rbuildignore -org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +org.apache.spark.deploy.security.ServiceCredentialProvider spark-warehouse structured-streaming/* kafka-source-initial-offset-version-2.1.0.bin diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index e9ddaa76a797..f99005845191 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -503,8 +503,9 @@ launch time. This is done by listing them in the `spark.yarn.access.hadoopFileSy spark.yarn.access.hadoopFileSystems hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/ ``` +TODO Spark supports integrating with other security-aware services through Java Services mechanism (see -`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` +`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.security.ServiceCredentialProvider` should be available to Spark by listing their names in the corresponding file in the jar's `META-INF/services` directory. 
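As an illustration (not part of this patch), a minimal provider might look like the following sketch. The package, class, and service names are hypothetical, and the `obtainCredentials` shape assumes the trait keeps the `(hadoopConf, sparkConf, creds): Option[Long]` signature exercised by the test suites in this patch:

```scala
package com.example.spark.security  // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.ServiceCredentialProvider

// Sketch of a custom credential provider for some external secured service.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // Name used in spark.yarn.security.credentials.<serviceName>.enabled
  override def serviceName: String = "myservice"

  // Obtain tokens for the service, add them to `creds`, and return the time at which
  // they should next be renewed (or None if no renewal is needed).
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch delegation tokens from the external service and add them to `creds` here.
    None
  }
}
```

The implementation's fully qualified class name is then listed in `META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider`, as the bundled providers registered above do.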
These plug-ins can be disabled by setting `spark.yarn.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index f5a807ecac9d..000000000000 --- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1,3 +0,0 @@ -org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider -org.apache.spark.deploy.yarn.security.HBaseCredentialProvider -org.apache.spark.deploy.yarn.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 864c834d110f..359813616567 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -37,8 +37,8 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer +import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rpc._ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3218d221143e..a11535490acf 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -48,8 +48,8 @@ import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.ConfigurableCredentialManager import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} @@ -954,6 +954,7 @@ private[spark] class Client( amContainer } + // TODO - doesn't actually login from keytab! That's done in SparkSubmit! 
def setupCredentials(): Unit = { loginFromKeytab = sparkConf.contains(PRINCIPAL.key) if (loginFromKeytab) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 93578855122c..2cc2d889338e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater} +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater} import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils @@ -45,8 +45,6 @@ import org.apache.spark.util.Utils */ class YarnSparkHadoopUtil extends SparkHadoopUtil { - private var credentialUpdater: CredentialUpdater = _ - override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { dest.addCredentials(source.getCredentials()) } @@ -86,19 +84,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { if (credentials != null) credentials.getSecretKey(new Text(key)) else null } - private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = { - credentialUpdater = - new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() - credentialUpdater.start() - } - - private[spark] override def stopCredentialUpdater(): Unit = { - if (credentialUpdater != null) { - credentialUpdater.stop() - credentialUpdater = null - } - } - private[spark] def getContainerId: ContainerId = { val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) ConverterUtils.toContainerId(containerIdString) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index d8c96c35ca71..4ebbddde49a1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -127,11 +127,6 @@ package object config { .intConf .createOptional - private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") - .doc("Staging directory used while submitting applications.") - .stringConf - .createOptional - /* Cluster-mode launcher configuration. */ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") @@ -236,30 +231,6 @@ package object config { .stringConf .createOptional - /* Security configuration. */ - - private[spark] val CREDENTIAL_FILE_MAX_COUNT = - ConfigBuilder("spark.yarn.credentials.file.retention.count") - .intConf - .createWithDefault(5) - - private[spark] val CREDENTIALS_FILE_MAX_RETENTION = - ConfigBuilder("spark.yarn.credentials.file.retention.days") - .intConf - .createWithDefault(5) - - private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") - .doc("Extra NameNode URLs for which to request delegation tokens. 
The NameNode that hosts " + - "fs.defaultFS does not need to be listed here.") - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") - .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + - "that hosts fs.defaultFS does not need to be listed here.") - .fallbackConf(NAMENODES_TO_ACCESS) - /* Rolled log aggregation configuration. */ private[spark] val ROLLED_LOG_INCLUDE_PATTERN = @@ -278,11 +249,6 @@ package object config { /* Private configs. */ - private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") - .internal() - .stringConf - .createWithDefault(null) - // Internal config to propagate the location of the user's jar to the driver/executors private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar") .internal() @@ -335,17 +301,7 @@ package object config { .internal() .stringConf .createOptional - - private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") - .internal() - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(Long.MaxValue) - - private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") - .internal() - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(Long.MaxValue) - + // The list of cache-related config entries. This is used by Client and the AM to clean // up the environment so that these settings do not appear on the web UI. private[yarn] val CACHE_CONFIGS = Seq( diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index d0ef5efa36e8..000000000000 --- a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.deploy.yarn.security.TestCredentialProvider diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 000000000000..3c7f53b48bb7 --- /dev/null +++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1 @@ +org.apache.spark.sql.hive.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveCredentialProvider.scala similarity index 96% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveCredentialProvider.scala index 16d8fc32bb42..2493f32c2eac 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveCredentialProvider.scala @@ -15,25 +15,25 @@ * limitations under the License. 
*/ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.sql.hive.security import java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction -import scala.reflect.runtime.universe -import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.io.Text -import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token - +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf +import org.apache.spark.deploy.security.ServiceCredentialProvider import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging { +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +private[spark] class HiveCredentialProvider extends ServiceCredentialProvider with Logging { override def serviceName: String = "hive" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/security/HiveCredentialProviderSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/security/HiveCredentialProviderSuite.scala new file mode 100644 index 000000000000..121868c78101 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/security/HiveCredentialProviderSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.security + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.spark.deploy.security.ConfigurableCredentialManager +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.scalatest.{BeforeAndAfter, Matchers} + +class HiveCredentialProviderSuite extends SparkFunSuite with Matchers with BeforeAndAfter { + private var sparkConf: SparkConf = null + private var hadoopConf: Configuration = null + + override def beforeAll(): Unit = { + super.beforeAll() + + sparkConf = new SparkConf() + hadoopConf = new Configuration() + + System.setProperty("SPARK_YARN_MODE", "true") + } + + override def afterAll(): Unit = { + System.clearProperty("SPARK_YARN_MODE") + + super.afterAll() + } + + test("Correctly load Hive credential provider with other default credential providers") { + val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) + credentialManager.getServiceCredentialProvider("hbase") should not be (None) + credentialManager.getServiceCredentialProvider("hive") should not be (None) + } + + test("obtain tokens For HiveMetastore") { + val hadoopConf = new Configuration() + hadoopConf.set("hive.metastore.kerberos.principal", "bob") + // thrift picks up on port 0 and bails out, without trying to talk to endpoint + hadoopConf.set("hive.metastore.uris", "http://localhost:0") + + val hiveCredentialProvider = new HiveCredentialProvider() + val credentials = new Credentials() + hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials) + + credentials.getAllTokens.size() should be (0) + } +}
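For reference, the token propagation this patch threads through `SPARK_BOOTSTRAP_TOKENS` is a base64 round trip over the launch-command environment. Below is a minimal self-contained sketch of that flow using the same commons-codec and Hadoop classes as the patch; the token file path is hypothetical, and the bodies of `Utils.base64EncodedValue` and `SparkHadoopUtil.decodeAndWriteToFile` are inlined rather than called, since both live behind `private[spark]`:

```scala
import java.io.{File, FileOutputStream}

import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation

object TokenRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Scheduler side: serialize the current user's tokens and base64-encode them,
    // mirroring Utils.base64EncodedValue in StandaloneSchedulerBackend.start().
    val credentials = UserGroupInformation.getCurrentUser.getCredentials
    val dob = new DataOutputBuffer
    credentials.writeTokenStorageToStream(dob)
    dob.close()
    val encoded = new String(Base64.encodeBase64(dob.getData))

    // The encoded value travels in the driver/executor command environment.
    val env = Map("SPARK_BOOTSTRAP_TOKENS" -> encoded)

    // Worker side: decode and write the bytes back to a file, mirroring
    // SparkHadoopUtil.decodeAndWriteToFile.
    val tokenFile = new File("executor-credentials")  // hypothetical path
    val out = new FileOutputStream(tokenFile)
    IOUtils.write(Base64.decodeBase64(env("SPARK_BOOTSTRAP_TOKENS")), out)
    out.close()
  }
}
```

On the worker side the decoded bytes end up either as the driver's keytab file (`DriverRunner`) or as an executor token file exported through `HADOOP_TOKEN_FILE_LOCATION` (`ExecutorRunner`), matching the hunks above.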