Repository: spark
Updated Branches:
  refs/heads/branch-1.5 c022c0aa4 -> d31b312fc


[SPARK-11265][YARN] YarnClient can't get tokens to talk to Hive in a secure…

Backport to branch-1.5 of SPARK-11265 patch.

The sole change is in `Client`, where there's no longer a probe to see if the 
token request is enabled; it always happens. This means that provided there's 
hive.jar on the classpath, this will attempt to get the token —and if that 
fails for any reason other than CNFE, the client launch will fail

Author: Steve Loughran <[email protected]>

Closes #9466 from steveloughran/stevel/patches/SPARK-11265-on-branch-1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d31b312f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d31b312f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d31b312f

Branch: refs/heads/branch-1.5
Commit: d31b312fc337c27eba346d3682fdd2efe7ac9a0f
Parents: c022c0a
Author: Steve Loughran <[email protected]>
Authored: Wed Nov 4 11:08:31 2015 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Nov 4 11:08:31 2015 -0800

----------------------------------------------------------------------
 yarn/pom.xml                                    | 25 +++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 58 ++--------------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 73 ++++++++++++++++++++
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 30 ++++++++
 4 files changed, 135 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index fc22d18..cb8874d 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -152,6 +152,31 @@
        <artifactId>jersey-server</artifactId>
        <scope>test</scope>
      </dependency>
+
+    <!--
+     Testing Hive reflection needs hive on the test classpath only.
+     It doesn't need the spark hive modules, so the -Phive flag is not checked.
+      -->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ffa35e5..8ede16e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -272,7 +272,7 @@ private[spark] class Client(
     // multiple times, YARN will fail to launch containers for the app with an 
internal
     // error.
     val distributedUris = new HashSet[String]
-    obtainTokenForHiveMetastore(hadoopConf, credentials)
+    obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
     obtainTokenForHBase(hadoopConf, credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
@@ -1255,57 +1255,13 @@ object Client extends Logging {
   /**
    * Obtains token for the Hive metastore and adds them to the credentials.
    */
-  private def obtainTokenForHiveMetastore(conf: Configuration, credentials: 
Credentials) {
+  private def obtainTokenForHiveMetastore(
+      sparkConf: SparkConf,
+      conf: Configuration,
+      credentials: Credentials) {
     if (UserGroupInformation.isSecurityEnabled) {
-      val mirror = universe.runtimeMirror(getClass.getClassLoader)
-
-      try {
-        val hiveClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
-        val hive = hiveClass.getMethod("get").invoke(null)
-
-        val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
-        val hiveConfClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-
-        val hiveConfGet = (param: String) => Option(hiveConfClass
-          .getMethod("get", classOf[java.lang.String])
-          .invoke(hiveConf, param))
-
-        val metastore_uri = hiveConfGet("hive.metastore.uris")
-
-        // Check for local metastore
-        if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
-          val metastore_kerberos_principal_conf_var = mirror.classLoader
-            .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
-            .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
-
-          val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
-
-          val username = 
Option(UserGroupInformation.getCurrentUser().getUserName)
-          if (principal != None && username != None) {
-            val tokenStr = hiveClass.getMethod("getDelegationToken",
-              classOf[java.lang.String], classOf[java.lang.String])
-              .invoke(hive, username.get, 
principal.get).asInstanceOf[java.lang.String]
-
-            val hive2Token = new Token[DelegationTokenIdentifier]()
-            hive2Token.decodeFromUrlString(tokenStr)
-            credentials.addToken(new Text("hive.server2.delegation.token"), 
hive2Token)
-            logDebug("Added hive.Server2.delegation.token to conf.")
-            hiveClass.getMethod("closeCurrent").invoke(null)
-          } else {
-            logError("Username or principal == NULL")
-            logError(s"""username=${username.getOrElse("(NULL)")}""")
-            logError(s"""principal=${principal.getOrElse("(NULL)")}""")
-            throw new IllegalArgumentException("username and/or principal is 
equal to null!")
-          }
-        } else {
-          logDebug("HiveMetaStore configured in localmode")
-        }
-      } catch {
-        case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not 
found " + e); return }
-        case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not 
found " + e); return }
-        case e: Exception => { logError("Unexpected Exception " + e)
-          throw new RuntimeException("Unexpected exception", e)
-        }
+      YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
+        credentials.addToken(new Text("hive.server2.delegation.token"), _)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d7e7feb..991c909 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,14 +22,17 @@ import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.HashMap
+import scala.reflect.runtime._
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.{Master, JobConf}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.Token
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -141,6 +144,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val containerIdString = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     ConverterUtils.toContainerId(containerIdString)
   }
+
+  /**
+   * Obtains token for the Hive metastore, using the current user as the 
principal.
+   * Some exceptions are caught and downgraded to a log message.
+   * @param conf hadoop configuration; the Hive configuration will be based on 
this
+   * @return a token, or `None` if there's no need for a token (no metastore 
URI or principal
+   *         in the config), or if a binding exception was caught and 
downgraded.
+   */
+  def obtainTokenForHiveMetastore(conf: Configuration): 
Option[Token[DelegationTokenIdentifier]] = {
+    try {
+      obtainTokenForHiveMetastoreInner(conf, 
UserGroupInformation.getCurrentUser().getUserName)
+    } catch {
+      case e: ClassNotFoundException =>
+        logInfo(s"Hive class not found $e")
+        logDebug("Hive class not found", e)
+        None
+    }
+  }
+
+  /**
+   * Inner routine to obtains token for the Hive metastore; exceptions are 
raised on any problem.
+   * @param conf hadoop configuration; the Hive configuration will be based on 
this.
+   * @param username the username of the principal requesting the delegating 
token.
+   * @return a delegation token
+   */
+  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
+      username: String): Option[Token[DelegationTokenIdentifier]] = {
+    val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+
+    // the hive configuration class is a subclass of Hadoop Configuration, so 
can be cast down
+    // to a Configuration and used without reflection
+    val hiveConfClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+    // using the (Configuration, Class) constructor allows the current 
configuratin to be included
+    // in the hive config.
+    val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
+      classOf[Object].getClass)
+    val hiveConf = ctor.newInstance(conf, 
hiveConfClass).asInstanceOf[Configuration]
+    val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
+
+    // Check for local metastore
+    if (metastoreUri.nonEmpty) {
+      require(username.nonEmpty, "Username undefined")
+      val principalKey = "hive.metastore.kerberos.principal"
+      val principal = hiveConf.getTrimmed(principalKey, "")
+      require(principal.nonEmpty, "Hive principal $principalKey undefined")
+      logDebug(s"Getting Hive delegation token for $username against 
$principal at $metastoreUri")
+      val hiveClass = 
mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+      val closeCurrent = hiveClass.getMethod("closeCurrent")
+      try {
+        // get all the instance methods before invoking any
+        val getDelegationToken = hiveClass.getMethod("getDelegationToken",
+          classOf[String], classOf[String])
+        val getHive = hiveClass.getMethod("get", hiveConfClass)
+
+        // invoke
+        val hive = getHive.invoke(null, hiveConf)
+        val tokenStr = getDelegationToken.invoke(hive, username, 
principal).asInstanceOf[String]
+        val hive2Token = new Token[DelegationTokenIdentifier]()
+        hive2Token.decodeFromUrlString(tokenStr)
+        Some(hive2Token)
+      } finally {
+        Utils.tryLogNonFatalError {
+          closeCurrent.invoke(null)
+        }
+      }
+    } else {
+      logDebug("HiveMetaStore configured in localmode")
+      None
+    }
+  }
 }
 
 object YarnSparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index e1c67db..9034eab 100644
--- 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,10 +18,13 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{File, IOException}
+import java.lang.reflect.InvocationTargetException
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -245,4 +248,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with 
Matchers with Logging
       System.clearProperty("SPARK_YARN_MODE")
     }
   }
+
+  test("Obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // thrift picks up on port 0 and bails out, without trying to talk to 
endpoint
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0";)
+    val util = new YarnSparkHadoopUtil
+    assertNestedHiveException(intercept[InvocationTargetException] {
+      util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+    })
+    // expect exception trapping code to unwind this hive-side exception
+    assertNestedHiveException(intercept[InvocationTargetException] {
+      util.obtainTokenForHiveMetastore(hadoopConf)
+    })
+  }
+
+  private def assertNestedHiveException(e: InvocationTargetException): 
Throwable = {
+    val inner = e.getCause
+    if (inner == null) {
+      fail("No inner cause", e)
+    }
+    if (!inner.isInstanceOf[HiveException]) {
+      fail("Not a hive exception", inner)
+    }
+    inner
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to