This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5ca45e8 [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user 5ca45e8 is described below commit 5ca45e8a3db75907e8699e7dff63accbff5bfae5 Author: Gabor Somogyi <gabor.g.somo...@gmail.com> AuthorDate: Tue Jan 15 10:00:01 2019 -0800 [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user ## What changes were proposed in this pull request? Kafka does not yet support obtaining a delegation token with a proxy user. It has to be turned off until https://issues.apache.org/jira/browse/KAFKA-6945 is implemented. In this PR an exception will be thrown when this situation happens. ## How was this patch tested? Additional unit test. Closes #23511 from gaborgsomogyi/SPARK-26592. Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../apache/spark/deploy/security/KafkaTokenUtil.scala | 14 +++++++++++++- .../spark/deploy/security/KafkaTokenUtilSuite.scala | 19 ++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala index aec0f72..f363853 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala @@ -17,12 +17,13 @@ package org.apache.spark.deploy.security -import java.{ util => ju } +import java.{util => ju} import java.text.SimpleDateFormat import scala.util.control.NonFatal import org.apache.hadoop.io.Text +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.kafka.clients.CommonClientConfigs @@ -33,6 +34,7 @@ import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging { } private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { + checkProxyUser() + val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) val createDelegationTokenOptions = new CreateDelegationTokenOptions() val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) @@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging { ), token.tokenInfo.expiryTimestamp) } + private[security] def checkProxyUser(): Unit = { + val currentUser = UserGroupInformation.getCurrentUser() + // Obtaining delegation token for proxy user is planned but not yet implemented + // See https://issues.apache.org/jira/browse/KAFKA-6945 + require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining delegation token for proxy " + + "user is not yet supported.") + } + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { val adminClientProperties = new ju.Properties diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala index 18aa537..daa7e54 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.deploy.security -import java.{ util => ju } +import java.{util => ju} +import java.security.PrivilegedExceptionAction import javax.security.auth.login.{AppConfigurationEntry, Configuration} +import 
org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} @@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { Configuration.setConfiguration(null) } + test("checkProxyUser with proxy current user should throw exception") { + val realUser = UserGroupInformation.createUserForTesting("realUser", Array()) + UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs( + new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.checkProxyUser() + } + assert(thrown.getMessage contains + "Obtaining delegation token for proxy user is not yet supported.") + } + } + ) + } + test("createAdminClientProperties without bootstrap servers should throw exception") { val thrown = intercept[IllegalArgumentException] { KafkaTokenUtil.createAdminClientProperties(sparkConf) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org