http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala new file mode 100644 index 0000000..be419ce --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -0,0 +1,1541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.{File, FileOutputStream, IOException, OutputStreamWriter} +import java.net.{InetAddress, UnknownHostException, URI} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.{Properties, UUID} +import java.util.zip.{ZipEntry, ZipOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} +import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal + +import com.google.common.base.Objects +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.mapreduce.MRJobConfig +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.util.StringUtils +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException +import org.apache.hadoop.yarn.util.Records + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} +import org.apache.spark.util.{CallerContext, Utils} + +private[spark] class Client( + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) + extends Logging { + + import Client._ + import YarnSparkHadoopUtil._ + + def this(clientArgs: ClientArguments, spConf: SparkConf) = + this(clientArgs, 
SparkHadoopUtil.get.newConfiguration(spConf), spConf) + + private val yarnClient = YarnClient.createYarnClient + private val yarnConf = new YarnConfiguration(hadoopConf) + + private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" + + // AM related configurations + private val amMemory = if (isClusterMode) { + sparkConf.get(DRIVER_MEMORY).toInt + } else { + sparkConf.get(AM_MEMORY).toInt + } + private val amMemoryOverhead = { + val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD + sparkConf.get(amMemoryOverheadEntry).getOrElse( + math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt + } + private val amCores = if (isClusterMode) { + sparkConf.get(DRIVER_CORES) + } else { + sparkConf.get(AM_CORES) + } + + // Executor related configurations + private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) + private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt + + private val distCacheMgr = new ClientDistributedCacheManager() + + private var loginFromKeytab = false + private var principal: String = null + private var keytab: String = null + private var credentials: Credentials = null + + private val launcherBackend = new LauncherBackend() { + override def onStopRequest(): Unit = { + if (isClusterMode && appId != null) { + yarnClient.killApplication(appId) + } else { + setState(SparkAppHandle.State.KILLED) + stop() + } + } + } + private val fireAndForget = isClusterMode && !sparkConf.get(WAIT_FOR_APP_COMPLETION) + + private var appId: ApplicationId = null + + // The app staging dir based on the STAGING_DIR configuration if configured + // otherwise based on the users home directory. + private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) } + .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory()) + + private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + + def reportLauncherState(state: SparkAppHandle.State): Unit = { + launcherBackend.setState(state) + } + + def stop(): Unit = { + launcherBackend.close() + yarnClient.stop() + // Unset YARN mode system env variable, to allow switching between cluster types. + System.clearProperty("SPARK_YARN_MODE") + } + + /** + * Submit an application running our ApplicationMaster to the ResourceManager. + * + * The stable Yarn API provides a convenience method (YarnClient#createApplication) for + * creating applications and setting up the application submission context. This was not + * available in the alpha API. + */ + def submitApplication(): ApplicationId = { + var appId: ApplicationId = null + try { + launcherBackend.connect() + // Setup the credentials before doing anything else, + // so we have don't have issues at any point. 
+ setupCredentials() + yarnClient.init(yarnConf) + yarnClient.start() + + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) + + // Get a new application from our RM + val newApp = yarnClient.createApplication() + val newAppResponse = newApp.getNewApplicationResponse() + appId = newAppResponse.getApplicationId() + reportLauncherState(SparkAppHandle.State.SUBMITTED) + launcherBackend.setAppId(appId.toString) + + new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT), + Option(appId.toString)).setCurrentContext() + + // Verify whether the cluster has enough resources for our AM + verifyClusterResources(newAppResponse) + + // Set up the appropriate contexts to launch our AM + val containerContext = createContainerLaunchContext(newAppResponse) + val appContext = createApplicationSubmissionContext(newApp, containerContext) + + // Finally, submit and monitor the application + logInfo(s"Submitting application $appId to ResourceManager") + yarnClient.submitApplication(appContext) + appId + } catch { + case e: Throwable => + if (appId != null) { + cleanupStagingDir(appId) + } + throw e + } + } + + /** + * Cleanup application staging directory. + */ + private def cleanupStagingDir(appId: ApplicationId): Unit = { + val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) + try { + val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) + val fs = stagingDirPath.getFileSystem(hadoopConf) + if (!preserveFiles && fs.delete(stagingDirPath, true)) { + logInfo(s"Deleted staging directory $stagingDirPath") + } + } catch { + case ioe: IOException => + logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe) + } + } + + /** + * Set up the context for submitting our ApplicationMaster. + * This uses the YarnClientApplication not available in the Yarn alpha API. + */ + def createApplicationSubmissionContext( + newApp: YarnClientApplication, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { + val appContext = newApp.getApplicationSubmissionContext + appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark")) + appContext.setQueue(sparkConf.get(QUEUE_NAME)) + appContext.setAMContainerSpec(containerContext) + appContext.setApplicationType("SPARK") + + sparkConf.get(APPLICATION_TAGS).foreach { tags => + try { + // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use + // reflection to set it, printing a warning if a tag was specified but the YARN version + // doesn't support it. + val method = appContext.getClass().getMethod( + "setApplicationTags", classOf[java.util.Set[String]]) + method.invoke(appContext, new java.util.HashSet[String](tags.asJava)) + } catch { + case e: NoSuchMethodException => + logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " + + "YARN does not support it") + } + } + sparkConf.get(MAX_APP_ATTEMPTS) match { + case Some(v) => appContext.setMaxAppAttempts(v) + case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. 
" + + "Cluster's default value will be used.") + } + + sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval => + try { + val method = appContext.getClass().getMethod( + "setAttemptFailuresValidityInterval", classOf[Long]) + method.invoke(appContext, interval: java.lang.Long) + } catch { + case e: NoSuchMethodException => + logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " + + "the version of YARN does not support it") + } + } + + val capability = Records.newRecord(classOf[Resource]) + capability.setMemory(amMemory + amMemoryOverhead) + capability.setVirtualCores(amCores) + + sparkConf.get(AM_NODE_LABEL_EXPRESSION) match { + case Some(expr) => + try { + val amRequest = Records.newRecord(classOf[ResourceRequest]) + amRequest.setResourceName(ResourceRequest.ANY) + amRequest.setPriority(Priority.newInstance(0)) + amRequest.setCapability(capability) + amRequest.setNumContainers(1) + val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) + method.invoke(amRequest, expr) + + val setResourceRequestMethod = + appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest]) + setResourceRequestMethod.invoke(appContext, amRequest) + } catch { + case e: NoSuchMethodException => + logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " + + "of YARN does not support it") + appContext.setResource(capability) + } + case None => + appContext.setResource(capability) + } + + sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern => + try { + val logAggregationContext = Records.newRecord( + Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) + .asInstanceOf[Object] + + val setRolledLogsIncludePatternMethod = + logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String]) + setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern) + + sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern => + val setRolledLogsExcludePatternMethod = + logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String]) + setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern) + } + + val setLogAggregationContextMethod = + appContext.getClass.getMethod("setLogAggregationContext", + Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext")) + setLogAggregationContextMethod.invoke(appContext, logAggregationContext) + } catch { + case NonFatal(e) => + logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " + + s"does not support it", e) + } + } + + appContext + } + + /** Set up security tokens for launching our ApplicationMaster container. */ + private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { + val dob = new DataOutputBuffer + credentials.writeTokenStorageToStream(dob) + amContainer.setTokens(ByteBuffer.wrap(dob.getData)) + } + + /** Get the application report from the ResourceManager for an application we have submitted. */ + def getApplicationReport(appId: ApplicationId): ApplicationReport = + yarnClient.getApplicationReport(appId) + + /** + * Return the security token used by this client to communicate with the ApplicationMaster. + * If no security is enabled, the token returned by the report is null. 
+ */ + private def getClientToken(report: ApplicationReport): String = + Option(report.getClientToAMToken).map(_.toString).getOrElse("") + + /** + * Fail fast if we have requested more resources per container than is available in the cluster. + */ + private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = { + val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() + logInfo("Verifying our application has not requested more than the maximum " + + s"memory capability of the cluster ($maxMem MB per container)") + val executorMem = executorMemory + executorMemoryOverhead + if (executorMem > maxMem) { + throw new IllegalArgumentException(s"Required executor memory ($executorMemory" + + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + + "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " + + "'yarn.nodemanager.resource.memory-mb'.") + } + val amMem = amMemory + amMemoryOverhead + if (amMem > maxMem) { + throw new IllegalArgumentException(s"Required AM memory ($amMemory" + + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + + "Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.") + } + logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( + amMem, + amMemoryOverhead)) + + // We could add checks to make sure the entire cluster has enough resources but that involves + // getting all the node reports and computing ourselves. + } + + /** + * Copy the given file to a remote file system (e.g. HDFS) if needed. + * The file is only copied if the source and destination file systems are different. This is used + * for preparing resources for launching the ApplicationMaster container. Exposed for testing. + */ + private[yarn] def copyFileToRemote( + destDir: Path, + srcPath: Path, + replication: Short, + force: Boolean = false, + destName: Option[String] = None): Path = { + val destFs = destDir.getFileSystem(hadoopConf) + val srcFs = srcPath.getFileSystem(hadoopConf) + var destPath = srcPath + if (force || !compareFs(srcFs, destFs)) { + destPath = new Path(destDir, destName.getOrElse(srcPath.getName())) + logInfo(s"Uploading resource $srcPath -> $destPath") + FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) + destFs.setReplication(destPath, replication) + destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) + } else { + logInfo(s"Source and destination file systems are the same. Not copying $srcPath") + } + // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific + // version shows the specific version in the distributed cache configuration + val qualifiedDestPath = destFs.makeQualified(destPath) + val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf) + fc.resolvePath(qualifiedDestPath) + } + + /** + * Upload any resources to the distributed cache if needed. If a resource is intended to be + * consumed locally, set up the appropriate config for downstream code to handle it properly. + * This is used for setting up a container launch context for our ApplicationMaster. + * Exposed for testing. + */ + def prepareLocalResources( + destDir: Path, + pySparkArchives: Seq[String]): HashMap[String, LocalResource] = { + logInfo("Preparing resources for our AM container") + // Upload Spark and the application JAR to the remote file system if necessary, + // and add them as local resources to the application master. 
+ val fs = destDir.getFileSystem(hadoopConf) + + // Merge credentials obtained from registered providers + val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials) + + if (credentials != null) { + logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) + } + + // If we use principal and keytab to login, also credentials can be renewed some time + // after current time, we should pass the next renewal and updating time to credential + // renewer and updater. + if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() && + nearestTimeOfNextRenewal != Long.MaxValue) { + + // Valid renewal time is 75% of next renewal time, and the valid update time will be + // slightly later then renewal time (80% of next renewal time). This is to make sure + // credentials are renewed and updated before expired. + val currTime = System.currentTimeMillis() + val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime + val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime + + sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong) + sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong) + } + + // Used to keep track of URIs added to the distributed cache. If the same URI is added + // multiple times, YARN will fail to launch containers for the app with an internal + // error. + val distributedUris = new HashSet[String] + // Used to keep track of URIs(files) added to the distribute cache have the same name. If + // same name but different path files are added multiple time, YARN will fail to launch + // containers for the app with an internal error. + val distributedNames = new HashSet[String] + + val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) + .getOrElse(fs.getDefaultReplication(destDir)) + val localResources = HashMap[String, LocalResource]() + FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION)) + + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + + def addDistributedUri(uri: URI): Boolean = { + val uriStr = uri.toString() + val fileName = new File(uri.getPath).getName + if (distributedUris.contains(uriStr)) { + logWarning(s"Same path resource $uri added multiple times to distributed cache.") + false + } else if (distributedNames.contains(fileName)) { + logWarning(s"Same name resource $uri added multiple times to distributed cache") + false + } else { + distributedUris += uriStr + distributedNames += fileName + true + } + } + + /** + * Distribute a file to the cluster. + * + * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied + * to HDFS (if not already there) and added to the application's distributed cache. + * + * @param path URI of the file to distribute. + * @param resType Type of resource being distributed. + * @param destName Name of the file in the distributed cache. + * @param targetDir Subdirectory where to place the file. + * @param appMasterOnly Whether to distribute only to the AM. + * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the + * localized path for non-local paths, or the input `path` for local paths. + * The localized path will be null if the URI has already been added to the cache. 
+ */ + def distribute( + path: String, + resType: LocalResourceType = LocalResourceType.FILE, + destName: Option[String] = None, + targetDir: Option[String] = None, + appMasterOnly: Boolean = false): (Boolean, String) = { + val trimmedPath = path.trim() + val localURI = Utils.resolveURI(trimmedPath) + if (localURI.getScheme != LOCAL_SCHEME) { + if (addDistributedUri(localURI)) { + val localPath = getQualifiedLocalPath(localURI, hadoopConf) + val linkname = targetDir.map(_ + "/").getOrElse("") + + destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName()) + val destPath = copyFileToRemote(destDir, localPath, replication) + val destFs = FileSystem.get(destPath.toUri(), hadoopConf) + distCacheMgr.addResource( + destFs, hadoopConf, destPath, localResources, resType, linkname, statCache, + appMasterOnly = appMasterOnly) + (false, linkname) + } else { + (false, null) + } + } else { + (true, trimmedPath) + } + } + + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. + if (loginFromKeytab) { + logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + + " via the YARN Secure Distributed Cache.") + val (_, localizedPath) = distribute(keytab, + destName = sparkConf.get(KEYTAB), + appMasterOnly = true) + require(localizedPath != null, "Keytab file already distributed.") + } + + /** + * Add Spark to the cache. There are two settings that control what files to add to the cache: + * - if a Spark archive is defined, use the archive. The archive is expected to contain + * jar files at its root directory. + * - if a list of jars is provided, filter the non-local ones, resolve globs, and + * add the found files to the cache. + * + * Note that the archive cannot be a "local" URI. If none of the above settings are found, + * then upload all files found in $SPARK_HOME/jars. + */ + val sparkArchive = sparkConf.get(SPARK_ARCHIVE) + if (sparkArchive.isDefined) { + val archive = sparkArchive.get + require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.") + distribute(Utils.resolveURI(archive).toString, + resType = LocalResourceType.ARCHIVE, + destName = Some(LOCALIZED_LIB_DIR)) + } else { + sparkConf.get(SPARK_JARS) match { + case Some(jars) => + // Break the list of jars to upload, and resolve globs. + val localJars = new ArrayBuffer[String]() + jars.foreach { jar => + if (!isLocalUri(jar)) { + val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf) + val pathFs = FileSystem.get(path.toUri(), hadoopConf) + pathFs.globStatus(path).filter(_.isFile()).foreach { entry => + distribute(entry.getPath().toUri().toString(), + targetDir = Some(LOCALIZED_LIB_DIR)) + } + } else { + localJars += jar + } + } + + // Propagate the local URIs to the containers using the configuration. + sparkConf.set(SPARK_JARS, localJars) + + case None => + // No configuration, so fall back to uploading local jar files. 
+ logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " + + "to uploading libraries under SPARK_HOME.") + val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir( + sparkConf.getenv("SPARK_HOME"))) + val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive)) + + try { + jarsStream.setLevel(0) + jarsDir.listFiles().foreach { f => + if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) { + jarsStream.putNextEntry(new ZipEntry(f.getName)) + Files.copy(f, jarsStream) + jarsStream.closeEntry() + } + } + } finally { + jarsStream.close() + } + + distribute(jarsArchive.toURI.getPath, + resType = LocalResourceType.ARCHIVE, + destName = Some(LOCALIZED_LIB_DIR)) + } + } + + /** + * Copy user jar to the distributed cache if their scheme is not "local". + * Otherwise, set the corresponding key in our SparkConf to handle it downstream. + */ + Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar => + val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME)) + if (isLocal) { + require(localizedPath != null, s"Path $jar already distributed") + // If the resource is intended for local use only, handle this downstream + // by setting the appropriate property + sparkConf.set(APP_JAR, localizedPath) + } + } + + /** + * Do the same for any additional resources passed in through ClientArguments. + * Each resource category is represented by a 3-tuple of: + * (1) comma separated list of resources in this category, + * (2) resource type, and + * (3) whether to add these resources to the classpath + */ + val cachedSecondaryJarLinks = ListBuffer.empty[String] + List( + (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true), + (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false), + (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false) + ).foreach { case (flist, resType, addToClasspath) => + flist.foreach { file => + val (_, localizedPath) = distribute(file, resType = resType) + // If addToClassPath, we ignore adding jar multiple times to distitrbuted cache. + if (addToClasspath) { + if (localizedPath != null) { + cachedSecondaryJarLinks += localizedPath + } + } else { + if (localizedPath == null) { + throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" + + " to the distributed cache.") + } + } + } + } + if (cachedSecondaryJarLinks.nonEmpty) { + sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks) + } + + if (isClusterMode && args.primaryPyFile != null) { + distribute(args.primaryPyFile, appMasterOnly = true) + } + + pySparkArchives.foreach { f => distribute(f) } + + // The python files list needs to be treated especially. All files that are not an + // archive need to be placed in a subdirectory that will be added to PYTHONPATH. + sparkConf.get(PY_FILES).foreach { f => + val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None + distribute(f, targetDir = targetDir) + } + + // Update the configuration with all the distributed files, minus the conf archive. The + // conf archive will be handled by the AM differently so that we avoid having to send + // this configuration by other means. See SPARK-14602 for one reason of why this is needed. + distCacheMgr.updateConfiguration(sparkConf) + + // Upload the conf archive to HDFS manually, and record its location in the configuration. 
+ // This will allow the AM to know where the conf archive is in HDFS, so that it can be + // distributed to the containers. + // + // This code forces the archive to be copied, so that unit tests pass (since in that case both + // file systems are the same and the archive wouldn't normally be copied). In most (all?) + // deployments, the archive would be copied anyway, since it's a temp file in the local file + // system. + val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE) + val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf) + sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString()) + + val localConfArchive = new Path(createConfArchive().toURI()) + copyFileToRemote(destDir, localConfArchive, replication, force = true, + destName = Some(LOCALIZED_CONF_ARCHIVE)) + + // Manually add the config archive to the cache manager so that the AM is launched with + // the proper files set up. + distCacheMgr.addResource( + remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE, + LOCALIZED_CONF_DIR, statCache, appMasterOnly = false) + + // Clear the cache-related entries from the configuration to avoid them polluting the + // UI's environment page. This works for client mode; for cluster mode, this is handled + // by the AM. + CACHE_CONFIGS.foreach(sparkConf.remove) + + localResources + } + + /** + * Create an archive with the config files for distribution. + * + * These will be used by AM and executors. The files are zipped and added to the job as an + * archive, so that YARN will explode it when distributing to AM and executors. This directory + * is then added to the classpath of AM and executor process, just to make sure that everybody + * is using the same default config. + * + * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR + * shows up in the classpath before YARN_CONF_DIR. + * + * Currently this makes a shallow copy of the conf directory. If there are cases where a + * Hadoop config directory contains subdirectories, this code will have to be fixed. + * + * The archive also contains some Spark configuration. Namely, it saves the contents of + * SparkConf in a file to be loaded by the AM process. + */ + private def createConfArchive(): File = { + val hadoopConfFiles = new HashMap[String, File]() + + // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that + // the executors will use the latest configurations instead of the default values. This is + // required when user changes log4j.properties directly to set the log configurations. If + // configuration file is provided through --files then executors will be taking configurations + // from --files instead of $SPARK_CONF_DIR/log4j.properties. + + // Also uploading metrics.properties to distributed cache if exists in classpath. + // If user specify this file using --files then executors will use the one + // from --files instead. 
+ for { prop <- Seq("log4j.properties", "metrics.properties") + url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) + if url.getProtocol == "file" } { + hadoopConfFiles(prop) = new File(url.getPath) + } + + Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => + sys.env.get(envKey).foreach { path => + val dir = new File(path) + if (dir.isDirectory()) { + val files = dir.listFiles() + if (files == null) { + logWarning("Failed to list files under directory " + dir) + } else { + files.foreach { file => + if (file.isFile && !hadoopConfFiles.contains(file.getName())) { + hadoopConfFiles(file.getName()) = file + } + } + } + } + } + } + + val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + val confStream = new ZipOutputStream(new FileOutputStream(confArchive)) + + try { + confStream.setLevel(0) + hadoopConfFiles.foreach { case (name, file) => + if (file.canRead()) { + confStream.putNextEntry(new ZipEntry(name)) + Files.copy(file, confStream) + confStream.closeEntry() + } + } + + // Save Spark configuration to a file in the archive. + val props = new Properties() + sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } + confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) + val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8) + props.store(writer, "Spark configuration.") + writer.flush() + confStream.closeEntry() + } finally { + confStream.close() + } + confArchive + } + + /** + * Set up the environment for launching our ApplicationMaster container. + */ + private def setupLaunchEnv( + stagingDirPath: Path, + pySparkArchives: Seq[String]): HashMap[String, String] = { + logInfo("Setting up the launch environment for our AM container") + val env = new HashMap[String, String]() + populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) + env("SPARK_YARN_MODE") = "true" + env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString + env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() + if (loginFromKeytab) { + val credentialsFile = "credentials-" + UUID.randomUUID().toString + sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) + logInfo(s"Credentials file set to: $credentialsFile") + } + + // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* + val amEnvPrefix = "spark.yarn.appMasterEnv." + sparkConf.getAll + .filter { case (k, v) => k.startsWith(amEnvPrefix) } + .map { case (k, v) => (k.substring(amEnvPrefix.length), v) } + .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) } + + // Keep this for backwards compatibility but users should move to the config + sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => + // Allow users to specify some environment variables. + YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) + // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. + env("SPARK_YARN_USER_ENV") = userEnvs + } + + // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH + // of the container processes too. Add all non-.py files directly to PYTHONPATH. + // + // NOTE: the code currently does not handle .py files defined with a "local:" scheme. 
+ val pythonPath = new ListBuffer[String]() + val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py")) + if (pyFiles.nonEmpty) { + pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + LOCALIZED_PYTHON_DIR) + } + (pySparkArchives ++ pyArchives).foreach { path => + val uri = Utils.resolveURI(path) + if (uri.getScheme != LOCAL_SCHEME) { + pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + new Path(uri).getName()) + } else { + pythonPath += uri.getPath() + } + } + + // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors. + if (pythonPath.nonEmpty) { + val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath) + .mkString(YarnSparkHadoopUtil.getClassPathSeparator) + env("PYTHONPATH") = pythonPathStr + sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr) + } + + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to + // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's + // SparkContext will not let that set spark* system properties, which is expected behavior for + // Yarn clients. So propagate it through the environment. + // + // Note that to warn the user about the deprecation in cluster mode, some code from + // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition + // described above). + if (isClusterMode) { + sys.env.get("SPARK_JAVA_OPTS").foreach { value => + val warning = + s""" + |SPARK_JAVA_OPTS was detected (set to '$value'). + |This is deprecated in Spark 1.0+. + | + |Please instead use: + | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application + | - ./spark-submit with --driver-java-options to set -X options for a driver + | - spark.executor.extraJavaOptions to set -X options for executors + """.stripMargin + logWarning(warning) + for (proc <- Seq("driver", "executor")) { + val key = s"spark.$proc.extraJavaOptions" + if (sparkConf.contains(key)) { + throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.") + } + } + env("SPARK_JAVA_OPTS") = value + } + // propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode + Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname => + if (!env.contains(envname)) { + sys.env.get(envname).foreach(env(envname) = _) + } + } + } + + sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp => + env(ENV_DIST_CLASSPATH) = dcp + } + + env + } + + /** + * Set up a ContainerLaunchContext to launch our ApplicationMaster container. + * This sets up the launch environment, java options, and the command for launching the AM. 
+ */ + private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) + : ContainerLaunchContext = { + logInfo("Setting up container launch context for our AM") + val appId = newAppResponse.getApplicationId + val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) + val pySparkArchives = + if (sparkConf.get(IS_PYTHON_APP)) { + findPySparkArchives() + } else { + Nil + } + val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) + val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives) + + val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) + amContainer.setLocalResources(localResources.asJava) + amContainer.setEnvironment(launchEnv.asJava) + + val javaOpts = ListBuffer[String]() + + // Set the environment variable through a command prefix + // to append to the existing value of the variable + var prefixEnv: Option[String] = None + + // Add Xmx for AM memory + javaOpts += "-Xmx" + amMemory + "m" + + val tmpDir = new Path( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR + ) + javaOpts += "-Djava.io.tmpdir=" + tmpDir + + // TODO: Remove once cpuset version is pushed out. + // The context is, default gc for server class machines ends up using all cores to do gc - + // hence if there are multiple containers in same node, Spark GC affects all other containers' + // performance (which can be that of other Spark containers) + // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in + // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset + // of cores on a node. + val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean) + if (useConcurrentAndIncrementalGC) { + // In our expts, using (default) throughput collector has severe perf ramifications in + // multi-tenant machines + javaOpts += "-XX:+UseConcMarkSweepGC" + javaOpts += "-XX:MaxTenuringThreshold=31" + javaOpts += "-XX:SurvivorRatio=8" + javaOpts += "-XX:+CMSIncrementalMode" + javaOpts += "-XX:+CMSIncrementalPacing" + javaOpts += "-XX:CMSIncrementalDutyCycleMin=0" + javaOpts += "-XX:CMSIncrementalDutyCycle=10" + } + + // Include driver-specific java options if we are launching a driver + if (isClusterMode) { + val driverOpts = sparkConf.get(DRIVER_JAVA_OPTIONS).orElse(sys.env.get("SPARK_JAVA_OPTS")) + driverOpts.foreach { opts => + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) + } + val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH), + sys.props.get("spark.driver.libraryPath")).flatten + if (libraryPaths.nonEmpty) { + prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths))) + } + if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) { + logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode") + } + } else { + // Validate and include yarn am specific java options in yarn-client mode. + sparkConf.get(AM_JAVA_OPTIONS).foreach { opts => + if (opts.contains("-Dspark")) { + val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark options (was '$opts')." + throw new SparkException(msg) + } + if (opts.contains("-Xmx")) { + val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max heap memory settings " + + s"(was '$opts'). Use spark.yarn.am.memory instead." 
+ throw new SparkException(msg) + } + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) + } + sparkConf.get(AM_LIBRARY_PATH).foreach { paths => + prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths)))) + } + } + + // For log4j configuration to reference + javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) + YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts) + + val userClass = + if (isClusterMode) { + Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) + } else { + Nil + } + val userJar = + if (args.userJar != null) { + Seq("--jar", args.userJar) + } else { + Nil + } + val primaryPyFile = + if (isClusterMode && args.primaryPyFile != null) { + Seq("--primary-py-file", new Path(args.primaryPyFile).getName()) + } else { + Nil + } + val primaryRFile = + if (args.primaryRFile != null) { + Seq("--primary-r-file", args.primaryRFile) + } else { + Nil + } + val amClass = + if (isClusterMode) { + Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName + } else { + Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName + } + if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { + args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs + } + val userArgs = args.userArgs.flatMap { arg => + Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) + } + val amArgs = + Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ + userArgs ++ Seq( + "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) + + // Command for the ApplicationMaster + val commands = prefixEnv ++ Seq( + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" + ) ++ + javaOpts ++ amArgs ++ + Seq( + "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", + "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") + + // TODO: it would be nicer to just make sure there are no null commands here + val printableCommands = commands.map(s => if (s == null) "null" else s).toList + amContainer.setCommands(printableCommands.asJava) + + logDebug("===============================================================================") + logDebug("YARN AM launch context:") + logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}") + logDebug(" env:") + launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") } + logDebug(" resources:") + localResources.foreach { case (k, v) => logDebug(s" $k -> $v")} + logDebug(" command:") + logDebug(s" ${printableCommands.mkString(" ")}") + logDebug("===============================================================================") + + // send the acl settings into YARN to control who has access via YARN interfaces + val securityManager = new SecurityManager(sparkConf) + amContainer.setApplicationACLs( + YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) + setupSecurityToken(amContainer) + amContainer + } + + def setupCredentials(): Unit = { + loginFromKeytab = sparkConf.contains(PRINCIPAL.key) + if (loginFromKeytab) { + principal = sparkConf.get(PRINCIPAL).get + keytab = sparkConf.get(KEYTAB).orNull + + require(keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to login to the Kerberos" + + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) + // Generate a file name that can be used for the keytab file, 
that does not conflict + // with any user file. + val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + sparkConf.set(KEYTAB.key, keytabFileName) + sparkConf.set(PRINCIPAL.key, principal) + } + // Defensive copy of the credentials + credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) + } + + /** + * Report the state of an application until it has exited, either successfully or + * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, + * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED, + * or KILLED). + * + * @param appId ID of the application to monitor. + * @param returnOnRunning Whether to also return the application state when it is RUNNING. + * @param logApplicationReport Whether to log details of the application report every iteration. + * @return A pair of the yarn application state and the final application state. + */ + def monitorApplication( + appId: ApplicationId, + returnOnRunning: Boolean = false, + logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { + val interval = sparkConf.get(REPORT_INTERVAL) + var lastState: YarnApplicationState = null + while (true) { + Thread.sleep(interval) + val report: ApplicationReport = + try { + getApplicationReport(appId) + } catch { + case e: ApplicationNotFoundException => + logError(s"Application $appId not found.") + cleanupStagingDir(appId) + return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) + case NonFatal(e) => + logError(s"Failed to contact YARN for application $appId.", e) + // Don't necessarily clean up staging dir because status is unknown + return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED) + } + val state = report.getYarnApplicationState + + if (logApplicationReport) { + logInfo(s"Application report for $appId (state: $state)") + + // If DEBUG is enabled, log report details every iteration + // Otherwise, log them every time the application changes state + if (log.isDebugEnabled) { + logDebug(formatReportDetails(report)) + } else if (lastState != state) { + logInfo(formatReportDetails(report)) + } + } + + if (lastState != state) { + state match { + case YarnApplicationState.RUNNING => + reportLauncherState(SparkAppHandle.State.RUNNING) + case YarnApplicationState.FINISHED => + report.getFinalApplicationStatus match { + case FinalApplicationStatus.FAILED => + reportLauncherState(SparkAppHandle.State.FAILED) + case FinalApplicationStatus.KILLED => + reportLauncherState(SparkAppHandle.State.KILLED) + case _ => + reportLauncherState(SparkAppHandle.State.FINISHED) + } + case YarnApplicationState.FAILED => + reportLauncherState(SparkAppHandle.State.FAILED) + case YarnApplicationState.KILLED => + reportLauncherState(SparkAppHandle.State.KILLED) + case _ => + } + } + + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + cleanupStagingDir(appId) + return (state, report.getFinalApplicationStatus) + } + + if (returnOnRunning && state == YarnApplicationState.RUNNING) { + return (state, report.getFinalApplicationStatus) + } + + lastState = state + } + + // Never reached, but keeps compiler happy + throw new SparkException("While loop is depleted! 
This should never happen...") + } + + private def formatReportDetails(report: ApplicationReport): String = { + val details = Seq[(String, String)]( + ("client token", getClientToken(report)), + ("diagnostics", report.getDiagnostics), + ("ApplicationMaster host", report.getHost), + ("ApplicationMaster RPC port", report.getRpcPort.toString), + ("queue", report.getQueue), + ("start time", report.getStartTime.toString), + ("final status", report.getFinalApplicationStatus.toString), + ("tracking URL", report.getTrackingUrl), + ("user", report.getUser) + ) + + // Use more loggable format if value is null or empty + details.map { case (k, v) => + val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") + s"\n\t $k: $newValue" + }.mkString("") + } + + /** + * Submit an application to the ResourceManager. + * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive + * reporting the application's status until the application has exited for any reason. + * Otherwise, the client process will exit after submission. + * If the application finishes with a failed, killed, or undefined status, + * throw an appropriate SparkException. + */ + def run(): Unit = { + this.appId = submitApplication() + if (!launcherBackend.isConnected() && fireAndForget) { + val report = getApplicationReport(appId) + val state = report.getYarnApplicationState + logInfo(s"Application report for $appId (state: $state)") + logInfo(formatReportDetails(report)) + if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { + throw new SparkException(s"Application $appId finished with status: $state") + } + } else { + val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId) + if (yarnApplicationState == YarnApplicationState.FAILED || + finalApplicationStatus == FinalApplicationStatus.FAILED) { + throw new SparkException(s"Application $appId finished with failed status") + } + if (yarnApplicationState == YarnApplicationState.KILLED || + finalApplicationStatus == FinalApplicationStatus.KILLED) { + throw new SparkException(s"Application $appId is killed") + } + if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) { + throw new SparkException(s"The final status of application $appId is undefined") + } + } + } + + private def findPySparkArchives(): Seq[String] = { + sys.env.get("PYSPARK_ARCHIVES_PATH") + .map(_.split(",").toSeq) + .getOrElse { + val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + require(pyArchivesFile.exists(), + s"$pyArchivesFile not found; cannot run pyspark application in YARN mode.") + val py4jFile = new File(pyLibPath, "py4j-0.10.4-src.zip") + require(py4jFile.exists(), + s"$py4jFile not found; cannot run pyspark application in YARN mode.") + Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath()) + } + } + +} + +private object Client extends Logging { + + def main(argStrings: Array[String]) { + if (!sys.props.contains("SPARK_SUBMIT")) { + logWarning("WARNING: This client is deprecated and will be removed in a " + + "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") + } + + // Set an env variable indicating we are running in YARN mode. 
+ // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes + System.setProperty("SPARK_YARN_MODE", "true") + val sparkConf = new SparkConf + // SparkSubmit would use yarn cache to distribute files & jars in yarn mode, + // so remove them from sparkConf here for yarn mode. + sparkConf.remove("spark.jars") + sparkConf.remove("spark.files") + val args = new ClientArguments(argStrings) + new Client(args, sparkConf).run() + } + + // Alias for the user jar + val APP_JAR_NAME: String = "__app__.jar" + + // URI scheme that identifies local resources + val LOCAL_SCHEME = "local" + + // Staging directory for any temporary jars or files + val SPARK_STAGING: String = ".sparkStaging" + + + // Staging directory is private! -> rwx-------- + val STAGING_DIR_PERMISSION: FsPermission = + FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) + + // App files are world-wide readable and owner writable -> rw-r--r-- + val APP_FILE_PERMISSION: FsPermission = + FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) + + // Distribution-defined classpath to add to processes + val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH" + + // Subdirectory where the user's Spark and Hadoop config files will be placed. + val LOCALIZED_CONF_DIR = "__spark_conf__" + + // File containing the conf archive in the AM. See prepareLocalResources(). + val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip" + + // Name of the file in the conf archive containing Spark configuration. + val SPARK_CONF_FILE = "__spark_conf__.properties" + + // Subdirectory where the user's python files (not archives) will be placed. + val LOCALIZED_PYTHON_DIR = "__pyfiles__" + + // Subdirectory where Spark libraries will be placed. + val LOCALIZED_LIB_DIR = "__spark_libs__" + + /** + * Return the path to the given application's staging directory. + */ + private def getAppStagingDir(appId: ApplicationId): String = { + buildPath(SPARK_STAGING, appId.toString()) + } + + /** + * Populate the classpath entry in the given environment map with any application + * classpath specified through the Hadoop and Yarn configurations. 
+ */ + private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) + : Unit = { + val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) + for (c <- classPathElementsToAdd.flatten) { + YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) + } + } + + private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = + Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { + case Some(s) => Some(s.toSeq) + case None => getDefaultYarnApplicationClasspath + } + + private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = + Option(conf.getStrings("mapreduce.application.classpath")) match { + case Some(s) => Some(s.toSeq) + case None => getDefaultMRApplicationClasspath + } + + private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { + val triedDefault = Try[Seq[String]] { + val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") + val value = field.get(null).asInstanceOf[Array[String]] + value.toSeq + } recoverWith { + case e: NoSuchFieldException => Success(Seq.empty[String]) + } + + triedDefault match { + case f: Failure[_] => + logError("Unable to obtain the default YARN Application classpath.", f.exception) + case s: Success[Seq[String]] => + logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") + } + + triedDefault.toOption + } + + private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = { + val triedDefault = Try[Seq[String]] { + val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") + StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq + } recoverWith { + case e: NoSuchFieldException => Success(Seq.empty[String]) + } + + triedDefault match { + case f: Failure[_] => + logError("Unable to obtain the default MR Application classpath.", f.exception) + case s: Success[Seq[String]] => + logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") + } + + triedDefault.toOption + } + + /** + * Populate the classpath entry in the given environment map. + * + * User jars are generally not added to the JVM's system classpath; those are handled by the AM + * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars + * are included in the system classpath, though. The extra class path and other uploaded files are + * always made available through the system class path. + * + * @param args Client arguments (when starting the AM) or null (when starting executors). 
+ */ + private[yarn] def populateClasspath( + args: ClientArguments, + conf: Configuration, + sparkConf: SparkConf, + env: HashMap[String, String], + extraClassPath: Option[String] = None): Unit = { + extraClassPath.foreach { cp => + addClasspathEntry(getClusterPath(sparkConf, cp), env) + } + + addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env) + + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + + LOCALIZED_CONF_DIR, env) + + if (sparkConf.get(USER_CLASS_PATH_FIRST)) { + // in order to properly add the app jar when user classpath is first + // we have to do the mainJar separate in order to send the right thing + // into addFileToClasspath + val mainJar = + if (args != null) { + getMainJarUri(Option(args.userJar)) + } else { + getMainJarUri(sparkConf.get(APP_JAR)) + } + mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR_NAME, env)) + + val secondaryJars = + if (args != null) { + getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE))) + } else { + getSecondaryJarUris(sparkConf.get(SECONDARY_JARS)) + } + secondaryJars.foreach { x => + addFileToClasspath(sparkConf, conf, x, null, env) + } + } + + // Add the Spark jars to the classpath, depending on how they were distributed. + addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + LOCALIZED_LIB_DIR, "*"), env) + if (!sparkConf.get(SPARK_ARCHIVE).isDefined) { + sparkConf.get(SPARK_JARS).foreach { jars => + jars.filter(isLocalUri).foreach { jar => + addClasspathEntry(getClusterPath(sparkConf, jar), env) + } + } + } + + populateHadoopClasspath(conf, env) + sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => + addClasspathEntry(getClusterPath(sparkConf, cp), env) + } + } + + /** + * Returns a list of URIs representing the user classpath. + * + * @param conf Spark configuration. + */ + def getUserClasspath(conf: SparkConf): Array[URI] = { + val mainUri = getMainJarUri(conf.get(APP_JAR)) + val secondaryUris = getSecondaryJarUris(conf.get(SECONDARY_JARS)) + (mainUri ++ secondaryUris).toArray + } + + private def getMainJarUri(mainJar: Option[String]): Option[URI] = { + mainJar.flatMap { path => + val uri = Utils.resolveURI(path) + if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None + }.orElse(Some(new URI(APP_JAR_NAME))) + } + + private def getSecondaryJarUris(secondaryJars: Option[Seq[String]]): Seq[URI] = { + secondaryJars.getOrElse(Nil).map(new URI(_)) + } + + /** + * Adds the given path to the classpath, handling "local:" URIs correctly. + * + * If an alternate name for the file is given, and it's not a "local:" file, the alternate + * name will be added to the classpath (relative to the job's work directory). + * + * If not a "local:" file and no alternate name, the linkName will be added to the classpath. + * + * @param conf Spark configuration. + * @param hadoopConf Hadoop configuration. + * @param uri URI to add to classpath (optional). + * @param fileName Alternate name for the file (optional). + * @param env Map holding the environment variables. 
+ */ + private def addFileToClasspath( + conf: SparkConf, + hadoopConf: Configuration, + uri: URI, + fileName: String, + env: HashMap[String, String]): Unit = { + if (uri != null && uri.getScheme == LOCAL_SCHEME) { + addClasspathEntry(getClusterPath(conf, uri.getPath), env) + } else if (fileName != null) { + addClasspathEntry(buildPath( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) + } else if (uri != null) { + val localPath = getQualifiedLocalPath(uri, hadoopConf) + val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) + addClasspathEntry(buildPath( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) + } + } + + /** + * Add the given path to the classpath entry of the given environment map. + * If the classpath is already set, this appends the new path to the existing classpath. + */ + private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = + YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) + + /** + * Returns the path to be sent to the NM for a path that is valid on the gateway. + * + * This method uses two configuration values: + * + * - spark.yarn.config.gatewayPath: a string that identifies a portion of the input path that may + * only be valid in the gateway node. + * - spark.yarn.config.replacementPath: a string with which to replace the gateway path. This may + * contain, for example, env variable references, which will be expanded by the NMs when + * starting containers. + * + * If either config is not available, the input path is returned. + */ + def getClusterPath(conf: SparkConf, path: String): String = { + val localPath = conf.get(GATEWAY_ROOT_PATH) + val clusterPath = conf.get(REPLACEMENT_ROOT_PATH) + if (localPath != null && clusterPath != null) { + path.replace(localPath, clusterPath) + } else { + path + } + } + + /** + * Return whether the two file systems are the same. + */ + private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { + val srcUri = srcFs.getUri() + val dstUri = destFs.getUri() + if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) { + return false + } + + var srcHost = srcUri.getHost() + var dstHost = dstUri.getHost() + + // In HA or when using viewfs, the host part of the URI may not actually be a host, but the + // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they + // match. + if (srcHost != null && dstHost != null && srcHost != dstHost) { + try { + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() + } catch { + case e: UnknownHostException => + return false + } + } + + Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort() + } + + /** + * Given a local URI, resolve it and return a qualified local path that corresponds to the URI. + * This is used for preparing local resources to be included in the container launch context. + */ + private def getQualifiedLocalPath(localURI: URI, hadoopConf: Configuration): Path = { + val qualifiedURI = + if (localURI.getScheme == null) { + // If not specified, assume this is in the local filesystem to keep the behavior + // consistent with that of Hadoop + new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString) + } else { + localURI + } + new Path(qualifiedURI) + } + + /** + * Whether to consider jars provided by the user to have precedence over the Spark jars when + * loading user classes. 
+ */ + def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = { + if (isDriver) { + conf.get(DRIVER_USER_CLASS_PATH_FIRST) + } else { + conf.get(EXECUTOR_USER_CLASS_PATH_FIRST) + } + } + + /** + * Joins all the path components using Path.SEPARATOR. + */ + def buildPath(components: String*): String = { + components.mkString(Path.SEPARATOR) + } + + /** Returns whether the URI is a "local:" URI. */ + def isLocalUri(uri: String): Boolean = { + uri.startsWith(s"$LOCAL_SCHEME:") + } + +}
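For reference, a minimal standalone sketch of the gateway-to-cluster path rewriting performed by Client.getClusterPath above. This is not part of the commit; the spark.yarn.config.gatewayPath / spark.yarn.config.replacementPath values and the jar paths are made-up examples.

import org.apache.spark.SparkConf

object ClusterPathSketch {
  // Mirrors the replace-prefix behaviour documented for Client.getClusterPath:
  // if both configs are set, swap the gateway-only prefix for the cluster one.
  def getClusterPath(conf: SparkConf, path: String): String = {
    val gatewayPath = conf.get("spark.yarn.config.gatewayPath", null)
    val replacementPath = conf.get("spark.yarn.config.replacementPath", null)
    if (gatewayPath != null && replacementPath != null) {
      path.replace(gatewayPath, replacementPath)
    } else {
      path
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.yarn.config.gatewayPath", "/opt/gateway/spark")                 // assumed example value
      .set("spark.yarn.config.replacementPath", "{{HADOOP_COMMON_HOME}}/spark")   // assumed example value
    // Prints {{HADOOP_COMMON_HOME}}/spark/jars/foo.jar
    println(getClusterPath(conf, "/opt/gateway/spark/jars/foo.jar"))
    // Paths that do not contain the gateway prefix are returned unchanged.
    println(getClusterPath(conf, "/tmp/other.jar"))
  }
}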
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala new file mode 100644 index 0000000..61c027e --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! +private[spark] class ClientArguments(args: Array[String]) { + + var userJar: String = null + var userClass: String = null + var primaryPyFile: String = null + var primaryRFile: String = null + var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]() + + parseArgs(args.toList) + + private def parseArgs(inputArgs: List[String]): Unit = { + var args = inputArgs + + while (!args.isEmpty) { + args match { + case ("--jar") :: value :: tail => + userJar = value + args = tail + + case ("--class") :: value :: tail => + userClass = value + args = tail + + case ("--primary-py-file") :: value :: tail => + primaryPyFile = value + args = tail + + case ("--primary-r-file") :: value :: tail => + primaryRFile = value + args = tail + + case ("--arg") :: value :: tail => + userArgs += value + args = tail + + case Nil => + + case _ => + throw new IllegalArgumentException(getUsageMessage(args)) + } + } + + if (primaryPyFile != null && primaryRFile != null) { + throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" + + " at the same time") + } + } + + private def getUsageMessage(unknownParam: List[String] = null): String = { + val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" + message + + s""" + |Usage: org.apache.spark.deploy.yarn.Client [options] + |Options: + | --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster + | mode) + | --class CLASS_NAME Name of your application's main class (required) + | --primary-py-file A main Python file + | --primary-r-file A main R file + | --arg ARG Argument to be passed to your application's main class. + | Multiple invocations are possible, each will be passed in order. 
+ """.stripMargin + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala new file mode 100644 index 0000000..dcc2288 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URI + +import scala.collection.mutable.{HashMap, ListBuffer, Map} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.util.{ConverterUtils, Records} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.Logging + +private case class CacheEntry( + uri: URI, + size: Long, + modTime: Long, + visibility: LocalResourceVisibility, + resType: LocalResourceType) + +/** Client side methods to setup the Hadoop distributed cache */ +private[spark] class ClientDistributedCacheManager() extends Logging { + + private val distCacheEntries = new ListBuffer[CacheEntry]() + + /** + * Add a resource to the list of distributed cache resources. This list can + * be sent to the ApplicationMaster and possibly the executors so that it can + * be downloaded into the Hadoop distributed cache for use by this application. + * Adds the LocalResource to the localResources HashMap passed in and saves + * the stats of the resources to they can be sent to the executors and verified. 
+ * + * @param fs FileSystem + * @param conf Configuration + * @param destPath path to the resource + * @param localResources localResource hashMap to insert the resource into + * @param resourceType LocalResourceType + * @param link link presented in the distributed cache to the destination + * @param statCache cache to store the file/directory stats + * @param appMasterOnly Whether to only add the resource to the app master + */ + def addResource( + fs: FileSystem, + conf: Configuration, + destPath: Path, + localResources: HashMap[String, LocalResource], + resourceType: LocalResourceType, + link: String, + statCache: Map[URI, FileStatus], + appMasterOnly: Boolean = false): Unit = { + val destStatus = fs.getFileStatus(destPath) + val amJarRsrc = Records.newRecord(classOf[LocalResource]) + amJarRsrc.setType(resourceType) + val visibility = getVisibility(conf, destPath.toUri(), statCache) + amJarRsrc.setVisibility(visibility) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath)) + amJarRsrc.setTimestamp(destStatus.getModificationTime()) + amJarRsrc.setSize(destStatus.getLen()) + require(link != null && link.nonEmpty, "You must specify a valid link name.") + localResources(link) = amJarRsrc + + if (!appMasterOnly) { + val uri = destPath.toUri() + val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link) + distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(), + visibility, resourceType) + } + } + + /** + * Writes down information about cached files needed in executors to the given configuration. + */ + def updateConfiguration(conf: SparkConf): Unit = { + conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString)) + conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size)) + conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime)) + conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name())) + conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name())) + } + + /** + * Returns the local resource visibility depending on the cache file permissions + * @return LocalResourceVisibility + */ + private[yarn] def getVisibility( + conf: Configuration, + uri: URI, + statCache: Map[URI, FileStatus]): LocalResourceVisibility = { + if (isPublic(conf, uri, statCache)) { + LocalResourceVisibility.PUBLIC + } else { + LocalResourceVisibility.PRIVATE + } + } + + /** + * Returns a boolean to denote whether a cache file is visible to all (public) + * @return true if the path in the uri is visible to all, false otherwise + */ + private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = { + val fs = FileSystem.get(uri, conf) + val current = new Path(uri.getPath()) + // the leaf level file should be readable by others + if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { + return false + } + ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) + } + + /** + * Returns true if all ancestors of the specified path have the 'execute' + * permission set for all users (i.e. 
that other users can traverse + * the directory hierarchy to the given path) + * @return true if all ancestors have the 'execute' permission set for all users + */ + private def ancestorsHaveExecutePermissions( + fs: FileSystem, + path: Path, + statCache: Map[URI, FileStatus]): Boolean = { + var current = path + while (current != null) { + // the subdirs in the path should have execute permissions for others + if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { + return false + } + current = current.getParent() + } + true + } + + /** + * Checks for a given path whether the Other permissions on it + * imply the permission in the passed FsAction + * @return true if the path in the uri is visible to all, false otherwise + */ + private def checkPermissionOfOther( + fs: FileSystem, + path: Path, + action: FsAction, + statCache: Map[URI, FileStatus]): Boolean = { + val status = getFileStatus(fs, path.toUri(), statCache) + val perms = status.getPermission() + val otherAction = perms.getOtherAction() + otherAction.implies(action) + } + + /** + * Checks to see if the given uri exists in the cache, if it does it + * returns the existing FileStatus, otherwise it stats the uri, stores + * it in the cache, and returns the FileStatus. + * @return FileStatus + */ + private[yarn] def getFileStatus( + fs: FileSystem, + uri: URI, + statCache: Map[URI, FileStatus]): FileStatus = { + val stat = statCache.get(uri) match { + case Some(existstat) => existstat + case None => + val newStat = fs.getFileStatus(new Path(uri)) + statCache.put(uri, newStat) + newStat + } + stat + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala new file mode 100644 index 0000000..868c2ed --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import java.io.File +import java.nio.ByteBuffer +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.NMClient +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.ipc.YarnRPC +import org.apache.hadoop.yarn.util.{ConverterUtils, Records} + +import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.YarnCommandBuilderUtils +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +private[yarn] class ExecutorRunnable( + container: Option[Container], + conf: YarnConfiguration, + sparkConf: SparkConf, + masterAddress: String, + executorId: String, + hostname: String, + executorMemory: Int, + executorCores: Int, + appId: String, + securityMgr: SecurityManager, + localResources: Map[String, LocalResource]) extends Logging { + + var rpc: YarnRPC = YarnRPC.create(conf) + var nmClient: NMClient = _ + + def run(): Unit = { + logDebug("Starting Executor Container") + nmClient = NMClient.createNMClient() + nmClient.init(conf) + nmClient.start() + startContainer() + } + + def launchContextDebugInfo(): String = { + val commands = prepareCommand() + val env = prepareEnvironment() + + s""" + |=============================================================================== + |YARN executor launch context: + | env: + |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString} + | command: + | ${commands.mkString(" \\ \n ")} + | + | resources: + |${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString} + |===============================================================================""".stripMargin + } + + def startContainer(): java.util.Map[String, ByteBuffer] = { + val ctx = Records.newRecord(classOf[ContainerLaunchContext]) + .asInstanceOf[ContainerLaunchContext] + val env = prepareEnvironment().asJava + + ctx.setLocalResources(localResources.asJava) + ctx.setEnvironment(env) + + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + val dob = new DataOutputBuffer() + credentials.writeTokenStorageToStream(dob) + ctx.setTokens(ByteBuffer.wrap(dob.getData())) + + val commands = prepareCommand() + + ctx.setCommands(commands.asJava) + ctx.setApplicationACLs( + YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava) + + // If external shuffle service is enabled, register with the Yarn shuffle service already + // started on the NodeManager and, if authentication is enabled, provide it with our secret + // key for fetching shuffle files later + if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { + val secretString = securityMgr.getSecretKey() + val secretBytes = + if (secretString != null) { + // This conversion must match how the YarnShuffleService decodes our secret + JavaUtils.stringToBytes(secretString) + } else { + // Authentication is not enabled, so just provide dummy metadata + ByteBuffer.allocate(0) + } + ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) + } + + // Send the start request to 
the ContainerManager + try { + nmClient.startContainer(container.get, ctx) + } catch { + case ex: Exception => + throw new SparkException(s"Exception while starting container ${container.get.getId}" + + s" on host $hostname", ex) + } + } + + private def prepareCommand(): List[String] = { + // Extra options for the JVM + val javaOpts = ListBuffer[String]() + + // Set the environment variable through a command prefix + // to append to the existing value of the variable + var prefixEnv: Option[String] = None + + // Set the JVM memory + val executorMemoryString = executorMemory + "m" + javaOpts += "-Xmx" + executorMemoryString + + // Set extra Java options for the executor, if defined + sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts => + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) + } + sys.env.get("SPARK_JAVA_OPTS").foreach { opts => + javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) + } + sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p => + prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p)))) + } + + javaOpts += "-Djava.io.tmpdir=" + + new Path( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR + ) + + // Certain configs need to be passed here because they are needed before the Executor + // registers with the Scheduler and transfers the spark configs. Since the Executor backend + // uses RPC to connect to the scheduler, the RPC settings are needed as well as the + // authentication settings. + sparkConf.getAll + .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) } + .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } + + // Commenting it out for now - so that people can refer to the properties if required. Remove + // it once cpuset version is pushed out. + // The context is, default gc for server class machines end up using all cores to do gc - hence + // if there are multiple containers in same node, spark gc effects all other containers + // performance (which can also be other spark containers) + // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in + // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset + // of cores on a node. + /* + else { + // If no java_opts specified, default to using -XX:+CMSIncrementalMode + // It might be possible that other modes/config is being done in + // spark.executor.extraJavaOptions, so we don't want to mess with it. 
+ // In our expts, using (default) throughput collector has severe perf ramifications in + // multi-tenant machines + // The options are based on + // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use + // %20the%20Concurrent%20Low%20Pause%20Collector|outline + javaOpts += "-XX:+UseConcMarkSweepGC" + javaOpts += "-XX:+CMSIncrementalMode" + javaOpts += "-XX:+CMSIncrementalPacing" + javaOpts += "-XX:CMSIncrementalDutyCycleMin=0" + javaOpts += "-XX:CMSIncrementalDutyCycle=10" + } + */ + + // For log4j configuration to reference + javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) + YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts) + + val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri => + val absPath = + if (new File(uri.getPath()).isAbsolute()) { + Client.getClusterPath(sparkConf, uri.getPath()) + } else { + Client.buildPath(Environment.PWD.$(), uri.getPath()) + } + Seq("--user-class-path", "file:" + absPath) + }.toSeq + + YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) + val commands = prefixEnv ++ Seq( + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", + "-server") ++ + javaOpts ++ + Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", + "--driver-url", masterAddress, + "--executor-id", executorId, + "--hostname", hostname, + "--cores", executorCores.toString, + "--app-id", appId) ++ + userClassPath ++ + Seq( + s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", + s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") + + // TODO: it would be nicer to just make sure there are no null commands here + commands.map(s => if (s == null) "null" else s).toList + } + + private def prepareEnvironment(): HashMap[String, String] = { + val env = new HashMap[String, String]() + Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) + + sparkConf.getExecutorEnv.foreach { case (key, value) => + // This assumes each executor environment variable set here is a path + // This is kept for backward compatibility and consistency with hadoop + YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) + } + + // Keep this for backwards compatibility but users should move to the config + sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => + YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) + } + + // lookup appropriate http scheme for container log urls + val yarnHttpPolicy = conf.get( + YarnConfiguration.YARN_HTTP_POLICY_KEY, + YarnConfiguration.YARN_HTTP_POLICY_DEFAULT + ) + val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" + + // Add log urls + container.foreach { c => + sys.env.get("SPARK_USER").foreach { user => + val containerId = ConverterUtils.toString(c.getId) + val address = c.getNodeHttpAddress + val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user" + + env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096" + env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096" + } + } + + System.getenv().asScala.filterKeys(_.startsWith("SPARK")) + .foreach { case (k, v) => env(k) = v } + env + } +}
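For reference, a minimal standalone sketch of how ExecutorRunnable.prepareEnvironment above assembles the container log URLs that end up in SPARK_LOG_URL_STDOUT / SPARK_LOG_URL_STDERR. This is not part of the commit; the scheme, NodeManager address, container id and user below are made-up example values (in the real code they come from the YARN container, the node HTTP address and SPARK_USER).

object ContainerLogUrlSketch {
  def main(args: Array[String]): Unit = {
    val httpScheme = "http://"                                   // "https://" when yarn.http.policy is HTTPS_ONLY
    val address = "nm-host.example.com:8042"                     // assumed NodeManager HTTP address
    val containerId = "container_1480000000000_0001_01_000002"   // assumed container id
    val user = "spark"                                           // assumed SPARK_USER

    val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
    // These are the values exposed to the executor as log URLs.
    println(s"$baseUrl/stderr?start=-4096")
    println(s"$baseUrl/stdout?start=-4096")
  }
}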

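For reference, a minimal standalone sketch of the visibility rule used by ClientDistributedCacheManager.isPublic above: a cached resource is localized as PUBLIC only if "other" can read the file itself and traverse (execute) every ancestor directory; otherwise it is PRIVATE. This is not part of the commit, it omits the stat cache, and the example path is illustrative.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction

object VisibilitySketch {
  // True if the "other" permission bits on the path imply the given action.
  private def otherImplies(fs: FileSystem, path: Path, action: FsAction): Boolean =
    fs.getFileStatus(path).getPermission.getOtherAction.implies(action)

  def isPublic(conf: Configuration, uri: URI): Boolean = {
    val fs = FileSystem.get(uri, conf)
    val leaf = new Path(uri.getPath)
    // Leaf file must be world-readable, every ancestor world-traversable.
    var ok = otherImplies(fs, leaf, FsAction.READ)
    var current = leaf.getParent
    while (ok && current != null) {
      ok = otherImplies(fs, current, FsAction.EXECUTE)
      current = current.getParent
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Example path only; on a cluster this would be a staging-dir resource URI.
    println(isPublic(conf, new URI("file:///tmp")))
  }
}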