Repository: spark Updated Branches: refs/heads/master d3f4a2119 -> cde64add1
[SPARK-21411][YARN] Lazily create FS within kerberized UGI to avoid token acquiring failure ## What changes were proposed in this pull request? In the current `YARNHadoopDelegationTokenManager`, the `FileSystem`s from which tokens are obtained are created outside of the KDC-logged-in UGI, so using these `FileSystem`s to get new tokens leads to an exception. The root cause is that Spark tries to get new tokens from a `FileSystem` that was created with a token-authenticated UGI, but Hadoop can only grant new tokens to a Kerberos-authenticated UGI. To fix this issue, these `FileSystem`s are now created lazily, within the KDC-logged-in UGI. ## How was this patch tested? Manually verified in a secure cluster. CC vanzin mgummelt please help to review, thanks! Author: jerryshao <[email protected]> Closes #18633 from jerryshao/SPARK-21411. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cde64add Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cde64add Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cde64add Branch: refs/heads/master Commit: cde64add18dac712c48de0637f1979f1043e333e Parents: d3f4a21 Author: jerryshao <[email protected]> Authored: Tue Jul 18 11:44:01 2017 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Tue Jul 18 11:44:01 2017 -0700 ---------------------------------------------------------------------- .../spark/deploy/security/HadoopDelegationTokenManager.scala | 2 +- .../deploy/security/HadoopFSDelegationTokenProvider.scala | 7 ++++--- .../deploy/security/HadoopDelegationTokenManagerSuite.scala | 8 ++++---- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- .../yarn/security/YARNHadoopDelegationTokenManager.scala | 2 +- .../security/YARNHadoopDelegationTokenManagerSuite.scala | 2 +- 8 files changed, 14 insertions(+), 13 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 89b6f52..01cbfe1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging private[spark] class HadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: Set[FileSystem]) + fileSystems: Configuration => Set[FileSystem]) extends Logging { private val deprecatedProviderEnabledConfigs = List( http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 13157f3..f0ac7f5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.SparkException import org.apache.spark.internal.Logging -private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem]) +private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => 
Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens. @@ -43,13 +43,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSyste hadoopConf: Configuration, creds: Credentials): Option[Long] = { + val fsToGetTokens = fileSystems(hadoopConf) val newCreds = fetchDelegationTokens( getTokenRenewer(hadoopConf), - fileSystems) + fsToGetTokens) // Get the token renewal interval if it is not set. It will only be called once. if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems) + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens) } // Get the time of next renewal. http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 335f344..5b05521 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -40,7 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) @@ -53,7 +53,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, 
hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) @@ -66,7 +66,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None) delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None) @@ -77,7 +77,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) val creds = new Credentials() // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests. http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ce290c3..6ff210a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -270,7 +270,7 @@ private[spark] class ApplicationMaster( val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, yarnConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) 
http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3a7adb7..d408ca9 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -125,7 +125,7 @@ private[spark] class Client( private val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a687f67..4fef439 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) 
credentialUpdater.start() } http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index bbd17c8..163cfb4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.Utils private[yarn] class YARNHadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: Set[FileSystem]) extends Logging { + fileSystems: Configuration => Set[FileSystem]) extends Logging { private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems) http://git-wip-us.apache.org/repos/asf/spark/blob/cde64add/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index 2b226ef..c918998 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -48,7 +48,7 @@ class 
YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) credentialManager.credentialProviders.get("yarn-test") should not be (None) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
