This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cbe01e  [SPARK-27369][CORE] Setup resources when Standalone Worker starts up
7cbe01e is described below

commit 7cbe01e8efc3f6cd3a0cac4bcfadea8fcc74a955
Author: wuyi <[email protected]>
AuthorDate: Wed Jun 26 19:19:00 2019 -0700

    [SPARK-27369][CORE] Setup resources when Standalone Worker starts up
    
    ## What changes were proposed in this pull request?
    
    To support GPU-aware scheduling in Standalone (cluster mode), the Worker should be able to set up resources (e.g. GPU/FPGA) when it starts up.
    
    Similar to the driver/executor, the Worker has two ways (resourceFile & resourceDiscoveryScript) to set up resources when it starts up. Users can use `SPARK_WORKER_OPTS` to apply resource configs to the Worker in the form of "-Dx=y". For example,
    ```
    SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
                       -Dspark.worker.resource.fpga.amount=1 \
                       -Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
                       -Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
    ```
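
    For reference, the file passed via `spark.worker.resourcesFile` is a JSON array of ResourceAllocation objects (per the new config's doc below), and a discovery script writes a single ResourceInformation JSON object to stdout. The snippets below are illustrative sketches only; the device addresses are made up:
    ```
    # /Users/wuyi/tmp/worker-resource-file (illustrative content):
    [{"id": {"componentName": "spark.worker", "resourceName": "gpu"},
      "addresses": ["0", "1"]}]

    # stdout expected from getFPGAResources.sh (illustrative):
    {"name": "fpga", "addresses": ["fpga0"]}
    ```
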
    ## How was this patch tested?
    
    Tested manually in Standalone mode locally:
    
    - Worker could start up normally when no resources are configured
    
    - Worker should fail to start up when an exception is thrown during resource setup (e.g. unknown directory, parse failure)
    
    - Worker could set up resources from the resource file
    
    - Worker could set up resources from discovery scripts
    
    - Worker should set up resources from both the resource file & discovery scripts when both are configured
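
    As a rough illustration of the code path these tests exercise: the new setupWorkerResources() simply delegates to ResourceUtils.getOrDiscoverAllResources. Below is a minimal sketch of that call, assuming Spark-internal scope (the Worker config object is private[spark]) and a made-up resources file; it is not part of this patch:
    ```
    import java.nio.file.Files
    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.Worker._
    import org.apache.spark.resource.ResourceUtils._

    // Write an illustrative resources file in the ResourceAllocation JSON format.
    val file = Files.createTempFile("worker-resource-file", ".json")
    Files.write(file,
      """[{"id": {"componentName": "spark.worker", "resourceName": "gpu"},
        |  "addresses": ["0", "1"]}]""".stripMargin.getBytes)

    // Request 2 GPUs; the file above already satisfies the request, so no
    // discovery script is needed for gpu.
    val conf = new SparkConf().set("spark.worker.resource.gpu.amount", "2")

    // The same call Worker.setupWorkerResources() makes on startup; yields
    // Map("gpu" -> ResourceInformation with addresses ["0", "1"]).
    val resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, Some(file.toString))
    ```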
    
    Closes #24841 from Ngone51/dev-worker-resources-setup.
    
    Authored-by: wuyi <[email protected]>
    Signed-off-by: Xingbo Jiang <[email protected]>
---
 .../org/apache/spark/deploy/worker/Worker.scala    | 25 +++++++++++++++++++---
 .../org/apache/spark/internal/config/Worker.scala  | 11 ++++++++++
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  2 +-
 3 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b432feb..ac7a1b9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
@@ -54,6 +56,7 @@ private[deploy] class Worker(
     workDirPath: String = null,
     val conf: SparkConf,
     val securityMgr: SecurityManager,
+    resourceFileOpt: Option[String] = None,
     externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
   extends ThreadSafeRpcEndpoint with Logging {
 
@@ -176,6 +179,9 @@ private[deploy] class Worker(
    masterRpcAddresses.length // Make sure we can register with all masters at the same time
   )
 
+  // visible for tests
+  private[deploy] var resources: Map[String, ResourceInformation] = _
+
   var coresUsed = 0
   var memoryUsed = 0
 
@@ -208,6 +214,7 @@ private[deploy] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     startExternalShuffleService()
+    setupWorkerResources()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
 
@@ -220,6 +227,16 @@ private[deploy] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
+  private def setupWorkerResources(): Unit = {
+    try {
+      resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt)
+    } catch {
+      case e: Exception =>
+        logError("Failed to setup worker resources: ", e)
+        System.exit(1)
+    }
+  }
+
   /**
    * Change to use the new master.
    *
@@ -785,7 +802,8 @@ private[deploy] object Worker extends Logging {
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
-      args.memory, args.masters, args.workDir, conf = conf)
+      args.memory, args.masters, args.workDir, conf = conf,
+      resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
    // With external shuffle service enabled, if we request to launch multiple workers on one host,
    // we can only successfully launch the first worker and the rest fails, because with the port
    // bound, we may launch no more than one external shuffle service on each host.
@@ -809,7 +827,8 @@ private[deploy] object Worker extends Logging {
       masterUrls: Array[String],
       workDir: String,
       workerNumber: Option[Int] = None,
-      conf: SparkConf = new SparkConf): RpcEnv = {
+      conf: SparkConf = new SparkConf,
+      resourceFileOpt: Option[String] = None): RpcEnv = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
     val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
@@ -817,7 +836,7 @@ private[deploy] object Worker extends Logging {
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
     val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
-      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
+      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
     rpcEnv
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index 47f7167..f1eaae2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -20,6 +20,17 @@ package org.apache.spark.internal.config
 import java.util.concurrent.TimeUnit
 
 private[spark] object Worker {
+  val SPARK_WORKER_PREFIX = "spark.worker"
+
+  val SPARK_WORKER_RESOURCE_FILE =
+    ConfigBuilder("spark.worker.resourcesFile")
+    .internal()
+    .doc("Path to a file containing the resources allocated to the worker. " +
+      "The file should be formatted as a JSON array of ResourceAllocation objects. " +
+      "Only used internally in standalone mode.")
+    .stringConf
+    .createOptional
+
   val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
     .longConf
     .createWithDefault(60)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index d899ee0..37e5fbc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
    _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
+      "Worker", "/tmp", conf, securityMgr, None, shuffleServiceSupplier)
     _worker
   }
 

