@@ -39,7 +39,6 @@ private[security] class HBaseDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -144,7 +144,7 @@ private[spark] class HadoopDelegationTokenManager(
def obtainDelegationTokens(creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds)
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
@@ -181,14 +181,6 @@ private[spark] class HadoopDelegationTokenManager(
.getOrElse(isEnabledDeprecated)
}

/**
* List of file systems for which to obtain delegation tokens. The base implementation
* returns just the default file system in the given Hadoop configuration.
*/
protected def fileSystemsToAccess(): Set[FileSystem] = {
Set(FileSystem.get(hadoopConf))
}

private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
@@ -44,13 +44,11 @@ private[spark] trait HadoopDelegationTokenProvider {
* Obtain delegation tokens for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param creds Credentials to add tokens and security keys to.
* @param fileSystems List of file systems for which to obtain delegation tokens.
* @return If the returned tokens are renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long]
}
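
With the `fileSystems` parameter gone, an implementation of this trait is now responsible for resolving anything it needs from the two configurations alone. A minimal sketch against the updated signature — the class, service name, and method bodies below are hypothetical illustrations, not part of this change:

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Hypothetical provider compiled against the trimmed-down trait above.
private[security] class MyServiceDelegationTokenProvider extends HadoopDelegationTokenProvider {

  override def serviceName: String = "my-service"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    // Only fetch tokens when the cluster actually runs Kerberos.
    hadoopConf.get("hadoop.security.authentication") == "kerberos"
  }

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch a token for the external service and add it to `creds` here.
    // Return Some(nextRenewalTimeMillis) if the token can be renewed, otherwise None.
    None
  }
}
```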
@@ -22,7 +22,7 @@ import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
@@ -44,9 +44,9 @@ private[deploy] class HadoopFSDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)

// Get the token renewal interval if it is not set. It will only be called once.
@@ -133,3 +133,45 @@ private[deploy] class HadoopFSDelegationTokenProvider
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}

private[deploy] object HadoopFSDelegationTokenProvider {
def hadoopFSsToAccess(
sparkConf: SparkConf,
hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
val requestAllDelegationTokens = filesystemsToAccess.isEmpty

val master = sparkConf.get("spark.master")
val stagingFS = if (master != null && master.contains("yarn")) {
sparkConf.get(STAGING_DIR)
.map(new Path(_).getFileSystem(hadoopConf))
.getOrElse(FileSystem.get(hadoopConf))
} else {
FileSystem.get(hadoopConf)
}

// Add the list of available namenodes for all namespaces in HDFS federation.
// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
// namespaces.
val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") {
filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
} else {
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
// Retrieving the filesystem for the nameservices where HA is not enabled
val filesystemsWithoutHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
}
}
// Retrieving the filesystem for the nameservices where HA is enabled
val filesystemsWithHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
}
}
(filesystemsWithoutHA ++ filesystemsWithHA).toSet
}

hadoopFilesystems + stagingFS
}
}
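
The helper above is exercised end-to-end by the new suite further down; what the suite does not show is the short-circuit when filesystems are listed explicitly. A rough usage sketch of that path (arbitrary localhost addresses, and the caller must live under `org.apache.spark.deploy` since the object is `private[deploy]`):

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf

// Sketch only: an explicit spark.kerberos.access.hadoopFileSystems list bypasses the
// federation discovery above; the default/staging filesystem is still always included.
object HadoopFSsToAccessSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.master", "yarn")
      .set("spark.kerberos.access.hadoopFileSystems", "hdfs://localhost:8021")

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", "hdfs://localhost:8020")
    // Ignored in this run, because an explicit list was provided above.
    hadoopConf.set("dfs.nameservices", "ns1")
    hadoopConf.set("dfs.namenode.rpc-address.ns1", "localhost:8022")

    val fss = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
    // Prints the URIs of hdfs://localhost:8020 (staging/default FS) and hdfs://localhost:8021.
    fss.map(_.getUri.toString).foreach(println)
  }
}
```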
core/src/main/scala/org/apache/spark/internal/config/package.scala (18 additions, 0 deletions)
@@ -364,6 +364,19 @@ package object config {
.checkValues(Set("keytab", "ccache"))
.createWithDefault("keytab")

private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.kerberos.access.namenodes")
.doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
"fs.defaultFS does not need to be listed here.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val FILESYSTEMS_TO_ACCESS =
ConfigBuilder("spark.kerberos.access.hadoopFileSystems")
.doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
"that hosts fs.defaultFS does not need to be listed here.")
.fallbackConf(NAMENODES_TO_ACCESS)

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
@@ -1253,4 +1266,9 @@ package object config {
ConfigBuilder("spark.speculation.quantile")
.doubleConf
.createWithDefault(0.75)

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.stringConf
.createOptional
}
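
The fallbackConf wiring above mirrors what the removed YARN definitions did: reading the hadoopFileSystems entry falls back to the namenodes entry when only the latter is set. A small sketch of that resolution, assuming it runs inside a Spark package so the private[spark] config entries are visible (values are made up):

```scala
package org.apache.spark.deploy.security

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{FILESYSTEMS_TO_ACCESS, NAMENODES_TO_ACCESS}

// Sketch: FILESYSTEMS_TO_ACCESS falls back to NAMENODES_TO_ACCESS, so a value set only
// under spark.kerberos.access.namenodes is still visible through the hadoopFileSystems entry.
object KerberosAccessConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set(NAMENODES_TO_ACCESS, Seq("hdfs://nn1.example.com:8020"))

    // Resolves via the fallback chain defined above: Seq(hdfs://nn1.example.com:8020)
    println(conf.get(FILESYSTEMS_TO_ACCESS))
  }
}
```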
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -36,7 +35,6 @@ private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = throw new IllegalArgumentException
}

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, SparkFunSuite}

class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers {
test("SPARK-24149: retrieve all namenodes from HDFS") {
val sparkConf = new SparkConf()
sparkConf.set("spark.master", "yarn-client")
val basicFederationConf = new Configuration()
basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
basicFederationConf.set("dfs.nameservices", "ns1,ns2")
basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
val basicFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
val basicFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, basicFederationConf)
basicFederationResult should be (basicFederationExpected)

// when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them
val viewFsConf = new Configuration()
viewFsConf.addResource(basicFederationConf)
viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
HadoopFSDelegationTokenProvider
.hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)

// invalid config should not throw NullPointerException
val invalidFederationConf = new Configuration()
invalidFederationConf.addResource(basicFederationConf)
invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
val invalidFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
val invalidFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, invalidFederationConf)
invalidFederationResult should be (invalidFederationExpected)

// no namespaces defined, ie. old case
val noFederationConf = new Configuration()
noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
val noFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
val noFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf,
noFederationConf)
noFederationResult should be (noFederationExpected)

// federation and HA enabled
val federationAndHAConf = new Configuration()
federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

val federationAndHAExpected = Set(
new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
val federationAndHAResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, federationAndHAConf)
federationAndHAResult should be (federationAndHAExpected)
}
}
@@ -21,7 +21,6 @@ import scala.language.existentials
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}

@@ -38,7 +37,6 @@ private[spark] class KafkaDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
logDebug("Attempting to fetch Kafka security token.")
@@ -21,14 +21,11 @@ import java.util.regex.{Matcher, Pattern}

import scala.collection.mutable.{HashMap, ListBuffer}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.SecurityManager
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils

@@ -185,40 +182,4 @@ object YarnSparkHadoopUtil {
ConverterUtils.toContainerId(containerIdString)
}

/** The filesystems for which YARN should fetch delegation tokens. */
def hadoopFSsToAccess(
sparkConf: SparkConf,
hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
val requestAllDelegationTokens = filesystemsToAccess.isEmpty

val stagingFS = sparkConf.get(STAGING_DIR)
.map(new Path(_).getFileSystem(hadoopConf))
.getOrElse(FileSystem.get(hadoopConf))

// Add the list of available namenodes for all namespaces in HDFS federation.
// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
// namespaces.
val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") {
filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
} else {
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
// Retrieving the filesystem for the nameservices where HA is not enabled
val filesystemsWithoutHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
}
}
// Retrieving the filesystem for the nameservices where HA is enabled
val filesystemsWithHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
}
}
(filesystemsWithoutHA ++ filesystemsWithHA).toSet
}

hadoopFilesystems + stagingFS
}

}
@@ -122,11 +122,6 @@ package object config {
.intConf
.createOptional

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.stringConf
.createOptional

/* Launcher configuration. */

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
@@ -244,20 +239,6 @@ package object config {
.booleanConf
.createWithDefault(false)

/* Security configuration. */

private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
.doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
"fs.defaultFS does not need to be listed here.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems")
.doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
"that hosts fs.defaultFS does not need to be listed here.")
.fallbackConf(NAMENODES_TO_ACCESS)

/* Rolled log aggregation configuration. */

private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
@@ -22,12 +22,10 @@ import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

@@ -74,8 +72,4 @@ private[spark] class YARNHadoopDelegationTokenManager(
credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName)
}

override protected def fileSystemsToAccess(): Set[FileSystem] = {
YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf)
}

}