This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 006c2dca6d87 [SPARK-46977][CORE] A failed request to obtain a token
from one NameNode should not skip subsequent token requests
006c2dca6d87 is described below
commit 006c2dca6d87e29a69e30124e8320c275859d148
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Feb 5 12:18:20 2024 -0800
[SPARK-46977][CORE] A failed request to obtain a token from one NameNode
should not skip subsequent token requests
### What changes were proposed in this pull request?
This PR enhances the `HadoopFSDelegationTokenProvider` to tolerate failures
when fetching tokens from multiple NameNodes.
### Why are the changes needed?
Suppose we access three HDFS clusters, `nn-1`, `nn-2`, and `nn-3`, in YARN
cluster mode with a TGT cache, where `nn-1` is the `defaultFs` that YARN uses
to store aggregated logs, and `nn-2` has issues and cannot issue tokens.
```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.kerberos.access.hadoopFileSystems=hdfs://nn-1,hdfs://nn-2,hdfs://nn-3 \
  ...
```
During the submission phase, Spark calls `HadoopFSDelegationTokenProvider` to
fetch tokens from all declared NameNodes one by one, in **indeterminate** order
(`HadoopFSDelegationTokenProvider.hadoopFSsToAccess` processes and returns a
`Set[FileSystem]`), so the order may not match the order the user declared in
`spark.kerberos.access.hadoopFileSystems`.
If the order is [`nn-1`, `nn-2`, `nn-3`], the token request to `nn-1`
succeeds, the request to `nn-2` fails with the error below, and the remaining
`nn-3` is skipped. Such a failure does NOT block the whole submission, so the
Spark app is submitted with only the `nn-1` token.
```
2024-01-03 12:41:36 [WARN] [main] org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider#94 - Failed to get token from service hadoopfs
org.apache.hadoop.ipc.RemoteException: <Some Error Message>
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507) ~[hadoop-common-2.9.2.2.jar:?]
	...
	at org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2604) ~[hadoop-hdfs-client-2.9.2.2.jar:?]
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.$anonfun$fetchDelegationTokens$1(HadoopFSDelegationTokenProvider.scala:122) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
	at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:335) ~[scala-library-2.12.15.jar:?]
	at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111) ~[scala-library-2.12.15.jar:?]
	at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:1111) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider.fetchDelegationTokens(HadoopFSDelegationTokenProvider.scala:115) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
	...
	at org.apache.spark.deploy.security.HadoopDelegationTokenManager.obtainDelegationTokens(HadoopDelegationTokenManager.scala:146) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
	at org.apache.spark.deploy.yarn.Client.setupSecurityToken(Client.scala:352) ~[spark-yarn_2.12-3.3.1.27.jar:3.3.1.27]
	at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:1140) ~[spark-yarn_2.12-3.3.1.27.jar:3.3.1.27]
	...
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[spark-core_2.12-3.3.1.27.jar:3.3.1.27]
```
When the Spark app then accesses `nn-2` and `nn-3`, it fails with
`o.a.h.security.AccessControlException: Client cannot authenticate via:[TOKEN,
KERBEROS]`.
Things get worse if the FS order is [`nn-3`, `nn-2`, `nn-1`]: the Spark app is
submitted to YARN with only the `nn-3` token, and the NodeManager has no
chance to upload aggregated logs after the application exits, because doing so
requires a token for `nn-1`.
The log from the NodeManager:
```
2024-01-03 08:08:14,028 [3173570620] - WARN [NM ContainerManager dispatcher:Client$Connection1$772] - Exception encountered while connecting to the server
Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
	at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:179)
	at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:392)
	...
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1768)
	...
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.createAppDir(LogAggregationService.java:404)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService.initAppAggregator(LogAggregationService.java:273)
	...
```
Without those logs, we would not even know what happened.
Ultimately, due to the **indeterminate** order in which tokens are requested
from the NameNodes, such a job sometimes submits successfully and sometimes
fails without logs.
<img width="1903" alt="image"
src="https://github.com/apache/spark/assets/26535726/7ca5c871-6399-4eae-b689-d6d741c1c373">
### Does this PR introduce _any_ user-facing change?
Yes. When the user configures `spark.kerberos.access.hadoopFileSystems` to
access multiple Kerberized HDFS clusters and one or more NameNodes have
issues, tokens are still fetched from the remaining healthy NameNodes after
this patch.
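The new behavior can be sketched as follows (`fetchTokensTolerantly` and `fetchToken` are hypothetical stand-ins, not Spark's API): each per-FileSystem token request is wrapped so that a non-fatal failure on one NameNode is logged and the loop continues to the next, mirroring what `Utils.tryLogNonFatalError` does in the patch.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of the patch's approach: tolerate a non-fatal failure
// on one NameNode and keep requesting tokens from the rest.
def fetchTokensTolerantly(filesystems: Seq[String])(fetchToken: String => Unit): Seq[String] = {
  val succeeded = Seq.newBuilder[String]
  filesystems.foreach { fs =>
    try {
      fetchToken(fs)
      succeeded += fs
    } catch {
      case NonFatal(e) =>
        // Mirrors Utils.tryLogNonFatalError: log the failure and continue.
        println(s"Failed to get token from service $fs: ${e.getMessage}")
    }
  }
  succeeded.result()
}

// Simulate nn-2 being unable to issue a token: nn-3 is no longer skipped.
val got = fetchTokensTolerantly(Seq("nn-1", "nn-2", "nn-3")) { fs =>
  if (fs == "nn-2") throw new RuntimeException("cannot issue token")
}
```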
### How was this patch tested?
Tested in an internal Kerberized cluster.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45030 from pan3793/SPARK-46977.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/deploy/security/HadoopFSDelegationTokenProvider.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 9242fe82d249..8eb45238b477 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.security.HadoopDelegationTokenProvider
+import org.apache.spark.util.Utils

 private[deploy] class HadoopFSDelegationTokenProvider
     extends HadoopDelegationTokenProvider with Logging {
@@ -116,10 +117,10 @@ private[deploy] class HadoopFSDelegationTokenProvider
       if (fsToExclude.contains(fs.getUri.getHost)) {
         // YARN RM skips renewing token with empty renewer
         logInfo(s"getting token for: $fs with empty renewer to skip renewal")
-        fs.addDelegationTokens("", creds)
+        Utils.tryLogNonFatalError { fs.addDelegationTokens("", creds) }
       } else {
         logInfo(s"getting token for: $fs with renewer $renewer")
-        fs.addDelegationTokens(renewer, creds)
+        Utils.tryLogNonFatalError { fs.addDelegationTokens(renewer, creds) }
       }
     }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]