This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ca45e8  [SPARK-26592][SS] Throw exception when kafka delegation token 
tried to obtain with proxy user
5ca45e8 is described below

commit 5ca45e8a3db75907e8699e7dff63accbff5bfae5
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Tue Jan 15 10:00:01 2019 -0800

    [SPARK-26592][SS] Throw exception when kafka delegation token tried to 
obtain with proxy user
    
    ## What changes were proposed in this pull request?
    
    Kafka does not yet support obtaining a delegation token with a proxy user. 
This has to be turned off until https://issues.apache.org/jira/browse/KAFKA-6945 
is implemented.
    
    In this PR an exception will be thrown when this situation happens.
    
    ## How was this patch tested?
    
    Additional unit test.
    
    Closes #23511 from gaborgsomogyi/SPARK-26592.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../apache/spark/deploy/security/KafkaTokenUtil.scala | 14 +++++++++++++-
 .../spark/deploy/security/KafkaTokenUtilSuite.scala   | 19 ++++++++++++++++++-
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
index aec0f72..f363853 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.deploy.security
 
-import java.{ util => ju }
+import java.{util => ju}
 import java.text.SimpleDateFormat
 
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 import org.apache.kafka.clients.CommonClientConfigs
@@ -33,6 +34,7 @@ import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
@@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
   }
 
   private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+    checkProxyUser()
+
     val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
     val createDelegationTokenOptions = new CreateDelegationTokenOptions()
     val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
@@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging {
     ), token.tokenInfo.expiryTimestamp)
   }
 
+  private[security] def checkProxyUser(): Unit = {
+    val currentUser = UserGroupInformation.getCurrentUser()
+    // Obtaining delegation token for proxy user is planned but not yet 
implemented
+    // See https://issues.apache.org/jira/browse/KAFKA-6945
+    require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining 
delegation token for proxy " +
+      "user is not yet supported.")
+  }
+
   private[security] def createAdminClientProperties(sparkConf: SparkConf): 
ju.Properties = {
     val adminClientProperties = new ju.Properties
 
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
index 18aa537..daa7e54 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.deploy.security
 
-import java.{ util => ju }
+import java.{util => ju}
+import java.security.PrivilegedExceptionAction
 import javax.security.auth.login.{AppConfigurationEntry, Configuration}
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
@@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     Configuration.setConfiguration(null)
   }
 
+  test("checkProxyUser with proxy current user should throw exception") {
+    val realUser = UserGroupInformation.createUserForTesting("realUser", 
Array())
+    UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, 
Array()).doAs(
+      new PrivilegedExceptionAction[Unit]() {
+        override def run(): Unit = {
+          val thrown = intercept[IllegalArgumentException] {
+            KafkaTokenUtil.checkProxyUser()
+          }
+          assert(thrown.getMessage contains
+            "Obtaining delegation token for proxy user is not yet supported.")
+        }
+      }
+    )
+  }
+
   test("createAdminClientProperties without bootstrap servers should throw 
exception") {
     val thrown = intercept[IllegalArgumentException] {
       KafkaTokenUtil.createAdminClientProperties(sparkConf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to