http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala deleted file mode 100644 index c4c07b4..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import java.util.ServiceLoader - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.Credentials - -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - -/** - * A ConfigurableCredentialManager to manage all the registered credential providers and offer - * APIs for other modules to obtain credentials as well as renewal time. By default - * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will - * be loaded if not explicitly disabled. Any plugged-in credential provider that wants to be - * managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]] - * interface and be registered in resources/META-INF/services so that it can be loaded by - * ServiceLoader. - * - * Each credential provider is also controlled by - * spark.yarn.security.credentials.{service}.enabled; it will not be loaded if this is set - * to false. - */ -private[yarn] final class ConfigurableCredentialManager( - sparkConf: SparkConf, hadoopConf: Configuration) extends Logging { - private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled" - private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled" - - // Maintain all the registered credential providers - private val credentialProviders = { - val providers = ServiceLoader.load(classOf[ServiceCredentialProvider], - Utils.getContextOrSparkClassLoader).asScala - - // Filter out credential providers for which spark.yarn.security.credentials.{service}.enabled is false. 
- providers.filter { p => - sparkConf.getOption(providerEnabledConfig.format(p.serviceName)) - .orElse { - sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c => - logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " + - s"using ${providerEnabledConfig.format(p.serviceName)} instead") - c - } - }.map(_.toBoolean).getOrElse(true) - }.map { p => (p.serviceName, p) }.toMap - } - - /** - * Get the credential provider for the specified service. - */ - def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = { - credentialProviders.get(service) - } - - /** - * Obtain credentials from all the registered providers. - * @return the nearest renewal time of any obtained credentials, or Long.MaxValue if none of - * the credentials are renewable. - */ - def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = { - credentialProviders.values.flatMap { provider => - if (provider.credentialsRequired(hadoopConf)) { - provider.obtainCredentials(hadoopConf, sparkConf, creds) - } else { - logDebug(s"Service ${provider.serviceName} does not require a token." + - s" Check your configuration to see if security is disabled or not.") - None - } - }.foldLeft(Long.MaxValue)(math.min) - } - - /** - * Create an [[AMCredentialRenewer]] instance; the caller is responsible for stopping it - * when it is no longer used. The AM will use it to renew credentials periodically. - */ - def credentialRenewer(): AMCredentialRenewer = { - new AMCredentialRenewer(sparkConf, hadoopConf, this) - } - - /** - * Create a [[CredentialUpdater]] instance; the caller is responsible for stopping it when it - * is no longer used. Executors and the driver (client mode) will use it to update credentials - * periodically. - */ - def credentialUpdater(): CredentialUpdater = { - new CredentialUpdater(sparkConf, hadoopConf, this) - } -}
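As the scaladoc above describes, a third-party provider plugs into ConfigurableCredentialManager by implementing the ServiceCredentialProvider trait (shown later in this change) and registering itself for Java's ServiceLoader. A minimal sketch of such a provider follows; the ExampleCredentialProvider class, its package, and the example.security.authentication key are hypothetical and not part of Spark:

    package com.example.spark.security

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

    // Hypothetical provider for an imaginary "example" service. The service name is what
    // spark.yarn.security.credentials.example.enabled keys off when the manager filters providers.
    class ExampleCredentialProvider extends ServiceCredentialProvider {

      override def serviceName: String = "example"

      // Only request a token when the service is actually secured.
      override def credentialsRequired(hadoopConf: Configuration): Boolean =
        hadoopConf.get("example.security.authentication", "") == "kerberos"

      override def obtainCredentials(
          hadoopConf: Configuration,
          sparkConf: SparkConf,
          creds: Credentials): Option[Long] = {
        // Fetch a delegation token for the service and add it to `creds` here.
        // Returning None signals that the token is not renewable / no renewal time is known.
        None
      }
    }

To be picked up by the ServiceLoader, the implementation class name is listed in resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider, exactly as the test resource at the end of this change does for TestCredentialProvider; setting spark.yarn.security.credentials.example.enabled=false would keep such a provider from being loaded.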
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala deleted file mode 100644 index 5df4fbd..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import java.util.concurrent.{Executors, TimeUnit} - -import scala.util.control.NonFatal - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.{Credentials, UserGroupInformation} - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.util.{ThreadUtils, Utils} - -private[spark] class CredentialUpdater( - sparkConf: SparkConf, - hadoopConf: Configuration, - credentialManager: ConfigurableCredentialManager) extends Logging { - - @volatile private var lastCredentialsFileSuffix = 0 - - private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) - private val freshHadoopConf = - SparkHadoopUtil.get.getConfBypassingFSCache( - hadoopConf, new Path(credentialsFile).toUri.getScheme) - - private val credentialUpdater = - Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Credential Refresh Thread")) - - // This thread wakes up and picks up new credentials from HDFS, if any. 
- private val credentialUpdaterRunnable = - new Runnable { - override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired()) - } - - /** Start the credential updater task */ - def start(): Unit = { - val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME) - val remainingTime = startTime - System.currentTimeMillis() - if (remainingTime <= 0) { - credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES) - } else { - logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.") - credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS) - } - } - - private def updateCredentialsIfRequired(): Unit = { - val timeToNextUpdate = try { - val credentialsFilePath = new Path(credentialsFile) - val remoteFs = FileSystem.get(freshHadoopConf) - SparkHadoopUtil.get.listFilesSorted( - remoteFs, credentialsFilePath.getParent, - credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - .lastOption.map { credentialsStatus => - val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath) - if (suffix > lastCredentialsFileSuffix) { - logInfo("Reading new credentials from " + credentialsStatus.getPath) - val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath) - lastCredentialsFileSuffix = suffix - UserGroupInformation.getCurrentUser.addCredentials(newCredentials) - logInfo("Credentials updated from credentials file.") - - val remainingTime = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath) - - System.currentTimeMillis() - if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime - } else { - // If current credential file is older than expected, sleep 1 hour and check again. - TimeUnit.HOURS.toMillis(1) - } - }.getOrElse { - // Wait for 1 minute to check again if there's no credential file currently - TimeUnit.MINUTES.toMillis(1) - } - } catch { - // Since the file may get deleted while we are reading it, catch the Exception and come - // back in an hour to try again - case NonFatal(e) => - logWarning("Error while trying to update credentials, will try again in 1 hour", e) - TimeUnit.HOURS.toMillis(1) - } - - credentialUpdater.schedule( - credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS) - } - - private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = { - val stream = remoteFs.open(tokenPath) - try { - val newCredentials = new Credentials() - newCredentials.readTokenStorageStream(stream) - newCredentials - } finally { - stream.close() - } - } - - private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = { - val name = credentialsPath.getName - val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) - val slice = name.substring(0, index) - val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) - name.substring(last2index + 1, index).toLong - } - - def stop(): Unit = { - credentialUpdater.shutdown() - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala deleted file mode 100644 index 5571df0..0000000 --- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import scala.reflect.runtime.universe -import scala.util.control.NonFatal - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.token.{Token, TokenIdentifier} - -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging - -private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging { - - override def serviceName: String = "hbase" - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = { - try { - val mirror = universe.runtimeMirror(getClass.getClassLoader) - val obtainToken = mirror.classLoader. - loadClass("org.apache.hadoop.hbase.security.token.TokenUtil"). - getMethod("obtainToken", classOf[Configuration]) - - logDebug("Attempting to fetch HBase security token.") - val token = obtainToken.invoke(null, hbaseConf(hadoopConf)) - .asInstanceOf[Token[_ <: TokenIdentifier]] - logInfo(s"Get token from HBase: ${token.toString}") - creds.addToken(token.getService, token) - } catch { - case NonFatal(e) => - logDebug(s"Failed to get token from service $serviceName", e) - } - - None - } - - override def credentialsRequired(hadoopConf: Configuration): Boolean = { - hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos" - } - - private def hbaseConf(conf: Configuration): Configuration = { - try { - val mirror = universe.runtimeMirror(getClass.getClassLoader) - val confCreate = mirror.classLoader. - loadClass("org.apache.hadoop.hbase.HBaseConfiguration"). - getMethod("create", classOf[Configuration]) - confCreate.invoke(null, conf).asInstanceOf[Configuration] - } catch { - case NonFatal(e) => - logDebug("Fail to invoke HBaseConfiguration", e) - conf - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala deleted file mode 100644 index 8d06d73..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import java.io.{ByteArrayInputStream, DataInputStream} - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier -import org.apache.hadoop.mapred.Master -import org.apache.hadoop.security.Credentials - -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ - -private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging { - // Token renewal interval, this value will be set in the first call, - // if None means no token renewer specified, so cannot get token renewal interval. - private var tokenRenewalInterval: Option[Long] = null - - override val serviceName: String = "hdfs" - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = { - // NameNode to access, used to get tokens from different FileSystems - nnsToAccess(hadoopConf, sparkConf).foreach { dst => - val dstFs = dst.getFileSystem(hadoopConf) - logInfo("getting token for namenode: " + dst) - dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds) - } - - // Get the token renewal interval if it is not set. It will only be called once. - if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf) - } - - // Get the time of next renewal. - tokenRenewalInterval.map { interval => - creds.getAllTokens.asScala - .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) - .map { t => - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) - identifier.getIssueDate + interval - }.foldLeft(0L)(math.max) - } - } - - private def getTokenRenewalInterval( - hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = { - // We cannot use the tokens generated with renewer yarn. Trying to renew - // those will fail with an access control issue. So create new tokens with the logged in - // user as renewer. 
- sparkConf.get(PRINCIPAL).map { renewer => - val creds = new Credentials() - nnsToAccess(hadoopConf, sparkConf).foreach { dst => - val dstFs = dst.getFileSystem(hadoopConf) - dstFs.addDelegationTokens(renewer, creds) - } - val t = creds.getAllTokens.asScala - .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) - .head - val newExpiration = t.renew(hadoopConf) - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) - val interval = newExpiration - identifier.getIssueDate - logInfo(s"Renewal Interval is $interval") - interval - } - } - - private def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - - delegTokenRenewer - } - - private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = { - sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet + - sparkConf.get(STAGING_DIR).map(new Path(_)) - .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala deleted file mode 100644 index 16d8fc3..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn.security - -import java.lang.reflect.UndeclaredThrowableException -import java.security.PrivilegedExceptionAction - -import scala.reflect.runtime.universe -import scala.util.control.NonFatal - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier -import org.apache.hadoop.io.Text -import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.security.token.Token - -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - -private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging { - - override def serviceName: String = "hive" - - private def hiveConf(hadoopConf: Configuration): Configuration = { - try { - val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) - // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down - // to a Configuration and used without reflection - val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") - // using the (Configuration, Class) constructor allows the current configuration to be - // included in the hive config. - val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration], - classOf[Object].getClass) - ctor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration] - } catch { - case NonFatal(e) => - logDebug("Fail to create Hive Configuration", e) - hadoopConf - } - } - - override def credentialsRequired(hadoopConf: Configuration): Boolean = { - UserGroupInformation.isSecurityEnabled && - hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty - } - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = { - val conf = hiveConf(hadoopConf) - - val principalKey = "hive.metastore.kerberos.principal" - val principal = conf.getTrimmed(principalKey, "") - require(principal.nonEmpty, s"Hive principal $principalKey undefined") - val metastoreUri = conf.getTrimmed("hive.metastore.uris", "") - require(metastoreUri.nonEmpty, "Hive metastore uri undefined") - - val currentUser = UserGroupInformation.getCurrentUser() - logDebug(s"Getting Hive delegation token for ${currentUser.getUserName()} against " + - s"$principal at $metastoreUri") - - val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) - val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") - val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") - val closeCurrent = hiveClass.getMethod("closeCurrent") - - try { - // get all the instance methods before invoking any - val getDelegationToken = hiveClass.getMethod("getDelegationToken", - classOf[String], classOf[String]) - val getHive = hiveClass.getMethod("get", hiveConfClass) - - doAsRealUser { - val hive = getHive.invoke(null, conf) - val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal) - .asInstanceOf[String] - val hive2Token = new Token[DelegationTokenIdentifier]() - hive2Token.decodeFromUrlString(tokenStr) - logInfo(s"Get Token from hive metastore: ${hive2Token.toString}") - creds.addToken(new Text("hive.server2.delegation.token"), hive2Token) - } - } catch { - case NonFatal(e) => - logDebug(s"Fail to get token from service $serviceName", e) - } finally { - Utils.tryLogNonFatalError { - closeCurrent.invoke(null) - } - 
} - - None - } - - /** - * Run some code as the real logged in user (which may differ from the current user, for - * example, when using proxying). - */ - private def doAsRealUser[T](fn: => T): T = { - val currentUser = UserGroupInformation.getCurrentUser() - val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) - - // For some reason the Scala-generated anonymous class ends up causing an - // UndeclaredThrowableException, even if you annotate the method with @throws. - try { - realUser.doAs(new PrivilegedExceptionAction[T]() { - override def run(): T = fn - }) - } catch { - case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala deleted file mode 100644 index 4e3fcce..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.{Credentials, UserGroupInformation} - -import org.apache.spark.SparkConf - -/** - * A credential provider for a service. Users must implement this if they need to access a - * secure service from Spark. - */ -trait ServiceCredentialProvider { - - /** - * Name of the service to provide credentials for. This name should be unique; Spark uses it - * internally to differentiate credential providers. - */ - def serviceName: String - - /** - * Decide whether credentials are required for this service. By default this is based on - * whether Hadoop security is enabled. - */ - def credentialsRequired(hadoopConf: Configuration): Boolean = { - UserGroupInformation.isSecurityEnabled - } - - /** - * Obtain credentials for this service and get the time of the next renewal. - * @param hadoopConf Configuration of current Hadoop Compatible system. - * @param sparkConf Spark configuration. - * @param creds Credentials to add tokens and security keys to. - * @return The time of the next renewal if this credential is renewable and can be renewed, - * otherwise None. 
- */ - def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala deleted file mode 100644 index 6c3556a..0000000 --- a/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.launcher - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer -import scala.util.Properties - -/** - * Exposes methods from the launcher library that are used by the YARN backend. - */ -private[spark] object YarnCommandBuilderUtils { - - def quoteForBatchScript(arg: String): String = { - CommandBuilderUtils.quoteForBatchScript(arg) - } - - def findJarsDir(sparkHome: String): String = { - val scalaVer = Properties.versionNumberString - .split("\\.") - .take(2) - .mkString(".") - CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true) - } - - /** - * Adds the perm gen configuration to the list of java options if needed and not yet added. - * - * Note that this method adds the option based on the local JVM version; if the node where - * the container is running has a different Java version, there's a risk that the option will - * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use - * Java 7). - */ - def addPermGenSizeOpt(args: ListBuffer[String]): Unit = { - CommandBuilderUtils.addPermGenSizeOpt(args.asJava) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala deleted file mode 100644 index 4ed2852..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import java.util.concurrent.atomic.AtomicBoolean - -import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} - -import org.apache.spark.SparkContext -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - -/** - * An extension service that can be loaded into a Spark YARN scheduler. - * A Service that can be started and stopped. - * - * 1. For implementations to be loadable by `SchedulerExtensionServices`, - * they must provide an empty constructor. - * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was - * never invoked. - */ -trait SchedulerExtensionService { - - /** - * Start the extension service. This should be a no-op if - * called more than once. - * @param binding binding to the spark application and YARN - */ - def start(binding: SchedulerExtensionServiceBinding): Unit - - /** - * Stop the service - * The `stop()` operation MUST be idempotent, and succeed even if `start()` was - * never invoked. - */ - def stop(): Unit -} - -/** - * Binding information for a [[SchedulerExtensionService]]. - * - * The attempt ID will be set if the service is started within a YARN application master; - * there is then a different attempt ID for every time that AM is restarted. - * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks - * this information. - * @param sparkContext current spark context - * @param applicationId YARN application ID - * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in - * cluster mode. - */ -case class SchedulerExtensionServiceBinding( - sparkContext: SparkContext, - applicationId: ApplicationId, - attemptId: Option[ApplicationAttemptId] = None) - -/** - * Container for [[SchedulerExtensionService]] instances. - * - * Loads Extension Services from the configuration property - * `"spark.yarn.services"`, instantiates and starts them. - * When stopped, it stops all child entries. - * - * The order in which child extension services are started and stopped - * is undefined. - */ -private[spark] class SchedulerExtensionServices extends SchedulerExtensionService - with Logging { - private var serviceOption: Option[String] = None - private var services: List[SchedulerExtensionService] = Nil - private val started = new AtomicBoolean(false) - private var binding: SchedulerExtensionServiceBinding = _ - - /** - * Binding operation will load the named services and call bind on them too; the - * entire set of services are then ready for `init()` and `start()` calls. 
- * - * @param binding binding to the spark application and YARN - */ - def start(binding: SchedulerExtensionServiceBinding): Unit = { - if (started.getAndSet(true)) { - logWarning("Ignoring re-entrant start operation") - return - } - require(binding.sparkContext != null, "Null context parameter") - require(binding.applicationId != null, "Null appId parameter") - this.binding = binding - val sparkContext = binding.sparkContext - val appId = binding.applicationId - val attemptId = binding.attemptId - logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId") - - services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass => - val instance = Utils.classForName(sClass) - .newInstance() - .asInstanceOf[SchedulerExtensionService] - // bind this service - instance.start(binding) - logInfo(s"Service $sClass started") - instance - }.toList - } - - /** - * Get the list of services. - * - * @return a list of services; Nil until the service is started - */ - def getServices: List[SchedulerExtensionService] = services - - /** - * Stop the services; idempotent. - * - */ - override def stop(): Unit = { - if (started.getAndSet(false)) { - logInfo(s"Stopping $this") - services.foreach { s => - Utils.tryLogNonFatalError(s.stop()) - } - } - } - - override def toString(): String = s"""SchedulerExtensionServices - |(serviceOption=$serviceOption, - | services=$services, - | started=$started)""".stripMargin -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala deleted file mode 100644 index 60da356..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.yarn.api.records.YarnApplicationState - -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} -import org.apache.spark.internal.Logging -import org.apache.spark.launcher.SparkAppHandle -import org.apache.spark.scheduler.TaskSchedulerImpl - -private[spark] class YarnClientSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends YarnSchedulerBackend(scheduler, sc) - with Logging { - - private var client: Client = null - private var monitorThread: MonitorThread = null - - /** - * Create a Yarn client to submit an application to the ResourceManager. - * This waits until the application is running. - */ - override def start() { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - val hostport = driverHost + ":" + driverPort - sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) } - - val argsArrayBuf = new ArrayBuffer[String]() - argsArrayBuf += ("--arg", hostport) - - logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) - val args = new ClientArguments(argsArrayBuf.toArray) - totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) - client = new Client(args, conf) - bindToYarn(client.submitApplication(), None) - - // SPARK-8687: Ensure all necessary properties have already been set before - // we initialize our driver scheduler backend, which serves these properties - // to the executors - super.start() - waitForApplication() - - // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver - // reads the credentials from HDFS, just like the executors and updates its own credentials - // cache. - if (conf.contains("spark.yarn.credentials.file")) { - YarnSparkHadoopUtil.get.startCredentialUpdater(conf) - } - monitorThread = asyncMonitorApplication() - monitorThread.start() - } - - /** - * Report the state of the application until it is running. - * If the application has finished, failed or been killed in the process, throw an exception. - * This assumes both `client` and `appId` have already been set. - */ - private def waitForApplication(): Unit = { - assert(client != null && appId.isDefined, "Application has not been submitted yet!") - val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - throw new SparkException("Yarn application has already ended! " + - "It might have been killed or unable to launch application master.") - } - if (state == YarnApplicationState.RUNNING) { - logInfo(s"Application ${appId.get} has started running.") - } - } - - /** - * We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's - * because the SparkContext is being shut down(sc.stop() called by user code), but if - * monitorApplication return, it means the Yarn application finished before sc.stop() was called, - * which means we should call sc.stop() here, and we don't allow the monitor to be interrupted - * before SparkContext stops successfully. 
- */ - private class MonitorThread extends Thread { - private var allowInterrupt = true - - override def run() { - try { - val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false) - logError(s"Yarn application has already exited with state $state!") - allowInterrupt = false - sc.stop() - } catch { - case e: InterruptedException => logInfo("Interrupting monitor thread") - } - } - - def stopMonitor(): Unit = { - if (allowInterrupt) { - this.interrupt() - } - } - } - - /** - * Monitor the application state in a separate thread. - * If the application has exited for any reason, stop the SparkContext. - * This assumes both `client` and `appId` have already been set. - */ - private def asyncMonitorApplication(): MonitorThread = { - assert(client != null && appId.isDefined, "Application has not been submitted yet!") - val t = new MonitorThread - t.setName("Yarn application state monitor") - t.setDaemon(true) - t - } - - /** - * Stop the scheduler. This assumes `start()` has already been called. - */ - override def stop() { - assert(client != null, "Attempted to stop this scheduler before starting it!") - if (monitorThread != null) { - monitorThread.stopMonitor() - } - - // Report a final state to the launcher if one is connected. This is needed since in client - // mode this backend doesn't let the app monitor loop run to completion, so it does not report - // the final state itself. - // - // Note: there's not enough information at this point to provide a better final state, - // so assume the application was successful. - client.reportLauncherState(SparkAppHandle.State.FINISHED) - - super.stop() - YarnSparkHadoopUtil.get.stopCredentialUpdater() - client.stop() - logInfo("Stopped") - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala deleted file mode 100644 index 64cd1bd..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} - -/** - * Cluster Manager for creation of Yarn scheduler and backend - */ -private[spark] class YarnClusterManager extends ExternalClusterManager { - - override def canCreate(masterURL: String): Boolean = { - masterURL == "yarn" - } - - override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - sc.deployMode match { - case "cluster" => new YarnClusterScheduler(sc) - case "client" => new YarnScheduler(sc) - case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") - } - } - - override def createSchedulerBackend(sc: SparkContext, - masterURL: String, - scheduler: TaskScheduler): SchedulerBackend = { - sc.deployMode match { - case "cluster" => - new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) - case "client" => - new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) - case _ => - throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") - } - } - - override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { - scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala deleted file mode 100644 index 96c9151..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark._ -import org.apache.spark.deploy.yarn.ApplicationMaster - -/** - * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of - * ApplicationMaster, etc is done - */ -private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { - - logInfo("Created YarnClusterScheduler") - - override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) - super.postStartHook() - logInfo("YarnClusterScheduler.postStartHook done") - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala deleted file mode 100644 index 4f3d5eb..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster - -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.conf.YarnConfiguration - -import org.apache.spark.SparkContext -import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils - -private[spark] class YarnClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends YarnSchedulerBackend(scheduler, sc) { - - override def start() { - val attemptId = ApplicationMaster.getAttemptId - bindToYarn(attemptId.getApplicationId(), Some(attemptId)) - super.start() - totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf) - } - - override def getDriverLogUrls: Option[Map[String, String]] = { - var driverLogs: Option[Map[String, String]] = None - try { - val yarnConf = new YarnConfiguration(sc.hadoopConfiguration) - val containerId = YarnSparkHadoopUtil.get.getContainerId - - val httpAddress = System.getenv(Environment.NM_HOST.name()) + - ":" + System.getenv(Environment.NM_HTTP_PORT.name()) - // lookup appropriate http scheme for container log urls - val yarnHttpPolicy = yarnConf.get( - YarnConfiguration.YARN_HTTP_POLICY_KEY, - YarnConfiguration.YARN_HTTP_POLICY_DEFAULT - ) - val user = Utils.getCurrentUserName() - val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" - val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user" - logDebug(s"Base URL for logs: $baseUrl") - driverLogs = Some(Map( - "stdout" -> s"$baseUrl/stdout?start=-4096", - "stderr" -> s"$baseUrl/stderr?start=-4096")) - } catch { - case e: Exception => - logInfo("Error while building AM log links, so AM" + - " logs link will not appear in application UI", e) - } - driverLogs - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala deleted file mode 100644 index 0293821..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster - -import org.apache.hadoop.yarn.util.RackResolver -import org.apache.log4j.{Level, Logger} - -import org.apache.spark._ -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils - -private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { - - // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. - if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { - Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) - } - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala deleted file mode 100644 index 2f9ea19..0000000 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} -import scala.util.control.NonFatal - -import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} - -import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.rpc._ -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.ui.JettyUtils -import org.apache.spark.util.{RpcUtils, ThreadUtils} - -/** - * Abstract Yarn scheduler backend that contains common logic - * between the client and cluster Yarn scheduler backends. 
- */ -private[spark] abstract class YarnSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { - - override val minRegisteredRatio = - if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { - 0.8 - } else { - super.minRegisteredRatio - } - - protected var totalExpectedExecutors = 0 - - private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv) - - private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint( - YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint) - - private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf) - - /** Application ID. */ - protected var appId: Option[ApplicationId] = None - - /** Attempt ID. This is unset for client-mode schedulers */ - private var attemptId: Option[ApplicationAttemptId] = None - - /** Scheduler extension services. */ - private val services: SchedulerExtensionServices = new SchedulerExtensionServices() - - // Flag to specify whether this schedulerBackend should be reset. - private var shouldResetOnAmRegister = false - - /** - * Bind to YARN. This *must* be done before calling [[start()]]. - * - * @param appId YARN application ID - * @param attemptId Optional YARN attempt ID - */ - protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = { - this.appId = Some(appId) - this.attemptId = attemptId - } - - override def start() { - require(appId.isDefined, "application ID unset") - val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId) - services.start(binding) - super.start() - } - - override def stop(): Unit = { - try { - // SPARK-12009: To prevent Yarn allocator from requesting backup for the executors which - // was Stopped by SchedulerBackend. - requestTotalExecutors(0, 0, Map.empty) - super.stop() - } finally { - services.stop() - } - } - - /** - * Get the attempt ID for this run, if the cluster manager supports multiple - * attempts. Applications run in client mode will not have attempt IDs. - * This attempt ID only includes attempt counter, like "1", "2". - * - * @return The application attempt id, if available. - */ - override def applicationAttemptId(): Option[String] = { - attemptId.map(_.getAttemptId.toString) - } - - /** - * Get an application ID associated with the job. - * This returns the string value of [[appId]] if set, otherwise - * the locally-generated ID from the superclass. - * @return The application ID - */ - override def applicationId(): String = { - appId.map(_.toString).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - } - - /** - * Request executors from the ApplicationMaster by specifying the total number desired. - * This includes executors already pending or running. - */ - override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { - yarnSchedulerEndpointRef.ask[Boolean]( - RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)) - } - - /** - * Request that the ApplicationMaster kill the specified executors. - */ - override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = { - yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds)) - } - - override def sufficientResourcesRegistered(): Boolean = { - totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio - } - - /** - * Add filters to the SparkUI. 
- */ - private def addWebUIFilter( - filterName: String, - filterParams: Map[String, String], - proxyBase: String): Unit = { - if (proxyBase != null && proxyBase.nonEmpty) { - System.setProperty("spark.ui.proxyBase", proxyBase) - } - - val hasFilter = - filterName != null && filterName.nonEmpty && - filterParams != null && filterParams.nonEmpty - if (hasFilter) { - logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase") - conf.set("spark.ui.filters", filterName) - filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) } - scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) } - } - } - - override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new YarnDriverEndpoint(rpcEnv, properties) - } - - /** - * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed - * and re-registered itself to driver after a failure. The stale state in driver should be - * cleaned. - */ - override protected def reset(): Unit = { - super.reset() - sc.executorAllocationManager.foreach(_.reset()) - } - - /** - * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected. - * This endpoint communicates with the executors and queries the AM for an executor's exit - * status when the executor is disconnected. - */ - private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends DriverEndpoint(rpcEnv, sparkProperties) { - - /** - * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint - * handles it by assuming the Executor was lost for a bad reason and removes the executor - * immediately. - * - * In YARN's case however it is crucial to talk to the application master and ask why the - * executor had exited. If the executor exited for some reason unrelated to the running tasks - * (e.g., preemption), according to the application master, then we pass that information down - * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should - * not count towards a job failure. - */ - override def onDisconnected(rpcAddress: RpcAddress): Unit = { - addressToExecutorId.get(rpcAddress).foreach { executorId => - if (disableExecutor(executorId)) { - yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress) - } - } - } - } - - /** - * An [[RpcEndpoint]] that communicates with the ApplicationMaster. - */ - private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv) - extends ThreadSafeRpcEndpoint with Logging { - private var amEndpoint: Option[RpcEndpointRef] = None - - private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver( - executorId: String, - executorRpcAddress: RpcAddress): Unit = { - val removeExecutorMessage = amEndpoint match { - case Some(am) => - val lossReasonRequest = GetExecutorLossReason(executorId) - am.ask[ExecutorLossReason](lossReasonRequest, askTimeout) - .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread) - .recover { - case NonFatal(e) => - logWarning(s"Attempted to get executor loss reason" + - s" for executor id ${executorId} at RPC address ${executorRpcAddress}," + - s" but got no response. 
Marking as slave lost.", e) - RemoveExecutor(executorId, SlaveLost()) - }(ThreadUtils.sameThread) - case None => - logWarning("Attempted to check for an executor loss reason" + - " before the AM has registered!") - Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered."))) - } - - removeExecutorMessage - .flatMap { message => - driverEndpoint.ask[Boolean](message) - }(ThreadUtils.sameThread) - .onFailure { - case NonFatal(e) => logError( - s"Error requesting driver to remove executor $executorId after disconnection.", e) - }(ThreadUtils.sameThread) - } - - override def receive: PartialFunction[Any, Unit] = { - case RegisterClusterManager(am) => - logInfo(s"ApplicationMaster registered as $am") - amEndpoint = Option(am) - if (!shouldResetOnAmRegister) { - shouldResetOnAmRegister = true - } else { - // AM is already registered before, this potentially means that AM failed and - // a new one registered after the failure. This will only happen in yarn-client mode. - reset() - } - - case AddWebUIFilter(filterName, filterParams, proxyBase) => - addWebUIFilter(filterName, filterParams, proxyBase) - - case r @ RemoveExecutor(executorId, reason) => - logWarning(reason.toString) - driverEndpoint.ask[Boolean](r).onFailure { - case e => - logError("Error requesting driver to remove executor" + - s" $executorId for reason $reason", e) - }(ThreadUtils.sameThread) - } - - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case r: RequestExecutors => - amEndpoint match { - case Some(am) => - am.ask[Boolean](r).andThen { - case Success(b) => context.reply(b) - case Failure(NonFatal(e)) => - logError(s"Sending $r to AM was unsuccessful", e) - context.sendFailure(e) - }(ThreadUtils.sameThread) - case None => - logWarning("Attempted to request executors before the AM has registered!") - context.reply(false) - } - - case k: KillExecutors => - amEndpoint match { - case Some(am) => - am.ask[Boolean](k).andThen { - case Success(b) => context.reply(b) - case Failure(NonFatal(e)) => - logError(s"Sending $k to AM was unsuccessful", e) - context.sendFailure(e) - }(ThreadUtils.sameThread) - case None => - logWarning("Attempted to kill executors before the AM has registered!") - context.reply(false) - } - - case RetrieveLastAllocatedExecutorId => - context.reply(currentExecutorIdCounter) - } - - override def onDisconnected(remoteAddress: RpcAddress): Unit = { - if (amEndpoint.exists(_.address == remoteAddress)) { - logWarning(s"ApplicationMaster has disassociated: $remoteAddress") - amEndpoint = None - } - } - } -} - -private[spark] object YarnSchedulerBackend { - val ENDPOINT_NAME = "YarnScheduler" -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider ---------------------------------------------------------------------- diff --git a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index d0ef5ef..0000000 --- a/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.deploy.yarn.security.TestCredentialProvider http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/resources/log4j.properties 
---------------------------------------------------------------------- diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties deleted file mode 100644 index d13454d..0000000 --- a/yarn/src/test/resources/log4j.properties +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=DEBUG, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from a few verbose libraries. -log4j.logger.com.sun.jersey=WARN -log4j.logger.org.apache.hadoop=WARN -log4j.logger.org.eclipse.jetty=WARN -log4j.logger.org.mortbay=WARN -log4j.logger.org.spark_project.jetty=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala deleted file mode 100644 index 9c3b18e..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.io.{File, FileOutputStream, OutputStreamWriter} -import java.nio.charset.StandardCharsets -import java.util.Properties -import java.util.concurrent.TimeUnit - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps - -import com.google.common.io.Files -import org.apache.commons.lang3.SerializationUtils -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.server.MiniYARNCluster -import org.scalatest.{BeforeAndAfterAll, Matchers} -import org.scalatest.concurrent.Eventually._ - -import org.apache.spark._ -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.launcher._ -import org.apache.spark.util.Utils - -abstract class BaseYarnClusterSuite - extends SparkFunSuite with BeforeAndAfterAll with Matchers with Logging { - - // log4j configuration for the YARN containers, so that their output is collected - // by YARN instead of trying to overwrite unit-tests.log. - protected val LOG4J_CONF = """ - |log4j.rootCategory=DEBUG, console - |log4j.appender.console=org.apache.log4j.ConsoleAppender - |log4j.appender.console.target=System.err - |log4j.appender.console.layout=org.apache.log4j.PatternLayout - |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - |log4j.logger.org.apache.hadoop=WARN - |log4j.logger.org.eclipse.jetty=WARN - |log4j.logger.org.mortbay=WARN - |log4j.logger.org.spark_project.jetty=WARN - """.stripMargin - - private var yarnCluster: MiniYARNCluster = _ - protected var tempDir: File = _ - private var fakeSparkJar: File = _ - protected var hadoopConfDir: File = _ - private var logConfDir: File = _ - - var oldSystemProperties: Properties = null - - def newYarnConfig(): YarnConfiguration - - override def beforeAll() { - super.beforeAll() - oldSystemProperties = SerializationUtils.clone(System.getProperties) - - tempDir = Utils.createTempDir() - logConfDir = new File(tempDir, "log4j") - logConfDir.mkdir() - System.setProperty("SPARK_YARN_MODE", "true") - - val logConfFile = new File(logConfDir, "log4j.properties") - Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) - - // Disable the disk utilization check to avoid the test hanging when people's disks are - // getting full. - val yarnConf = newYarnConfig() - yarnConf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", - "100.0") - - yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) - yarnCluster.init(yarnConf) - yarnCluster.start() - - // There's a race in MiniYARNCluster in which start() may return before the RM has updated - // its address in the configuration. You can see this in the logs by noticing that when - // MiniYARNCluster prints the address, it still has port "0" assigned, although later the - // test works sometimes: - // - // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0 - // - // That log message prints the contents of the RM_ADDRESS config variable. If you check it - // later on, it looks something like this: - // - // INFO YarnClusterSuite: RM address in configuration is blah:42631 - // - // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't - // done so in a timely manner (defined to be 10 seconds). 
- val config = yarnCluster.getConfig() - val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10) - while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") { - if (System.currentTimeMillis() > deadline) { - throw new IllegalStateException("Timed out waiting for RM to come up.") - } - logDebug("RM address still not set in configuration, waiting...") - TimeUnit.MILLISECONDS.sleep(100) - } - - logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") - - fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) - hadoopConfDir = new File(tempDir, Client.LOCALIZED_CONF_DIR) - assert(hadoopConfDir.mkdir()) - File.createTempFile("token", ".txt", hadoopConfDir) - } - - override def afterAll() { - try { - yarnCluster.stop() - } finally { - System.setProperties(oldSystemProperties) - super.afterAll() - } - } - - protected def runSpark( - clientMode: Boolean, - klass: String, - appArgs: Seq[String] = Nil, - sparkArgs: Seq[(String, String)] = Nil, - extraClassPath: Seq[String] = Nil, - extraJars: Seq[String] = Nil, - extraConf: Map[String, String] = Map(), - extraEnv: Map[String, String] = Map()): SparkAppHandle.State = { - val deployMode = if (clientMode) "client" else "cluster" - val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf) - val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv - - val launcher = new SparkLauncher(env.asJava) - if (klass.endsWith(".py")) { - launcher.setAppResource(klass) - } else { - launcher.setMainClass(klass) - launcher.setAppResource(fakeSparkJar.getAbsolutePath()) - } - launcher.setSparkHome(sys.props("spark.test.home")) - .setMaster("yarn") - .setDeployMode(deployMode) - .setConf("spark.executor.instances", "1") - .setPropertiesFile(propsFile) - .addAppArgs(appArgs.toArray: _*) - - sparkArgs.foreach { case (name, value) => - if (value != null) { - launcher.addSparkArg(name, value) - } else { - launcher.addSparkArg(name) - } - } - extraJars.foreach(launcher.addJar) - - val handle = launcher.startApplication() - try { - eventually(timeout(2 minutes), interval(1 second)) { - assert(handle.getState().isFinal()) - } - } finally { - handle.kill() - } - - handle.getState() - } - - /** - * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide - * any sort of error when the job process finishes successfully, but the job itself fails. So - * the tests enforce that something is written to a file after everything is ok to indicate - * that the job succeeded. 
- */ - protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = { - checkResult(finalState, result, "success") - } - - protected def checkResult( - finalState: SparkAppHandle.State, - result: File, - expected: String): Unit = { - finalState should be (SparkAppHandle.State.FINISHED) - val resultString = Files.toString(result, StandardCharsets.UTF_8) - resultString should be (expected) - } - - protected def mainClassName(klass: Class[_]): String = { - klass.getName().stripSuffix("$") - } - - protected def createConfFile( - extraClassPath: Seq[String] = Nil, - extraConf: Map[String, String] = Map()): String = { - val props = new Properties() - props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath()) - - val testClasspath = new TestClasspathBuilder() - .buildClassPath( - logConfDir.getAbsolutePath() + - File.pathSeparator + - extraClassPath.mkString(File.pathSeparator)) - .asScala - .mkString(File.pathSeparator) - - props.put("spark.driver.extraClassPath", testClasspath) - props.put("spark.executor.extraClassPath", testClasspath) - - // SPARK-4267: make sure java options are propagated correctly. - props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"") - props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"") - - yarnCluster.getConfig().asScala.foreach { e => - props.setProperty("spark.hadoop." + e.getKey(), e.getValue()) - } - sys.props.foreach { case (k, v) => - if (k.startsWith("spark.")) { - props.setProperty(k, v) - } - } - extraConf.foreach { case (k, v) => props.setProperty(k, v) } - - val propsFile = File.createTempFile("spark", ".properties", tempDir) - val writer = new OutputStreamWriter(new FileOutputStream(propsFile), StandardCharsets.UTF_8) - props.store(writer, "Spark properties.") - writer.close() - propsFile.getAbsolutePath() - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala deleted file mode 100644 index b696e08..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path -import org.apache.hadoop.yarn.api.records.LocalResource -import org.apache.hadoop.yarn.api.records.LocalResourceType -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility -import org.apache.hadoop.yarn.util.ConverterUtils -import org.mockito.Mockito.when -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ - -class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar { - - class MockClientDistributedCacheManager extends ClientDistributedCacheManager { - override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): - LocalResourceVisibility = { - LocalResourceVisibility.PRIVATE - } - } - - test("test getFileStatus empty") { - val distMgr = new ClientDistributedCacheManager() - val fs = mock[FileSystem] - val uri = new URI("/tmp/testing") - when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val stat = distMgr.getFileStatus(fs, uri, statCache) - assert(stat.getPath() === null) - } - - test("test getFileStatus cached") { - val distMgr = new ClientDistributedCacheManager() - val fs = mock[FileSystem] - val uri = new URI("/tmp/testing") - val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus) - val stat = distMgr.getFileStatus(fs, uri, statCache) - assert(stat.getPath().toString() === "/tmp/testing") - } - - test("test addResource") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", - statCache, false) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 0) - assert(resource.getSize() === 0) - assert(resource.getType() === LocalResourceType.FILE) - - val sparkConf = new SparkConf(false) - distMgr.updateConfiguration(sparkConf) - assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link")) - assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L)) - assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L)) - assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name())) - assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name())) - - // add another one and verify both there and order correct - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing2")) - val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2") - 
when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus) - distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", - statCache, false) - val resource2 = localResources("link2") - assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2) - assert(resource2.getTimestamp() === 10) - assert(resource2.getSize() === 20) - assert(resource2.getType() === LocalResourceType.FILE) - - val sparkConf2 = new SparkConf(false) - distMgr.updateConfiguration(sparkConf2) - - val files = sparkConf2.get(CACHED_FILES) - val sizes = sparkConf2.get(CACHED_FILES_SIZES) - val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS) - val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES) - - assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(timestamps(0) === 0) - assert(sizes(0) === 0) - assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name()) - - assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2") - assert(timestamps(1) === 10) - assert(sizes(1) === 20) - assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name()) - } - - test("test addResource link null") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - - intercept[Exception] { - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, - statCache, false) - } - assert(localResources.get("link") === None) - assert(localResources.size === 0) - } - - test("test addResource appmaster only") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", - statCache, true) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 10) - assert(resource.getSize() === 20) - assert(resource.getType() === LocalResourceType.ARCHIVE) - - val sparkConf = new SparkConf(false) - distMgr.updateConfiguration(sparkConf) - assert(sparkConf.get(CACHED_FILES) === Nil) - assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil) - assert(sparkConf.get(CACHED_FILES_SIZES) === Nil) - assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil) - assert(sparkConf.get(CACHED_FILES_TYPES) === Nil) - } - - test("test addResource archive") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val realFileStatus = new 
FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", - statCache, false) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 10) - assert(resource.getSize() === 20) - assert(resource.getType() === LocalResourceType.ARCHIVE) - - val sparkConf = new SparkConf(false) - distMgr.updateConfiguration(sparkConf) - assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link")) - assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L)) - assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L)) - assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name())) - assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name())) - } - -}
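
For readers following the deleted BaseYarnClusterSuite above: its runSpark helper drives each test job through the public SparkLauncher API and then waits for the returned handle to reach a final state. The minimal sketch below shows that same launch-and-poll pattern outside the test harness; the Spark home, jar path, and main class are placeholder values and not anything taken from this commit, and a real run against YARN would additionally need YARN_CONF_DIR (or HADOOP_CONF_DIR) pointing at a cluster configuration, which the suite supplies through the launcher's environment map.

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object YarnLauncherSketch {
  def main(args: Array[String]): Unit = {
    // All paths and class names below are hypothetical placeholders.
    val handle: SparkAppHandle = new SparkLauncher()
      .setSparkHome("/opt/spark")                    // hypothetical Spark install
      .setAppResource("/path/to/example-app.jar")    // hypothetical application jar
      .setMainClass("com.example.ExampleApp")        // hypothetical main class
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setConf("spark.executor.instances", "1")
      .startApplication()

    // Poll until the handle reaches a terminal state, the same condition the
    // suite waits on with eventually { handle.getState().isFinal() }.
    while (!handle.getState.isFinal) {
      Thread.sleep(1000)
    }
    println(s"Application finished in state ${handle.getState}")
  }
}

The suite wraps this in eventually(timeout(2 minutes), interval(1 second)) and kills the handle in a finally block so a hung job cannot wedge the test run; a plain polling loop like the one above would normally want a similar timeout.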
