From 1d928512e29e296d0f09335154976cb8ef44c348 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 14 Jul 2017 10:15:39 -0700
Subject: [PATCH 1/4] Add jars specified in --jars/--packages in AM classpath

Change-Id: Ia5aba0bbfba4540340a8e9efa8e90512eaa502c5
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 91 ++++++++++++++-----
 1 file changed, 66 insertions(+), 25 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4868180569778..59861394fe43a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -90,6 +90,23 @@ private[spark] class ApplicationMaster(
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
 
+  private val userClassLoader = {
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }
+
+    if (isClusterMode) {
+      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      } else {
+        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      }
+    } else {
+      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+    }
+  }
+
   // Lock for controlling the allocator (heartbeat) thread.
   private val allocatorLock = new Object()
 
@@ -240,20 +257,6 @@ private[spark] class ApplicationMaster(
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
-    // If the credentials file config is present, we must periodically renew tokens. So create
-    // a new AMDelegationTokenRenewer
-    if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
-      // If a principal and keytab have been set, use that to create new credentials for executors
-      // periodically
-      val credentialManager = new YARNHadoopDelegationTokenManager(
-        sparkConf,
-        yarnConf,
-        YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
-
-      val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
-      credentialRenewer.scheduleLoginFromKeytab()
-    }
-
     if (isClusterMode) {
       runDriver(securityMgr)
     } else {
@@ -438,6 +441,24 @@ private[spark] class ApplicationMaster(
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)
 
+    // If the credentials file config is present, we must periodically renew tokens. So create
+    // a new AMDelegationTokenRenewer
+    if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
+      // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
+      // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
+      val credentialRenewerThread = new Thread {
+        setName("AMCredentialRenewerThread")
+        setContextClassLoader(userClassLoader)
+
+        override def run(): Unit = {
+          startAMCredentialRenewer()
+        }
+      }
+
+      credentialRenewerThread.start()
+      credentialRenewerThread.join()
+    }
+
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
   }
@@ -609,17 +630,6 @@ private[spark] class ApplicationMaster(
   private def startUserApplication(): Thread = {
     logInfo("Starting the user application in a separate Thread")
 
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
-    val userClassLoader =
-      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
-        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-      } else {
-        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
-      }
-
     var userArgs = args.userArgs
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
       // When running pyspark, the app is run using PythonRunner. The second argument is the list
@@ -635,6 +645,11 @@ private[spark] class ApplicationMaster(
     val userThread = new Thread {
       override def run() {
         try {
+          // If the credentials file config is present, we must periodically renew tokens. So create
+          // a new AMDelegationTokenRenewer
+          if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
+            startAMCredentialRenewer()
+          }
           mainMethod.invoke(null, userArgs.toArray)
           finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
           logDebug("Done running users class")
@@ -674,6 +689,32 @@ private[spark] class ApplicationMaster(
     allocatorLock.notifyAll()
   }
 
+  private def startAMCredentialRenewer(): Unit = {
+    // If a principal and keytab have been set, use that to create new credentials for executors
+    // periodically
+    val credentialManager = new YARNHadoopDelegationTokenManager(
+      sparkConf,
+      yarnConf,
+      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+
+    val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
+    credentialRenewer.scheduleLoginFromKeytab()
+  }
+
+  private def startAMCredentialRenewerThread(): Thread = {
+    val thread = new Thread {
+      override def run(): Unit = {
+        startAMCredentialRenewer()
+      }
+    }
+
+    thread.setDaemon(true)
+    thread.setName("AMCredentialRenewerThread")
+    thread.setContextClassLoader(userClassLoader)
+    thread.start()
+    thread
+  }
+
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
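Note (illustrative, not part of the patch): the change above builds a classloader over the user classpath (--jars/--packages) once, as a field, and hands it to a short-lived thread as the context classloader so that work started on that thread can resolve classes from the user jars. The standalone sketch below exercises the same pattern; the jar paths are hypothetical and plain java.net.URLClassLoader stands in for Spark's MutableURLClassLoader/ChildFirstURLClassLoader.

    import java.io.File
    import java.net.{URL, URLClassLoader}

    object UserClassLoaderSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical jar paths standing in for Client.getUserClasspath(sparkConf).
        val userJars = Seq("/tmp/app.jar", "/tmp/dep.jar")
        val urls = userJars.map(p => new URL("file:" + new File(p).getAbsolutePath)).toArray

        // Plain URLClassLoader as a stand-in for Spark's MutableURLClassLoader.
        val userClassLoader =
          new URLClassLoader(urls, Thread.currentThread().getContextClassLoader)

        // Short-lived thread whose only job is to carry the user classloader as its
        // context classloader, mirroring the AMCredentialRenewerThread in the patch above.
        val starter = new Thread {
          setName("user-classpath-starter")
          setContextClassLoader(userClassLoader)

          override def run(): Unit = {
            println("context classloader: " + Thread.currentThread().getContextClassLoader)
          }
        }

        starter.start()
        starter.join()
      }
    }

Starting the thread and joining it immediately, as the patch does, keeps the flow synchronous while still giving the work its own context classloader.
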
From 2abb207c69f6dbae26047388b06c875529fb557d Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 14 Jul 2017 14:21:34 -0700
Subject: [PATCH 2/4] Remove unnecessary code

Change-Id: Ife35388c8f8185dc7b083c1a7ce7aff1e280e627
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 59861394fe43a..a0adc961ecbb1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -701,20 +701,6 @@ private[spark] class ApplicationMaster(
     credentialRenewer.scheduleLoginFromKeytab()
   }
 
-  private def startAMCredentialRenewerThread(): Thread = {
-    val thread = new Thread {
-      override def run(): Unit = {
-        startAMCredentialRenewer()
-      }
-    }
-
-    thread.setDaemon(true)
-    thread.setName("AMCredentialRenewerThread")
-    thread.setContextClassLoader(userClassLoader)
-    thread.start()
-    thread
-  }
-
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
   */
From 189cc770663a6709eae5ac53264c195a6e13a263 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 14 Jul 2017 15:26:33 -0700
Subject: [PATCH 3/4] Address the comments

Change-Id: I0ed8a9b639acaa9d61cca5d6420ad65c16e5a2e1
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a0adc961ecbb1..766d28632c4f8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -443,11 +443,11 @@ private[spark] class ApplicationMaster(
 
     // If the credentials file config is present, we must periodically renew tokens. So create
     // a new AMDelegationTokenRenewer
-    if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
+    if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
       // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
       // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
       val credentialRenewerThread = new Thread {
-        setName("AMCredentialRenewerThread")
+        setName("AMCredentialRenewerStarter")
         setContextClassLoader(userClassLoader)
 
         override def run(): Unit = {
@@ -647,7 +647,7 @@ private[spark] class ApplicationMaster(
         try {
           // If the credentials file config is present, we must periodically renew tokens. So create
           // a new AMDelegationTokenRenewer
-          if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
+          if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
             startAMCredentialRenewer()
           }
           mainMethod.invoke(null, userArgs.toArray)
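Note (illustrative, not part of the patch): patch 3 switches the config check from the raw string key (CREDENTIALS_FILE_PATH.key) to the typed entry itself, relying on a contains overload that accepts the entry. The sketch below shows the same idea with stand-in types; it is not Spark's actual ConfigEntry/SparkConf API, and the key string is only meant to mirror the YARN credentials-file setting.

    // Stand-in types; Spark's real ConfigEntry and SparkConf are richer than this.
    final case class ConfigEntryLike[T](key: String, default: T)

    final class ConfLike(settings: Map[String, String]) {
      def contains(key: String): Boolean = settings.contains(key)
      // Overload taking the typed entry, so call sites never spell out the raw key.
      def contains(entry: ConfigEntryLike[_]): Boolean = settings.contains(entry.key)
    }

    object ConfigEntrySketch {
      val CREDENTIALS_FILE_PATH = ConfigEntryLike("spark.yarn.credentials.file", "")

      def main(args: Array[String]): Unit = {
        val conf = new ConfLike(Map("spark.yarn.credentials.file" -> "hdfs:///tmp/creds"))
        println(conf.contains(CREDENTIALS_FILE_PATH.key)) // string-key form (before patch 3)
        println(conf.contains(CREDENTIALS_FILE_PATH))     // typed form (after patch 3)
      }
    }
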
From 7dbc7269713dd4eff29bbdbf27e89152d9ee78e3 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Mon, 17 Jul 2017 11:31:00 -0700
Subject: [PATCH 4/4] refactor the code

Change-Id: Iea5e1eb1f61bcd1ad063ac2c9958950858879756
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 60 ++++++++-----------
 1 file changed, 25 insertions(+), 35 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 766d28632c4f8..ce290c399d9f2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -257,6 +257,31 @@ private[spark] class ApplicationMaster(
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
+    // If the credentials file config is present, we must periodically renew tokens. So create
+    // a new AMDelegationTokenRenewer
+    if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
+      // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
+      // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
+      val credentialRenewerThread = new Thread {
+        setName("AMCredentialRenewerStarter")
+        setContextClassLoader(userClassLoader)
+
+        override def run(): Unit = {
+          val credentialManager = new YARNHadoopDelegationTokenManager(
+            sparkConf,
+            yarnConf,
+            YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+
+          val credentialRenewer =
+            new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
+          credentialRenewer.scheduleLoginFromKeytab()
+        }
+      }
+
+      credentialRenewerThread.start()
+      credentialRenewerThread.join()
+    }
+
     if (isClusterMode) {
       runDriver(securityMgr)
     } else {
@@ -441,24 +466,6 @@ private[spark] class ApplicationMaster(
     registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
       securityMgr)
 
-    // If the credentials file config is present, we must periodically renew tokens. So create
-    // a new AMDelegationTokenRenewer
-    if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
-      // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
-      // classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
-      val credentialRenewerThread = new Thread {
-        setName("AMCredentialRenewerStarter")
-        setContextClassLoader(userClassLoader)
-
-        override def run(): Unit = {
-          startAMCredentialRenewer()
-        }
-      }
-
-      credentialRenewerThread.start()
-      credentialRenewerThread.join()
-    }
-
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
   }
@@ -645,11 +652,6 @@ private[spark] class ApplicationMaster(
     val userThread = new Thread {
      override def run() {
        try {
-          // If the credentials file config is present, we must periodically renew tokens. So create
-          // a new AMDelegationTokenRenewer
-          if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
-            startAMCredentialRenewer()
-          }
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running users class")
@@ -689,18 +691,6 @@ private[spark] class ApplicationMaster(
     allocatorLock.notifyAll()
   }
 
-  private def startAMCredentialRenewer(): Unit = {
-    // If a principal and keytab have been set, use that to create new credentials for executors
-    // periodically
-    val credentialManager = new YARNHadoopDelegationTokenManager(
-      sparkConf,
-      yarnConf,
-      YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
-
-    val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
-    credentialRenewer.scheduleLoginFromKeytab()
-  }
-
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
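
Note (illustrative, not part of the patch): after the final refactor, run() builds the credential manager and renewer directly inside the short-lived "AMCredentialRenewerStarter" thread, before branching into runDriver or runExecutorLauncher, and the intermediate helper methods are gone. Setting the user classloader as that thread's context classloader matters because provider-style components (such as Spark's ServiceCredentialProvider implementations) are discovered through java.util.ServiceLoader, which only sees classes visible to the classloader it is handed. The sketch below shows that lookup side with a hypothetical SPI trait; it illustrates the mechanism, not the patched Spark code.

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    // Hypothetical SPI, standing in for a credential-provider-style extension point
    // whose implementations are registered via META-INF/services.
    trait TokenProviderLike {
      def serviceName: String
    }

    object ServiceLoaderSketch {
      def main(args: Array[String]): Unit = {
        val worker = new Thread {
          setName("provider-discovery")
          // In the patch this would be the classloader built from --jars/--packages.
          setContextClassLoader(getClass.getClassLoader)

          override def run(): Unit = {
            val cl = Thread.currentThread().getContextClassLoader
            // ServiceLoader only finds providers visible to the classloader it is given,
            // which is why the AM sets the user classloader on the starter thread.
            val providers = ServiceLoader.load(classOf[TokenProviderLike], cl).asScala.toList
            println(s"discovered providers: ${providers.map(_.serviceName)}")
          }
        }

        worker.start()
        worker.join()
      }
    }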