This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7cbe01e [SPARK-27369][CORE] Setup resources when Standalone Worker starts up
7cbe01e is described below
commit 7cbe01e8efc3f6cd3a0cac4bcfadea8fcc74a955
Author: wuyi <[email protected]>
AuthorDate: Wed Jun 26 19:19:00 2019 -0700
[SPARK-27369][CORE] Setup resources when Standalone Worker starts up
## What changes were proposed in this pull request?
To support GPU-aware scheduling in Standalone (cluster mode), the Worker should
be able to set up resources (e.g. GPU/FPGA) when it starts up. Similar to the
driver/executor, the Worker has two ways (resourceFile & resourceDiscoveryScript)
to set up resources at startup. Users can apply resource configs to the Worker
via `SPARK_WORKER_OPTS`, in the form of "-Dx=y".
For example,
```
SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
-Dspark.worker.resource.fpga.amount=1 \
-Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
-Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
```
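For reference, a minimal sketch of these two inputs (the addresses and script
contents below are hypothetical). Per the `spark.worker.resourcesFile` doc added
in this patch, the resources file is a JSON array of ResourceAllocation objects:
```
[{"id": {"componentName": "spark.worker", "resourceName": "gpu"}, "addresses": ["0", "1"]}]
```
A discovery script is expected to write a single JSON object in the
ResourceInformation format (a resource name plus an array of addresses) to
stdout:
```
#!/bin/bash
# Hypothetical FPGA discovery script: a real script would probe the hardware
# instead of echoing a fixed address list.
echo '{"name": "fpga", "addresses": ["f0"]}'
```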
## How was this patch tested?
Tested manually in Standalone mode locally:
- Worker starts up normally when no resources are configured
- Worker fails to start up when an exception is thrown while setting up
resources (e.g. unknown directory, parse failure)
- Worker sets up resources from the resource file
- Worker sets up resources from discovery scripts
- Worker sets up resources from both the resource file and discovery scripts
when both are configured
Closes #24841 from Ngone51/dev-worker-resources-setup.
Authored-by: wuyi <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
---
.../org/apache/spark/deploy/worker/Worker.scala | 25 +++++++++++++++++++---
.../org/apache/spark/internal/config/Worker.scala | 11 ++++++++++
.../apache/spark/deploy/worker/WorkerSuite.scala | 2 +-
3 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b432feb..ac7a1b9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -54,6 +56,7 @@ private[deploy] class Worker(
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager,
+ resourceFileOpt: Option[String] = None,
externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
extends ThreadSafeRpcEndpoint with Logging {
@@ -176,6 +179,9 @@ private[deploy] class Worker(
masterRpcAddresses.length // Make sure we can register with all masters at the same time
)
+ // visible for tests
+ private[deploy] var resources: Map[String, ResourceInformation] = _
+
var coresUsed = 0
var memoryUsed = 0
@@ -208,6 +214,7 @@ private[deploy] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
startExternalShuffleService()
+ setupWorkerResources()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
@@ -220,6 +227,16 @@ private[deploy] class Worker(
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
+ private def setupWorkerResources(): Unit = {
+ try {
+ resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt)
+ } catch {
+ case e: Exception =>
+ logError("Failed to setup worker resources: ", e)
+ System.exit(1)
+ }
+ }
+
/**
* Change to use the new master.
*
@@ -785,7 +802,8 @@ private[deploy] object Worker extends Logging {
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
- args.memory, args.masters, args.workDir, conf = conf)
+ args.memory, args.masters, args.workDir, conf = conf,
+ resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
// With external shuffle service enabled, if we request to launch multiple workers on one host,
// we can only successfully launch the first worker and the rest fails, because with the port
// bound, we may launch no more than one external shuffle service on each host.
@@ -809,7 +827,8 @@ private[deploy] object Worker extends Logging {
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
- conf: SparkConf = new SparkConf): RpcEnv = {
+ conf: SparkConf = new SparkConf,
+ resourceFileOpt: Option[String] = None): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
@@ -817,7 +836,7 @@ private[deploy] object Worker extends Logging {
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
- masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
+ masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
rpcEnv
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index 47f7167..f1eaae2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -20,6 +20,17 @@ package org.apache.spark.internal.config
import java.util.concurrent.TimeUnit
private[spark] object Worker {
+ val SPARK_WORKER_PREFIX = "spark.worker"
+
+ val SPARK_WORKER_RESOURCE_FILE =
+ ConfigBuilder("spark.worker.resourcesFile")
+ .internal()
+ .doc("Path to a file containing the resources allocated to the worker. " +
+ "The file should be formatted as a JSON array of ResourceAllocation
objects. " +
+ "Only used internally in standalone mode.")
+ .stringConf
+ .createOptional
+
val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
.longConf
.createWithDefault(60)
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index d899ee0..37e5fbc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
- "Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
+ "Worker", "/tmp", conf, securityMgr, None, shuffleServiceSupplier)
_worker
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]