diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
index aec0f72feb3c1..f3638533e1b7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.deploy.security
 
-import java.{ util => ju }
+import java.{util => ju}
 import java.text.SimpleDateFormat
 
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 import org.apache.kafka.clients.CommonClientConfigs
@@ -33,6 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
@@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
   }
 
   private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+    checkProxyUser()
+
     val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
     val createDelegationTokenOptions = new CreateDelegationTokenOptions()
     val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
@@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging {
     ), token.tokenInfo.expiryTimestamp)
   }
 
+  private[security] def checkProxyUser(): Unit = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    // Obtaining delegation token for proxy user is planned but not yet implemented
+    // See https://issues.apache.org/jira/browse/KAFKA-6945
+    require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining delegation token for proxy " +
+      "user is not yet supported.")
+  }
+
   private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
     val adminClientProperties = new ju.Properties
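For reference, a minimal self-contained sketch of the check the new guard performs. The object name and the isProxyUser stand-in below are illustrative, not part of the patch; they assume SparkHadoopUtil.get.isProxyUser reduces to the same authentication-method test on the current UGI:

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod

    object ProxyUserGuardSketch {
      // Stand-in for SparkHadoopUtil.get.isProxyUser (assumption): a UGI
      // created through createProxyUser reports PROXY as its
      // authentication method.
      def isProxyUser(ugi: UserGroupInformation): Boolean =
        ugi.getAuthenticationMethod == AuthenticationMethod.PROXY

      // Mirrors KafkaTokenUtil.checkProxyUser in the diff above: reject
      // proxy users while KAFKA-6945 remains unresolved.
      def checkProxyUser(): Unit = {
        val currentUser = UserGroupInformation.getCurrentUser()
        require(!isProxyUser(currentUser),
          "Obtaining delegation token for proxy user is not yet supported.")
      }
    }

Because obtainToken calls checkProxyUser() before AdminClient.create, a proxy user fails fast, before any connection to Kafka is attempted.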
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
index 18aa537b3a51d..daa7e544cc9c6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.deploy.security
 
-import java.{ util => ju }
+import java.{util => ju}
+import java.security.PrivilegedExceptionAction
 import javax.security.auth.login.{AppConfigurationEntry, Configuration}
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
@@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
     Configuration.setConfiguration(null)
   }
 
+  test("checkProxyUser with proxy current user should throw exception") {
+    val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
+    UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
+      new PrivilegedExceptionAction[Unit]() {
+        override def run(): Unit = {
+          val thrown = intercept[IllegalArgumentException] {
+            KafkaTokenUtil.checkProxyUser()
+          }
+          assert(thrown.getMessage contains
+            "Obtaining delegation token for proxy user is not yet supported.")
+        }
+      }
+    )
+  }
+
   test("createAdminClientProperties without bootstrap servers should throw exception") {
     val thrown = intercept[IllegalArgumentException] {
       KafkaTokenUtil.createAdminClientProperties(sparkConf
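The doAs wrapper in the test above is what arms the guard: UserGroupInformation.getCurrentUser() resolves to whichever UGI the enclosing doAs block established. A hypothetical standalone run of the same pattern, reusing the earlier sketch since KafkaTokenUtil.checkProxyUser is private[security]:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object DoAsScopeSketch {
      def main(args: Array[String]): Unit = {
        val realUser = UserGroupInformation.createUserForTesting("realUser", Array.empty[String])
        val proxyUser =
          UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array.empty[String])

        // Outside doAs the current user is the plain login user: the check passes.
        ProxyUserGuardSketch.checkProxyUser()

        // Inside doAs the current user is the proxy UGI: require(...) throws.
        proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
          override def run(): Unit = ProxyUserGuardSketch.checkProxyUser()
        })
      }
    }

Since IllegalArgumentException is a RuntimeException, doAs rethrows it unwrapped rather than inside a PrivilegedActionException, which is why the test can intercept it directly.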