Repository: spark Updated Branches: refs/heads/branch-1.5 c022c0aa4 -> d31b312fc
[SPARK-11265][YARN] YarnClient can't get tokens to talk to Hive in a secure… Backport to branch-1.5 of SPARK-11265 patch. The sole change is in `Client`, where there's no longer a probe to see if the token request is enabled; it always happens. This means that provided there's hive.jar on the classpath, this will attempt to get the token — and if that fails for any reason other than CNFE, the client launch will fail Author: Steve Loughran <[email protected]> Closes #9466 from steveloughran/stevel/patches/SPARK-11265-on-branch-1.5. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d31b312f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d31b312f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d31b312f Branch: refs/heads/branch-1.5 Commit: d31b312fc337c27eba346d3682fdd2efe7ac9a0f Parents: c022c0a Author: Steve Loughran <[email protected]> Authored: Wed Nov 4 11:08:31 2015 -0800 Committer: Marcelo Vanzin <[email protected]> Committed: Wed Nov 4 11:08:31 2015 -0800 ---------------------------------------------------------------------- yarn/pom.xml | 25 +++++++ .../org/apache/spark/deploy/yarn/Client.scala | 58 ++-------------- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 73 ++++++++++++++++++++ .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 30 ++++++++ 4 files changed, 135 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/pom.xml ---------------------------------------------------------------------- diff --git a/yarn/pom.xml b/yarn/pom.xml index fc22d18..cb8874d 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -152,6 +152,31 @@ <artifactId>jersey-server</artifactId> <scope>test</scope> </dependency> + + <!-- + Testing Hive reflection needs hive on the test classpath only. + It doesn't need the spark hive modules, so the -Phive flag is not checked. 
+ --> + <dependency> + <groupId>${hive.group}</groupId> + <artifactId>hive-exec</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${hive.group}</groupId> + <artifactId>hive-metastore</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ffa35e5..8ede16e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -272,7 +272,7 @@ private[spark] class Client( // multiple times, YARN will fail to launch containers for the app with an internal // error. val distributedUris = new HashSet[String] - obtainTokenForHiveMetastore(hadoopConf, credentials) + obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials) obtainTokenForHBase(hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", @@ -1255,57 +1255,13 @@ object Client extends Logging { /** * Obtains token for the Hive metastore and adds them to the credentials. 
*/ - private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) { + private def obtainTokenForHiveMetastore( + sparkConf: SparkConf, + conf: Configuration, + credentials: Credentials) { if (UserGroupInformation.isSecurityEnabled) { - val mirror = universe.runtimeMirror(getClass.getClassLoader) - - try { - val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") - val hive = hiveClass.getMethod("get").invoke(null) - - val hiveConf = hiveClass.getMethod("getConf").invoke(hive) - val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") - - val hiveConfGet = (param: String) => Option(hiveConfClass - .getMethod("get", classOf[java.lang.String]) - .invoke(hiveConf, param)) - - val metastore_uri = hiveConfGet("hive.metastore.uris") - - // Check for local metastore - if (metastore_uri != None && metastore_uri.get.toString.size > 0) { - val metastore_kerberos_principal_conf_var = mirror.classLoader - .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars") - .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString - - val principal = hiveConfGet(metastore_kerberos_principal_conf_var) - - val username = Option(UserGroupInformation.getCurrentUser().getUserName) - if (principal != None && username != None) { - val tokenStr = hiveClass.getMethod("getDelegationToken", - classOf[java.lang.String], classOf[java.lang.String]) - .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String] - - val hive2Token = new Token[DelegationTokenIdentifier]() - hive2Token.decodeFromUrlString(tokenStr) - credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token) - logDebug("Added hive.Server2.delegation.token to conf.") - hiveClass.getMethod("closeCurrent").invoke(null) - } else { - logError("Username or principal == NULL") - logError(s"""username=${username.getOrElse("(NULL)")}""") - logError(s"""principal=${principal.getOrElse("(NULL)")}""") - throw new 
IllegalArgumentException("username and/or principal is equal to null!") - } - } else { - logDebug("HiveMetaStore configured in localmode") - } - } catch { - case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return } - case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return } - case e: Exception => { logError("Unexpected Exception " + e) - throw new RuntimeException("Unexpected exception", e) - } + YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { + credentials.addToken(new Text("hive.server2.delegation.token"), _) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index d7e7feb..991c909 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -22,14 +22,17 @@ import java.util.regex.Matcher import java.util.regex.Pattern import scala.collection.mutable.HashMap +import scala.reflect.runtime._ import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.{Master, JobConf} import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.token.Token import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -141,6 +144,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val 
containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) ConverterUtils.toContainerId(containerIdString) } + + /** + * Obtains token for the Hive metastore, using the current user as the principal. + * Some exceptions are caught and downgraded to a log message. + * @param conf hadoop configuration; the Hive configuration will be based on this + * @return a token, or `None` if there's no need for a token (no metastore URI or principal + * in the config), or if a binding exception was caught and downgraded. + */ + def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = { + try { + obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName) + } catch { + case e: ClassNotFoundException => + logInfo(s"Hive class not found $e") + logDebug("Hive class not found", e) + None + } + } + + /** + * Inner routine to obtains token for the Hive metastore; exceptions are raised on any problem. + * @param conf hadoop configuration; the Hive configuration will be based on this. + * @param username the username of the principal requesting the delegating token. + * @return a delegation token + */ + private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration, + username: String): Option[Token[DelegationTokenIdentifier]] = { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + + // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down + // to a Configuration and used without reflection + val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") + // using the (Configuration, Class) constructor allows the current configuratin to be included + // in the hive config. 
+ val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration], + classOf[Object].getClass) + val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration] + val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "") + + // Check for local metastore + if (metastoreUri.nonEmpty) { + require(username.nonEmpty, "Username undefined") + val principalKey = "hive.metastore.kerberos.principal" + val principal = hiveConf.getTrimmed(principalKey, "") + require(principal.nonEmpty, "Hive principal $principalKey undefined") + logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri") + val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") + val closeCurrent = hiveClass.getMethod("closeCurrent") + try { + // get all the instance methods before invoking any + val getDelegationToken = hiveClass.getMethod("getDelegationToken", + classOf[String], classOf[String]) + val getHive = hiveClass.getMethod("get", hiveConfClass) + + // invoke + val hive = getHive.invoke(null, hiveConf) + val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String] + val hive2Token = new Token[DelegationTokenIdentifier]() + hive2Token.decodeFromUrlString(tokenStr) + Some(hive2Token) + } finally { + Utils.tryLogNonFatalError { + closeCurrent.invoke(null) + } + } + } else { + logDebug("HiveMetaStore configured in localmode") + None + } + } } object YarnSparkHadoopUtil { http://git-wip-us.apache.org/repos/asf/spark/blob/d31b312f/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index e1c67db..9034eab 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -18,10 +18,13 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} +import java.lang.reflect.InvocationTargetException import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.metadata.HiveException +import org.apache.hadoop.io.Text import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -245,4 +248,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging System.clearProperty("SPARK_YARN_MODE") } } + + test("Obtain tokens For HiveMetastore") { + val hadoopConf = new Configuration() + hadoopConf.set("hive.metastore.kerberos.principal", "bob") + // thrift picks up on port 0 and bails out, without trying to talk to endpoint + hadoopConf.set("hive.metastore.uris", "http://localhost:0") + val util = new YarnSparkHadoopUtil + assertNestedHiveException(intercept[InvocationTargetException] { + util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice") + }) + // expect exception trapping code to unwind this hive-side exception + assertNestedHiveException(intercept[InvocationTargetException] { + util.obtainTokenForHiveMetastore(hadoopConf) + }) + } + + private def assertNestedHiveException(e: InvocationTargetException): Throwable = { + val inner = e.getCause + if (inner == null) { + fail("No inner cause", e) + } + if (!inner.isInstanceOf[HiveException]) { + fail("Not a hive exception", inner) + } + inner + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
