Repository: spark Updated Branches: refs/heads/branch-1.4 5aed92613 -> 2b1973dd2
[SPARK-9352] [SPARK-9353] [HOTFIX] Reverts PR #7668 on branch-1.4 `MasterSuite.makeMaster()` doesn't compile under 1.4. `Master` is still an actor in branch-1.4, and can only be created via `ActorSystem.actorOf`, or with `TestActorRef`. Author: Cheng Lian <[email protected]> Closes #7686 from liancheng/hotfix-revert-pr-7668 and squashes the following commits: 089a1a8 [Cheng Lian] Revert "[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code" 54e8ab0 [Cheng Lian] Revert "[HOTFIX] Fix compile in MasterSuite" Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b1973dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b1973dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b1973dd Branch: refs/heads/branch-1.4 Commit: 2b1973dd2aa214b14f10bd597c963aded133186a Parents: 5aed926 Author: Cheng Lian <[email protected]> Authored: Mon Jul 27 13:33:35 2015 +0800 Committer: Cheng Lian <[email protected]> Committed: Mon Jul 27 13:33:35 2015 +0800 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 8 +- .../spark/deploy/master/MasterSuite.scala | 199 +------------------ 2 files changed, 5 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2b1973dd/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index b4c8771..ac74eba 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -551,7 +551,7 @@ private[master] class Master( * allocated at a time, 12 cores from each worker would be assigned to each executor. * Since 12 < 16, no executors would launch [SPARK-8881]. */ - private def scheduleExecutorsOnWorkers( + private[master] def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { @@ -577,11 +577,7 @@ private[master] class Master( while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) { coresToAssign -= coresPerExecutor assignedCores(pos) += coresPerExecutor - // If cores per executor is not set, we are assigning 1 core at a time - // without actually meaning to launch 1 executor for each core assigned - if (app.desc.coresPerExecutor.isDefined) { - assignedMemory(pos) += memoryPerExecutor - } + assignedMemory(pos) += memoryPerExecutor // Spreading out an application means spreading out its executors across as // many workers as possible. If we are not spreading out, then we should keep http://git-wip-us.apache.org/repos/asf/spark/blob/2b1973dd/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index e585c98..014e87b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -27,15 +27,14 @@ import scala.language.postfixOps import akka.actor.Address import org.json4s._ import org.json4s.jackson.JsonMethods._ -import org.scalatest.{Matchers, PrivateMethodTester} +import org.scalatest.Matchers import org.scalatest.concurrent.Eventually import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory} -import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy._ -import org.apache.spark.rpc.RpcEnv -class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester { +class MasterSuite extends SparkFunSuite with Matchers with Eventually { test("toAkkaUrl") { val conf = new SparkConf(loadDefaults = false) @@ -185,196 +184,4 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva } } - test("basic scheduling - spread out") { - testBasicScheduling(spreadOut = true) - } - - test("basic scheduling - no spread out") { - testBasicScheduling(spreadOut = false) - } - - test("scheduling with max cores - spread out") { - testSchedulingWithMaxCores(spreadOut = true) - } - - test("scheduling with max cores - no spread out") { - testSchedulingWithMaxCores(spreadOut = false) - } - - test("scheduling with cores per executor - spread out") { - testSchedulingWithCoresPerExecutor(spreadOut = true) - } - - test("scheduling with cores per executor - no spread out") { - testSchedulingWithCoresPerExecutor(spreadOut = false) - } - - test("scheduling with cores per executor AND max cores - spread out") { - testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true) - } - - test("scheduling with cores per executor AND max cores - no spread out") { - testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false) - } - - private def testBasicScheduling(spreadOut: Boolean): Unit = { - val master = makeMaster() - val appInfo = makeAppInfo(1024) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - val scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 10) - assert(scheduledCores(2) === 10) - } - - private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = { - val master = makeMaster() - val appInfo1 = makeAppInfo(1024, maxCores = Some(8)) - val appInfo2 = makeAppInfo(1024, maxCores = Some(16)) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - var scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - // With spreading out, each worker should be assigned a few cores - if (spreadOut) { - assert(scheduledCores(0) === 3) - assert(scheduledCores(1) === 3) - assert(scheduledCores(2) === 2) - } else { - // Without spreading out, the cores should be concentrated on the first worker - assert(scheduledCores(0) === 8) - assert(scheduledCores(1) === 0) - assert(scheduledCores(2) === 0) - } - // Now test the same thing with max cores > cores per worker - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - if (spreadOut) { - assert(scheduledCores(0) === 6) - assert(scheduledCores(1) === 5) - assert(scheduledCores(2) === 5) - } else { - // Without spreading out, the first worker should be fully booked, - // and the leftover cores should spill over to the second worker only. - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 6) - assert(scheduledCores(2) === 0) - } - } - - private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = { - val master = makeMaster() - val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2)) - val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2)) - val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3)) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - // Each worker should end up with 4 executors with 2 cores each - // This should be 4 because of the memory restriction on each worker - var scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 8) - assert(scheduledCores(1) === 8) - assert(scheduledCores(2) === 8) - // Now test the same thing without running into the worker memory limit - // Each worker should now end up with 5 executors with 2 cores each - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 10) - assert(scheduledCores(2) === 10) - // Now test the same thing with a cores per executor that 10 is not divisible by - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - assert(scheduledCores(0) === 9) - assert(scheduledCores(1) === 9) - assert(scheduledCores(2) === 9) - } - - // Sorry for the long method name! - private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = { - val master = makeMaster() - val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4)) - val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20)) - val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20)) - val workerInfo = makeWorkerInfo(4096, 10) - val workerInfos = Array(workerInfo, workerInfo, workerInfo) - // We should only launch two executors, each with exactly 2 cores - var scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - if (spreadOut) { - assert(scheduledCores(0) === 2) - assert(scheduledCores(1) === 2) - assert(scheduledCores(2) === 0) - } else { - assert(scheduledCores(0) === 4) - assert(scheduledCores(1) === 0) - assert(scheduledCores(2) === 0) - } - // Test max cores > number of cores per worker - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - if (spreadOut) { - assert(scheduledCores(0) === 8) - assert(scheduledCores(1) === 6) - assert(scheduledCores(2) === 6) - } else { - assert(scheduledCores(0) === 10) - assert(scheduledCores(1) === 10) - assert(scheduledCores(2) === 0) - } - // Test max cores > number of cores per worker AND - // a cores per executor that is 10 is not divisible by - scheduledCores = master.invokePrivate( - _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut)) - assert(scheduledCores.length === 3) - if (spreadOut) { - assert(scheduledCores(0) === 6) - assert(scheduledCores(1) === 6) - assert(scheduledCores(2) === 6) - } else { - assert(scheduledCores(0) === 9) - assert(scheduledCores(1) === 9) - assert(scheduledCores(2) === 0) - } - } - - // =============================== - // | Utility methods for testing | - // =============================== - - private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers) - - private def makeMaster(conf: SparkConf = new SparkConf): Master = { - val securityMgr = new SecurityManager(conf) - val rpcEnv = RpcEnv.create(Master.systemName, "localhost", 7077, conf, securityMgr) - val master = new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf) - master - } - - private def makeAppInfo( - memoryPerExecutorMb: Int, - coresPerExecutor: Option[Int] = None, - maxCores: Option[Int] = None): ApplicationInfo = { - val desc = new ApplicationDescription( - "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor) - val appId = System.currentTimeMillis.toString - new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue) - } - - private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = { - val workerId = System.currentTimeMillis.toString - new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address") - } - } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
