-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-21700][security] Add an option to disable credential retrieval on a secure cluster #15131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 3c7129c (Sat Aug 28 11:21:03 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
Hi @lirui-apache , @XComp @rmetzger , could you help to review this PR? |
KarmaGYZ
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank for the PR. I'm still trying to understand this change. Left some minor comments. Also, you should change the commit message/title following the format "[FLINK-21700][yarn] xxxx".
| "A comma-separated list of additional Kerberos-secured Hadoop filesystems Flink is going to access. For example, yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The client submitting to YARN needs to have access to these file systems to retrieve the security tokens."); | ||
|
|
||
| public static final ConfigOption<Boolean> YARN_SECURITY_ENABLED = | ||
| key("yarn.security.kerberos.fetch.delegationToken.enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to generate the doc with mvn clean package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests -Dcheckstyle.skip
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
| // setup security tokens | ||
| if (UserGroupInformation.isSecurityEnabled()) { | ||
| // set HDFS delegation tokens when security is enabled | ||
| LOG.info("Adding delegation token to the AM container."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If yarnFetchDelegationTokenEnabled == false, we should not print this log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I verified it: The log message is still valid since Utils.setTokensFor also sets user-related tokens. We only disable HDFS and HBase when SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN is disabled.
| final Text alias = new Text(token.getService()); | ||
| LOG.info("Adding user token " + alias + " with " + token); | ||
| credentials.addToken(alias, token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you help me to understand why we need this and what the different between token.getService() and token.getIdentifier(). Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found that the identifier is not the only uniq id of the token.
So when getting existed token from UserGroupInformation.getCurrentUser().getTokens and these token may be having the same identifier, this is will cause overwriting tokens by using credentials.addToken(identifier, token);
Acutally, i think we should use service as the alias.
More detailed info has been replied in email to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the token fetching for HBase is also relying on the service name instead of the identifier to add the token. I don't understand your argument, though: In what situation would the identifier not be unique. ...considering that you called the identifier not being the only unique id. Could you elaborate a bit more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to digging into Hadoop source code, i found that alias is just as the key as internal Credentials token HashMap.
In what situation would the identifier not be unique
In our production env, i found that identifier is the same in active and standby namenode HDFS delegation token. But i dont find any detailed Hadoop doc to support it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there different delegation tokens for the different namenodes Flink has to deal with? I would have had assumed that there's only a single token for Flink accessing the Yarn cluster. Could you point me to the Hadoop source code you're referring to.
Sorry if it's a bit tedious. I haven't worked with Yarn/Kerberos, yet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to https://issues.apache.org/jira/browse/HDFS-9276.
May not be very relevant.
Ok. i will force push it to correct commit log later. |
|
@zuston Could you squash your PR into one commit? Also the CI failed because of the style check, you could execute |
Done. Thanks |
XComp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @zuston and thanks for your contribution.
I'm wondering whether we can test that Flink is behaving in the right way when disabling the newly introduced flag. Right now, no test is covering this specific behavior.
| <td><h5>yarn.security.kerberos.fetch.delegationToken.enabled</h5></td> | ||
| <td style="word-wrap: break-word;">true</td> | ||
| <td>Boolean</td> | ||
| <td>When this is true Flink will fetch HDFS/HBase delegation token injected into AM container.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to elaborate a bit more on what's necessary to do when this flag is disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have the "kerberos" in the config key? Does it mean the delegation token only be obtained when "kerberos" is enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the answer is YES. The delegation token mechanism is indeed an extension of Kerberos.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
| final Text alias = new Text(token.getService()); | ||
| LOG.info("Adding user token " + alias + " with " + token); | ||
| credentials.addToken(alias, token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the token fetching for HBase is also relying on the service name instead of the identifier to add the token. I don't understand your argument, though: In what situation would the identifier not be unique. ...considering that you called the identifier not being the only unique id. Could you elaborate a bit more.
It only works when credentials existed in UGI and no keytab is specified. I think It's hard to test, could you give me some ideas on it? @XComp |
That's a good questions. @rmetzger How did we deal with these kind of problems in the past? Was it sufficient enough to have a manual test with the external system to see that it worked properly? |
|
Do i need to submit another PR about changing |
Putting it in a separate commit within this PR is good enough if we go with this change. |
Could you come up with a manual test which is documented properly in the PR to make it reproducible? |
|
Thanks @XComp. |
XComp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you visualize how the Oozie -> Flink -> Kerberos -> HDFS/Hbase communication works in contrast to the default one? That might help me understand the change in a better way.
| // for HBase | ||
| obtainTokenForHBase(credentials, conf); | ||
|
|
||
| if (yarnFetchDelegationEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's also worth it to add an info log message here for both cases making it explicit in the logs that the delegation fetching is enabled or disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
wangyang0918
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this ticket. After reading some implementation of Spark, I think it is a correct direction to disable fetching the delegation tokens. I left some comments and please have a look.
| <td><h5>yarn.security.kerberos.fetch.delegationToken.enabled</h5></td> | ||
| <td style="word-wrap: break-word;">true</td> | ||
| <td>Boolean</td> | ||
| <td>When this is true Flink will fetch HDFS/HBase delegation token injected into AM container.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have the "kerberos" in the config key? Does it mean the delegation token only be obtained when "kerberos" is enabled?
| ContainerLaunchContext amContainer, | ||
| List<Path> paths, | ||
| Configuration conf, | ||
| boolean yarnFetchDelegationEnabled) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| boolean yarnFetchDelegationEnabled) | |
| boolean obtainingDelegationTokens) |
| final Text id = new Text(token.getIdentifier()); | ||
| LOG.info("Adding user token " + id + " with " + token); | ||
| credentials.addToken(id, token); | ||
| final Text alias = new Text(token.getService()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think using the token.getService() makes sense here. However, I would suggest to factor out these changes into a separate commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
| List<Path> yarnAccessList = | ||
| ConfigUtils.decodeListFromConfig( | ||
| configuration, YarnConfigOptions.YARN_ACCESS, Path::new); | ||
| List<Path> yarnAccessList = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to document that yarn.security.kerberos.additionalFileSystems should be unset when obtaining delegation tokens are disabled.
Maybe we could also add a precondition check instead of process it silently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean that when YARN_ACCESS and YARN_SECURITY_ENABLED are existed together, Flink client should fast fail through throwing exception or exit Non-zero code.
Right? @wangyang0918
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we disable obtaining the delegation tokens(HDFS/HBase), then yarn.security.kerberos.additionalFileSystems is expected to be unset. Right?
If this is the case, I think throwing exception here makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
Agree with you, i will add precheck.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Please review it.
| .withDescription( | ||
| "A comma-separated list of additional Kerberos-secured Hadoop filesystems Flink is going to access. For example, yarn.security.kerberos.additionalFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The client submitting to YARN needs to have access to these file systems to retrieve the security tokens."); | ||
|
|
||
| public static final ConfigOption<Boolean> YARN_SECURITY_ENABLED = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think whether to fetch delegation token and whether "security is enabled" are different things. So I would suggest rename this option to something like KERBEROS_USE_DELEGATION_TOKEN.
Besides, delegation token is not specific to yarn. I think spark supports delegation token for both yarn and mesos. So I also suggest move this option to SecurityOptions in flink-core, alongside with other kerberos configurations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Rename is ok. But KERBEROS_USE_DELEGATION_TOKEN maybe not suitable.
Actually, this option indicates Flink whether to fetch delegation token actively, not no use delegation token.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, could you please explain in which situation would a user want to fetch delegation tokens but not to use them? Or what a user need to do to really use delegation tokens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hadoop delegation token is used to access to HDFS/HBase/Hive service. So when you want to be interactive with HDFS, you will need it.
Two options can be used to access above services.
First is to fetch delegation token using Keytab.
Second is to get it from current ugi, if current ugi has the delegation token which is already existing and can be reused.
Refer to: https://blog.cloudera.com/hadoop-delegation-tokens-explained/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this option only controls whether we fetch delegation tokens, then maybe we can call it KERBEROS_FETCH_DELEGATION_TOKEN ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Got it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Got it.
After rechecking the delegation token usage scope, i think it's only specific to Hadoop and related framework, like Hive/HBase. Right?
If not, can you give me some doc or issue link on Spark mesos delegation token? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delegation tokens are usually used for a distributed job to authenticate with a Hadoop-based service like Hive or HBase. But it should be orthogonal to the resource management framework like yarn, mesos, or k8s. Spark doc indicates that delegation tokens are supported for yarn and mesos. And its delegation token implementations (e.g. HadoopDelegationTokenManager, HadoopDelegationTokenProvider) are in spark-core.
Actually other projects may also implement their own delegation token mechanisms like what kafka does. But I guess that exceeds the scope of this ticket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Please review it.
|
Update commits. Please check it. @XComp @lirui-apache @wangyang0918 @KarmaGYZ |
| .defaultValue(true) | ||
| .withDescription( | ||
| "Indicates whether to fetch delegation token. If true, Flink will fetch " | ||
| + "HDFS/HBase delegation tokens and inject into AM container." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should have a more generic description for this option, maybe something like "whether to fetch delegation tokens for external services the Flink job needs to contact". We can mention only HDFS and HBase are supported, and only works for flink-yarn at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Please take a look. Thanks.
| + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. " | ||
| + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. " | ||
| + "If false, Flink will assume that the job has delegation tokens and will not fetch them. " | ||
| + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please elaborate what it means by saying "This applies to submission mechanisms like Oozie"? IIUC, Flink can't control how Oozie submits jobs, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIU, Oozie schedules Flink job submissions and utilizes Hadoop's ProxyUser feature to obtain an delegation token to access the components (in our case HDFS and Hbase). As far as I understand it, the delegation token request usually triggered by Flink would fail if Apache Oozie handles the delegation tokens as it uses it's own Proxy User that impersonates the actual users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, This applies to submission mechanisms like Oozie seems to imply that Oozie also respects this config option. Alternatively, maybe we can say something like: You may want to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, the description is not clear.
ou may want to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, to handle delegation tokens.
This is good.
XComp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @zuston, sorry for the late reply. I didn't have time to look into the delegation token topic till now.
I'm still not comfortable with this change considering that it's not tested. Have you had the chance to check whether it works? Could you provide this manual test in a reproducible fashion (e.g. docker) or is this too much of an effort?
Based on what I read about it, the issue is that Apache Oozie utilizes Apache Hadoop's ProxyUser which impersonates the actual user which has access to the actual data. I still don't understand why the delegation token fetching causes an error. Is it because the Flink job would be still submitted under the "normal" user instead of the Oozie user?
| + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. " | ||
| + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. " | ||
| + "If false, Flink will assume that the job has delegation tokens and will not fetch them. " | ||
| + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIU, Oozie schedules Flink job submissions and utilizes Hadoop's ProxyUser feature to obtain an delegation token to access the components (in our case HDFS and Hbase). As far as I understand it, the delegation token request usually triggered by Flink would fail if Apache Oozie handles the delegation tokens as it uses it's own Proxy User that impersonates the actual users.
| "Indicates whether to fetch delegation token for external services the Flink job needs to contact. " | ||
| + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. " | ||
| + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. " | ||
| + "If false, Flink will assume that the job has delegation tokens and will not fetch them. " | ||
| + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| "Indicates whether to fetch delegation token for external services the Flink job needs to contact. " | |
| + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. " | |
| + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. " | |
| + "If false, Flink will assume that the job has delegation tokens and will not fetch them. " | |
| + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens " | |
| "Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. " | |
| + "Only HDFS and HBase are supported. It is used in YARN deployments. " | |
| + "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. " | |
| + "If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase." | |
| + "This applies to submission mechanisms like Apache Oozie which will obtain delegation tokens " |
Here's a proposal to make the docs more explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the documentation has not updated, yet.
| // for HBase | ||
| obtainTokenForHBase(credentials, conf); | ||
|
|
||
| if (obtainingDelegationTokens) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to have separate parameters for each of the services similar to how Spark is implementing this feature? That's just an idea I came up with after browsing the Spark sources where it's separated like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it should submit another issue to separate it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's not necessary for now considering that we don't have a use-case where we want to disable them individually. So, I'm fine with keeping it like that considering that it makes configuration easier.
| boolean yarnAccessFSEnabled = | ||
| flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS) != null; | ||
| if (!kerberosFetchDTEnabled && yarnAccessFSEnabled) { | ||
| throw new RuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't be a warning sufficient enough? The YARN_ACCESS setting does not harm the system. It would be just ignored if delegation tokens are disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
YARN_ACCESS and KERBEROS_FETCH_DELEGATION_TOKEN are exclusive, it's better to fast fail.
Because if it’s ignored, it will make users confused whether YARN_ACCESS really works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I missed the thread. Thanks for pointing to it.
| Boolean kerberosFetchDelegationTokenEnabled = | ||
| configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN); | ||
|
|
||
| if (kerberosFetchDelegationTokenEnabled) { | ||
| // set HDFS delegation tokens when security is enabled | ||
| LOG.info("Adding delegation token to the AM container."); | ||
| yarnAccessList = | ||
| ConfigUtils.decodeListFromConfig( | ||
| configuration, YarnConfigOptions.YARN_ACCESS, Path::new); | ||
| } | ||
|
|
||
| Utils.setTokensFor( | ||
| amContainer, | ||
| ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()), | ||
| yarnConfiguration); | ||
| yarnConfiguration, | ||
| kerberosFetchDelegationTokenEnabled); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether this change is actually necessary? Is there a specific reason for it? Aren't we implicitly ignoring the YARN_ACCESS setting in Utils.setTokensFor? Or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
YARN_ACCESS and KERBEROS_FETCH_DELEGATION_TOKEN are exclusive.
It's kerberos mechanism.
Please check spark related doc: https://spark.apache.org/docs/latest/running-on-yarn.html#launching-your-application-with-apache-oozie
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it...
|
@XComp Thanks for your reply
We have applied and tested this PR in our production. It looks fine. Sorry, I am not very familiar with the Flink code. Can you provide a test case link about with kerberos HDFS? I want to refer it to add test case. How to reproduce it?
No. Oozie will submit Flink job without keytab and only rely on delegation token to access HDFS. And why cause these exception? Actually there is no need for Flink to fetch token, Flink can use the token Oozie has fetched directly. |
6988932 to
c5b8d96
Compare
Thanks for the update @zuston . I'm just wondering why you squashed the two commits into one again. Considering that the two changes (service name as alias and new parameter) are independent of each other I would favor @wangyang0918 's proposal of keeping the two commits separate. |
|
@XComp Thanks for your reply. |
|
Gentle ping @XComp |
XComp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for recreating the split between the two commits. Two things slipped through under my radar in the last review which needs to be addressed still.
| throw new IllegalConfigurationException( | ||
| "When security.kerberos.fetch.delegation-token is set, " | ||
| + "yarn.security.kerberos.additionalFileSystems must be unset."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| throw new IllegalConfigurationException( | |
| "When security.kerberos.fetch.delegation-token is set, " | |
| + "yarn.security.kerberos.additionalFileSystems must be unset."); | |
| throw new IllegalConfigurationException( | |
| String.format( | |
| "When %s is disabled, %s must be disabled as well.", | |
| SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN.key(), | |
| YarnConfigOptions.YARN_ACCESS.key())); |
The message was wrong, wasn't it? Additionally, it's better to use the references to the parameters instead of plain text.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| // setup security tokens | ||
| if (UserGroupInformation.isSecurityEnabled()) { | ||
| // set HDFS delegation tokens when security is enabled | ||
| LOG.info("Adding delegation token to the AM container."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I verified it: The log message is still valid since Utils.setTokensFor also sets user-related tokens. We only disable HDFS and HBase when SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN is disabled.
| amContainer, | ||
| ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()), | ||
| yarnConfiguration); | ||
| List<Path> pathsToObtainToken = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| List<Path> pathsToObtainToken = null; | |
| List<Path> pathsToObtainToken = Collections.emptyList(); |
I'm not that comfortable to pass null into Utils.setTokensFor as it's not a @Nullable parameter. It does not harm the execution because it's only used in Utils.setTokensFor if SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN is enabled (and in that case we overwrite pathsToObtainToken). But still, I would vote for switching to an empty list instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done~
|
@XComp Thanks~. All resolved |
XComp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for responding so quickly, @zuston. I did some code analysis on the getIdentifier/getService issue which makes me wonder whether we should do it in a dedicated ticket instead of sneaking it in here as part of FLINK-21700. This solution does not require the change from getIdentifier to getService, does it?
See my comments below.
| final Text id = new Text(token.getIdentifier()); | ||
| LOG.info("Adding user token " + id + " with " + token); | ||
| credentials.addToken(id, token); | ||
| LOG.info("Adding user token " + token.getService() + " with " + token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iterating over it once more, I realized that HadoopModule uses the identifier as well. In contrast, the HBase delegation token always used getService. Do we have to align that? Moreover, I'm wondering whether we should make this a dedicated ticket to align it as it's not necessary for this feature. That would make this issue more transparent. Do you have any objections against that, @zuston ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't agree more. It will make more clear when seperating PR.
I will submit new one and we could talk more details on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, we will move the second commit to a separate ticket. Right?
| + "If false, Flink will assume that the delegation tokens are managed outside of Flink. " | ||
| + "As a consequence, it will not fetch delegation tokens for HDFS and HBase. " | ||
| + "You may need to disable this option, if you rely on submission mechanisms, e.g. Apache Oozie, " | ||
| + "to handle delegation tokens. "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| + "to handle delegation tokens. "); | |
| + "to handle delegation tokens."); |
a minor thing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
| TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf); | ||
| // for HBase | ||
| obtainTokenForHBase(credentials, conf); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we discussed adding an else branch containing an info log message pointing out that delegation token retrieval for HDFS and HBase is disabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, i missed.
So message like: LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
|
@XComp Done. |
XComp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for splitting it up, @zuston . I mentioned one minor issue below. But other than that, I guess, we can finalize it. @wangyang0918 would you be fine with confirming this PR?
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
Outdated
Show resolved
Hide resolved
|
@XComp Thanks for your review. All done. |
|
@XComp Gently ping. Could you help merge it? |
|
Sure, I will initiate it as soon as @wangyang0918 gave it another pass. He has it on his ToDo list and will do it soon'ish |
wangyang0918
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston Thanks for updating this PR. It looks really good to me. I just left some minor comments. Please have a look.
BTW, we have unrelated documentation changes in the second commit. And if we agree to move the second commit to a separate ticket, then this PR needs to be refined again.
| + "does not have Kerberos credentials or delegation tokens!"); | ||
| } | ||
|
|
||
| boolean fetchToken = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fetchToken and yarnAccessFSEnabled could be final.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| boolean fetchToken = | ||
| flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN); | ||
| boolean yarnAccessFSEnabled = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could use CollectionUtil.isNullOrEmpty to check whether YARN_ACCESS is not set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| amContainer, | ||
| ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()), | ||
| yarnConfiguration); | ||
| List<Path> pathsToObtainToken = Collections.emptyList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would like to create a final list and add them explicitly.
This could avoid pathsToObtainToken is changed unexpectedly. And also the IDE complains the Uncheck assignment when using ListUtils.union.
final List<Path> pathsToObtainToken = new ArrayList<>();
...
pathsToObtainToken.addAll(yarnAccessList);
pathsToObtainToken.addAll(fileUploader.getRemotePaths());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| final Text id = new Text(token.getIdentifier()); | ||
| LOG.info("Adding user token " + id + " with " + token); | ||
| credentials.addToken(id, token); | ||
| LOG.info("Adding user token " + token.getService() + " with " + token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, we will move the second commit to a separate ticket. Right?
… on a secure cluster
|
@wangyang0918 All done. Thanks |
wangyang0918
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for merging.
|
Thanks @wangyang0918 for giving it another pass. @rmetzger may you take over and merge the PR, please? |
|
Thanks for the contribution and the reviews. Merging. |
Linked to ISSUE.
Why
I want to support Flink Batch Action on Oozie.
As we know, Oozie will obtain HDFS/HBase delegation token using Hadoop proxy user mechanism before starting Flink submitter cli. If not, Flink will obtain delegation token again, this will cause exception.
Actually, Spark support disable fetching delegation token on Spark client, related Spark doc.
So i think Flink should allow to disable fetching Hadoop delegation token on Yarn.