[SPARK-16742] Mesos Kerberos Support #17665
SparkSubmit.scala

@@ -30,6 +30,7 @@ import scala.util.Properties
  import org.apache.commons.lang3.StringUtils
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
  import org.apache.ivy.Ivy
  import org.apache.ivy.core.LogOptions
  import org.apache.ivy.core.module.descriptor._

@@ -45,6 +46,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
  import org.apache.spark._
  import org.apache.spark.api.r.RUtils
  import org.apache.spark.deploy.rest._
+ import org.apache.spark.internal.Logging
  import org.apache.spark.launcher.SparkLauncher
  import org.apache.spark.util._

@@ -63,7 +65,7 @@ private[deploy] object SparkSubmitAction extends Enumeration {
   * This program handles setting up the classpath with relevant Spark dependencies and provides
   * a layer over the different cluster managers and deploy modes that Spark supports.
   */
- object SparkSubmit extends CommandLineUtils {
+ object SparkSubmit extends CommandLineUtils with Logging {

    // Cluster managers
    private val YARN = 1

@@ -564,12 +566,22 @@ object SparkSubmit extends CommandLineUtils {
          // properties and then loaded by SparkConf
          sysProps.put("spark.yarn.keytab", args.keytab)
          sysProps.put("spark.yarn.principal", args.principal)

          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
        }
      }
    }

+   // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
+   // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
+   // must trick it into thinking we're YARN.
+   if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+     val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+     val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
Contributor
Does it work in the user impersonation scenario? Here ...

Author
Is it? It looks like ... Regardless, the renewer specified here actually has no effect, since we aren't renewing yet. Once I add renewal, I'll need to revisit this to make sure it's consistent with the renewal we're going to do in ...

Contributor
The renewer for delegation tokens should be the user that created them (if the service is not doing it for the user, as in the YARN case); only the logged-in user can create tokens, so this has to be the current user (not the proxy user, which comes later). The tokens should be created in the proxy user's name (so user = "proxy", renewer = "real user"), when you care to support that.

So do you mean that currently this only works until the delegation tokens need renewal (which is 1 day by default - a lot shorter than the max lifetime)?

Author
Yes
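To make the renewer discussion concrete, here is a minimal sketch (not code from this PR) of how the renewer string is passed when fetching HDFS delegation tokens; `FileSystem.addDelegationTokens` is the standard Hadoop API that `HadoopFSCredentialProvider` below also calls, and using the current short user name as the renewer mirrors the non-YARN behaviour discussed above:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Illustrative only: fetch delegation tokens for the default filesystem with the
// current user as renewer. In the YARN case the renewer would instead be the
// ResourceManager principal, which can then renew the tokens on the user's behalf.
def fetchTokens(hadoopConf: Configuration): Credentials = {
  val creds = new Credentials()
  val renewer = UserGroupInformation.getCurrentUser.getShortUserName
  val fs = FileSystem.get(hadoopConf)
  fs.addDelegationTokens(renewer, creds) // tokens are added to `creds`
  creds
}
```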
+     logDebug(s"Setting ${key} to ${shortUserName}.")
+     sysProps.put(key, shortUserName)
+   }

    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
ConfigurableCredentialManager.scala

@@ -15,7 +15,7 @@
   * limitations under the License.
   */

- package org.apache.spark.deploy.yarn.security
+ package org.apache.spark.deploy.security

  import java.util.ServiceLoader

@@ -41,15 +41,17 @@ import org.apache.spark.util.Utils
   * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
   * the configuration spark.yarn.security.credentials.hive.enabled.
   */
- private[yarn] final class ConfigurableCredentialManager(
+ private[spark] class ConfigurableCredentialManager(
      sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
    private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
    private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"

    // Maintain all the registered credential providers
-   private val credentialProviders = {
-     val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
-       Utils.getContextOrSparkClassLoader).asScala
+   private val credentialProviders = getCredentialProviders()
+   logDebug(s"Using the following credential providers: ${credentialProviders.keys.mkString(", ")}.")
+
+   private def getCredentialProviders(): Map[String, ServiceCredentialProvider] = {
+     val providers = loadCredentialProviders

      // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
      providers.filter { p =>

@@ -64,15 +66,22 @@ private[yarn] final class ConfigurableCredentialManager(
      }.map { p => (p.serviceName, p) }.toMap
    }

+   protected def loadCredentialProviders: List[ServiceCredentialProvider] = {
+     ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
+       .asScala.toList
+   }
+
    /**
     * Get credential provider for the specified service.
     */
    def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
      credentialProviders.get(service)
    }

    /**
-    * Obtain credentials from all the registered providers.
+    * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
+    * providers.
     *
     * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
     *         otherwise the nearest renewal time of any credentials will be returned.
     */

@@ -87,21 +96,4 @@ private[yarn] final class ConfigurableCredentialManager(
        }
      }.foldLeft(Long.MaxValue)(math.min)
    }

-   /**
-    * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
-    * instance when it is not used. AM will use it to renew credentials periodically.
-    */
-   def credentialRenewer(): AMCredentialRenewer = {
-     new AMCredentialRenewer(sparkConf, hadoopConf, this)
-   }
-
-   /**
-    * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
-    * when it is not used. Executors and driver (client mode) will use it to update credentials.
-    * periodically.
-    */
-   def credentialUpdater(): CredentialUpdater = {
-     new CredentialUpdater(sparkConf, hadoopConf, this)
-   }
  }
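As a rough driver-side usage sketch of the manager: the `obtainCredentials(hadoopConf, creds)` entry point and its `Long` return value are assumptions inferred from the truncated hunk above (each provider's result is folded with `math.min`), so treat this as illustrative only:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf

// Hypothetical caller (inside Spark internals): obtain tokens from every enabled
// provider and attach them to the current user. The manager method signature is an
// assumption based on the excerpt above, not a documented API.
def obtainAllTokens(sparkConf: SparkConf, hadoopConf: Configuration): Long = {
  val manager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
  val creds = new Credentials()
  val nextRenewal = manager.obtainCredentials(hadoopConf, creds)
  UserGroupInformation.getCurrentUser.addCredentials(creds)
  nextRenewal // Long.MaxValue when nothing is renewable
}
```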
HadoopFSCredentialProvider.scala (new file)

@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

private[deploy] class HadoopFSCredentialProvider
    extends ServiceCredentialProvider with Logging {
  // Token renewal interval, this value will be set in the first call,
  // if None means no token renewer specified or no token can be renewed,
  // so cannot get token renewal interval.
  private var tokenRenewalInterval: Option[Long] = null

  override val serviceName: String = "hadoopfs"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // NameNode to access, used to get tokens from different FileSystems
    val tmpCreds = new Credentials()
    val tokenRenewer = getTokenRenewer(hadoopConf)
    hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
      val dstFs = dst.getFileSystem(hadoopConf)
      logInfo("getting token for: " + dst)
      dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
    }

    // Get the token renewal interval if it is not set. It will only be called once.
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
    }

    // Get the time of next renewal.
    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
      val nextRenewalDates = tmpCreds.getAllTokens.asScala
        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
        .map { t =>
          val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
          identifier.getIssueDate + interval
        }
      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
    }

    creds.addAll(tmpCreds)
    nextRenewalDate
  }

  protected def getTokenRenewalInterval(
      hadoopConf: Configuration,
      sparkConf: SparkConf): Option[Long] = None

  protected def getTokenRenewer(hadoopConf: Configuration): String = {
    UserGroupInformation.getCurrentUser.getShortUserName
  }

  protected def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
    Set(FileSystem.get(hadoopConf).getHomeDirectory)
  }
}
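The three protected methods are extension hooks. A hypothetical subclass could, for example, specialize which filesystems to fetch tokens for; the class name below is made up, and while `spark.yarn.access.namenodes` is an existing Spark setting, whether this PR's YARN-side override wires it exactly like this is an assumption, so this is only a sketch of the intended design:

```scala
package org.apache.spark.deploy.security // same package so the private[deploy] parent is visible

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf

// Hypothetical subclass showing the extension points; not code from this PR.
class ExtraNameNodesFSCredentialProvider extends HadoopFSCredentialProvider {
  override protected def hadoopFSsToAccess(
      hadoopConf: Configuration,
      sparkConf: SparkConf): Set[Path] = {
    // Fetch tokens for extra NameNodes listed in the (pre-existing) Spark setting,
    // in addition to the default filesystem's home directory.
    val extra = sparkConf.get("spark.yarn.access.namenodes", "")
      .split(",").map(_.trim).filter(_.nonEmpty)
      .map(new Path(_)).toSet
    extra + FileSystem.get(hadoopConf).getHomeDirectory
  }
}
```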
ServiceCredentialProvider.scala (new file)

@@ -0,0 +1,57 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
Contributor
Will this move not break all existing ServiceCredentialProvider's? (Since the service discovery via META-INF/services will get affected now.)

Contributor
The code in the "yarn" module is sort of considered "private", so anyone extending those classes is walking on thin ice... The internal services files need to be moved / updated, but I don't think we need to worry about external implementations. This also means we need to be more careful here when making classes / traits public. Maybe add ... (Haven't looked at the rest of the code yet, kinda waiting for the PR builder to be happy first.)

Author
I've moved all the ...

Contributor
@vanzin Since this is documented in running-on-yarn.md, I don't think it can be considered an internal implementation detail anymore; it is part of our exposed contract. @mgummelt I did see that all Spark implementations have been moved - but since this is an exposed contract, there can be (and are) external implementations which rely on it.

Contributor
Hmm... that's unfortunate, not just because of this change, but because I really doubt anyone has custom implementations of that interface at this point. And making it a proper API in core is better going forward, IMO. If you really think it's important to keep backwards compatibility, we could have the old interface present, extending the new one, and some compatibility code to keep both working. But that sounds like a lot of work for something I doubt anyone is currently using...

Contributor
@vanzin Would be good to keep backward compatibility since there are users of the interface. A cursory look at ServiceLoader seemed to indicate it might not be very involved to provide backward compatibility (...)

Contributor
For backwards compat you'd have to call ...

Author
OK, I'll try to unbreak backwards compatibility.

Author
OK, I: ...

Author
I'm not quite sure this is correct though. As it is, if a user has a service listed in the resources entry for both ... Thoughts?
import org.apache.spark.SparkConf

/**
 * A credential provider for a service. User must implement this if they need to access a
 * secure service from Spark.
 */
trait ServiceCredentialProvider {
Contributor
No annotation means "Unstable" IIRC, but might as well be explicit here and add a proper ...

Author
How does ...

Contributor
It's not a Scala thing, it's a Spark API thing. When adding a public API that might change, we annotate it appropriately so that people know it's not really stable. Flip side: if you change an API that is tagged with a "Stable" annotation, you'll be yelled at. It also shows up in docs: ... (Basically this is the new version of ...)

Author
Ah, thanks. I didn't know this was allowed at all.

Author
Done
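For illustration, making the stability contract explicit could look like the following, assuming `org.apache.spark.annotation.DeveloperApi` is the annotation being suggested (the thread does not show which annotation was finally applied):

```scala
import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * A credential provider for a service. Implementations let Spark obtain delegation
 * tokens for a secure service on the user's behalf.
 */
@DeveloperApi
trait ServiceCredentialProvider {
  // ... members as in the listing that continues below ...
}
```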
  /**
   * Name of the service to provide credentials. This name should unique, Spark internally will
   * use this name to differentiate credential provider.
   */
  def serviceName: String

  /**
   * To decide whether credential is required for this service. By default it based on whether
   * Hadoop security is enabled.
   */
  def credentialsRequired(hadoopConf: Configuration): Boolean = {
    UserGroupInformation.isSecurityEnabled
  }

  /**
   * Obtain credentials for this service and get the time of the next renewal.
   * @param hadoopConf Configuration of current Hadoop Compatible system.
   * @param sparkConf Spark configuration.
   * @param creds Credentials to add tokens and security keys to.
   * @return If this Credential is renewable and can be renewed, return the time of the next
   *         renewal, otherwise None should be returned.
   */
  def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long]
}
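A sketch of what a third-party implementation of the new trait might look like; the package, class name, and behavior are hypothetical:

```scala
package com.example.spark.security // hypothetical user package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.ServiceCredentialProvider

// Hypothetical provider: fetches a token for an in-house service and reports that the
// token cannot be renewed by returning None.
class ExampleCredentialProvider extends ServiceCredentialProvider {
  override val serviceName: String = "example"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // ... contact the service and add its token to `creds` here ...
    None
  }
}
```

It would be registered by listing the class name in a META-INF/services file named after the trait (org.apache.spark.deploy.security.ServiceCredentialProvider) on the application's classpath, and, per the filtering logic shown earlier, could be disabled with spark.yarn.security.credentials.example.enabled=false.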
CoarseGrainedExecutorBackend.scala

@@ -21,11 +21,14 @@ import java.net.URL
  import java.nio.ByteBuffer
  import java.util.Locale
  import java.util.concurrent.atomic.AtomicBoolean
+ import javax.xml.bind.DatatypeConverter

  import scala.collection.mutable
  import scala.util.{Failure, Success}
  import scala.util.control.NonFatal

+ import org.apache.hadoop.security.UserGroupInformation
+
  import org.apache.spark._
  import org.apache.spark.TaskState.TaskState
  import org.apache.spark.deploy.SparkHadoopUtil

@@ -174,6 +177,24 @@ private[spark] class CoarseGrainedExecutorBackend(

  private[spark] object CoarseGrainedExecutorBackend extends Logging {

+   private def addMesosDelegationTokens(driverConf: SparkConf) {
+     val value = driverConf.get("spark.mesos.kerberos.userCredentials")
+     val tokens = DatatypeConverter.parseBase64Binary(value)
+
+     logDebug(s"Found delegation tokens of ${tokens.length} bytes.")
+
+     // Use tokens for HDFS login.
+     val hadoopConf = SparkHadoopUtil.get.newConfiguration(driverConf)
+     hadoopConf.set("hadoop.security.authentication", "Token")
+     UserGroupInformation.setConfiguration(hadoopConf)
+
+     // Decode tokens and add them to the current user's credentials.
+     val creds = UserGroupInformation.getCurrentUser.getCredentials
+     val tokensBuf = new java.io.ByteArrayInputStream(tokens)
+     creds.readTokenStorageStream(new java.io.DataInputStream(tokensBuf))
+     UserGroupInformation.getCurrentUser.addCredentials(creds)
+   }
Contributor
This is relying on an immutable initial credential populated in the driver conf.

Author
In my pending renewal PR, I've implemented it to use a new ...

Contributor
That would be great, thanks - I am trying to get a sense of how renewal works in this case.

Contributor
To elaborate: one potential use of it is to do token acquisition and token distribution without needing to provide a principal/keytab to the Spark application (other than keeping track of the last credential update to account for AM failures).

Author
Yes, I believe @jerryshao is working on a standard method of delegating tokens. We can plug that in once it exists.
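The executor-side decoding above implies a driver/scheduler-side counterpart that serializes credentials into spark.mesos.kerberos.userCredentials. That code is not part of this excerpt; a sketch of the inverse operation, under the assumption that the property simply carries base64-encoded Hadoop token storage, could look like this (the method name is made up):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import javax.xml.bind.DatatypeConverter

import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf

// Illustrative inverse of addMesosDelegationTokens: serialize the credentials and
// stash them, base64-encoded, in the conf that is shipped to executors.
def putMesosDelegationTokens(creds: Credentials, conf: SparkConf): Unit = {
  val byteStream = new ByteArrayOutputStream()
  val dataStream = new DataOutputStream(byteStream)
  creds.writeTokenStorageToStream(dataStream)
  dataStream.flush()
  val encoded = DatatypeConverter.printBase64Binary(byteStream.toByteArray)
  conf.set("spark.mesos.kerberos.userCredentials", encoded)
}
```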
    private def run(
        driverUrl: String,
        executorId: String,

@@ -220,6 +241,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
      }

+     if (driverConf.contains("spark.mesos.kerberos.userCredentials")) {
+       addMesosDelegationTokens(driverConf)
+     }

      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
Test ServiceLoader registration (new file)

@@ -0,0 +1 @@
org.apache.spark.deploy.security.TestCredentialProvider
dev/.rat-excludes

@@ -102,7 +102,7 @@ spark-deps-.*
  org.apache.spark.scheduler.ExternalClusterManager
  .*\.sql
  .Rbuildignore
- org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+ org.apache.spark.deploy.security.ServiceCredentialProvider
Contributor
In a change I have internally, I changed this to: ... If that works (haven't tested yet) I think that's better than having to keep changing / adding these files here...
  spark-warehouse
  structured-streaming/*
  kafka-source-initial-offset-version-2.1.0.bin
META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider (new file)

@@ -0,0 +1,3 @@
org.apache.spark.deploy.security.HadoopFSCredentialProvider
org.apache.spark.deploy.security.HBaseCredentialProvider
org.apache.spark.deploy.security.HiveCredentialProvider
Contributor
Hmm... you're adding a YARN dependency in core, which should be able to build without YARN... but you're not actually adding a new dependency to the POM, so I guess the dependency is already there indirectly.

Author
Yea, it looks like this should require hadoop-yarn-api. I'll add this dependency to core unless you object, in which case I suppose I could just use the raw string instead of YarnConfiguration.RM_PRINCIPAL, but that seems hacky.

FYI, we originally talked about placing the code below into the Mesos scheduler, but that seems to be too late. The SparkContext constructor creates a copy of the configuration, so we need to set the required YARN property before SparkContext is created, which means before user code gets run, which probably means somewhere in SparkSubmit.

Contributor
I don't think you need the explicit dependency (otherwise this would not be compiling). This is probably being brought in by hadoop-client already.

Author
Better to explicitly declare a dependency rather than rely on transitivity, right?
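For reference, the "raw string" alternative mentioned above would amount to inlining the constant: YarnConfiguration.RM_PRINCIPAL resolves to the key "yarn.resourcemanager.principal", so the SparkSubmit block shown earlier could be written without the YARN import. A sketch of the same fragment, with the trade-off that the key is hard-coded:

```scala
// Alternative sketch: avoid importing YarnConfiguration in core by hard-coding the key
// that YarnConfiguration.RM_PRINCIPAL stands for ("yarn.resourcemanager.principal").
// Uses the same surrounding names (clusterManager, MESOS, sysProps) as the block above.
if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
  val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
  val key = "spark.hadoop.yarn.resourcemanager.principal"
  logDebug(s"Setting $key to $shortUserName.")
  sysProps.put(key, shortUserName)
}
```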