Repository: spark
Updated Branches:
  refs/heads/master a8ced76f1 -> 81e5619ca


http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
deleted file mode 100644
index a58784f..0000000
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.network.yarn
-
-import java.io.{DataOutputStream, File, FileOutputStream, IOException}
-import java.nio.ByteBuffer
-import java.nio.file.Files
-import java.nio.file.attribute.PosixFilePermission._
-import java.util.EnumSet
-
-import scala.annotation.tailrec
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.service.ServiceStateException
-import org.apache.hadoop.yarn.api.records.ApplicationId
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.SecurityManager
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.network.shuffle.ShuffleTestAccessor
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.util.Utils
-
-class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
-  private[yarn] var yarnConfig: YarnConfiguration = null
-  private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    yarnConfig = new YarnConfiguration()
-    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
-    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
-      classOf[YarnShuffleService].getCanonicalName)
-    yarnConfig.setInt("spark.shuffle.service.port", 0)
-    yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
-    val localDir = Utils.createTempDir()
-    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
-  }
-
-  var s1: YarnShuffleService = null
-  var s2: YarnShuffleService = null
-  var s3: YarnShuffleService = null
-
-  override def afterEach(): Unit = {
-    try {
-      if (s1 != null) {
-        s1.stop()
-        s1 = null
-      }
-      if (s2 != null) {
-        s2.stop()
-        s2 = null
-      }
-      if (s3 != null) {
-        s3.stop()
-        s3 = null
-      }
-    } finally {
-      super.afterEach()
-    }
-  }
-
-  test("executor state kept across NM restart") {
-    s1 = new YarnShuffleService
-    // set auth to true to test the secrets recovery
-    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
-    s1.init(yarnConfig)
-    val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data = makeAppInfo("user", app1Id)
-    s1.initializeApplication(app1Data)
-    val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data = makeAppInfo("user", app2Id)
-    s1.initializeApplication(app2Data)
-
-    val execStateFile = s1.registeredExecutorFile
-    execStateFile should not be (null)
-    val secretsFile = s1.secretsFile
-    secretsFile should not be (null)
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
-
-    val blockHandler = s1.blockHandler
-    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
-    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
-
-    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
-    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
-    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
-      be (Some(shuffleInfo1))
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
-      be (Some(shuffleInfo2))
-
-    if (!execStateFile.exists()) {
-      @tailrec def findExistingParent(file: File): File = {
-        if (file == null) file
-        else if (file.exists()) file
-        else findExistingParent(file.getParentFile())
-      }
-      val existingParent = findExistingParent(execStateFile)
-      assert(false, s"$execStateFile does not exist -- closest existing parent is $existingParent")
-    }
-    assert(execStateFile.exists(), s"$execStateFile did not exist")
-
-    // now we pretend the shuffle service goes down, and comes back up
-    s1.stop()
-    s2 = new YarnShuffleService
-    s2.init(yarnConfig)
-    s2.secretsFile should be (secretsFile)
-    s2.registeredExecutorFile should be (execStateFile)
-
-    val handler2 = s2.blockHandler
-    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
-
-    // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
-    // during the restart
-    s2.initializeApplication(app1Data)
-    s2.stopApplication(new ApplicationTerminationContext(app2Id))
-    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
-
-    // Act like the NM restarts one more time
-    s2.stop()
-    s3 = new YarnShuffleService
-    s3.init(yarnConfig)
-    s3.registeredExecutorFile should be (execStateFile)
-    s3.secretsFile should be (secretsFile)
-
-    val handler3 = s3.blockHandler
-    val resolver3 = ShuffleTestAccessor.getBlockResolver(handler3)
-
-    // app1 is still running
-    s3.initializeApplication(app1Data)
-    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver3) should be (Some(shuffleInfo1))
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (None)
-    s3.stop()
-  }
-
-  test("removed applications should not be in registered executor file") {
-    s1 = new YarnShuffleService
-    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
-    s1.init(yarnConfig)
-    val secretsFile = s1.secretsFile
-    secretsFile should be (null)
-    val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data = makeAppInfo("user", app1Id)
-    s1.initializeApplication(app1Data)
-    val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data = makeAppInfo("user", app2Id)
-    s1.initializeApplication(app2Data)
-
-    val execStateFile = s1.registeredExecutorFile
-    execStateFile should not be (null)
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
-
-    val blockHandler = s1.blockHandler
-    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
-    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
-
-    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
-    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
-
-    val db = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
-    ShuffleTestAccessor.reloadRegisteredExecutors(db) should not be empty
-
-    s1.stopApplication(new ApplicationTerminationContext(app1Id))
-    ShuffleTestAccessor.reloadRegisteredExecutors(db) should not be empty
-    s1.stopApplication(new ApplicationTerminationContext(app2Id))
-    ShuffleTestAccessor.reloadRegisteredExecutors(db) shouldBe empty
-  }
-
-  test("shuffle service should be robust to corrupt registered executor file") 
{
-    s1 = new YarnShuffleService
-    s1.init(yarnConfig)
-    val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data = makeAppInfo("user", app1Id)
-    s1.initializeApplication(app1Data)
-
-    val execStateFile = s1.registeredExecutorFile
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
-
-    val blockHandler = s1.blockHandler
-    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
-    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
-
-    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
-
-    // now we pretend the shuffle service goes down, and comes back up.  But we'll also
-    // make a corrupt registeredExecutor File
-    s1.stop()
-
-    execStateFile.listFiles().foreach{_.delete()}
-
-    val out = new DataOutputStream(new FileOutputStream(execStateFile + "/CURRENT"))
-    out.writeInt(42)
-    out.close()
-
-    s2 = new YarnShuffleService
-    s2.init(yarnConfig)
-    s2.registeredExecutorFile should be (execStateFile)
-
-    val handler2 = s2.blockHandler
-    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
-
-    // we re-initialize app1, but since the file was corrupt there is nothing we can do about it ...
-    s2.initializeApplication(app1Data)
-    // however, when we initialize a totally new app2, everything is still happy
-    val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data = makeAppInfo("user", app2Id)
-    s2.initializeApplication(app2Data)
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
-    resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
-    s2.stop()
-
-    // another stop & restart should be fine though (e.g., we recover from the previous corruption)
-    s3 = new YarnShuffleService
-    s3.init(yarnConfig)
-    s3.registeredExecutorFile should be (execStateFile)
-    val handler3 = s3.blockHandler
-    val resolver3 = ShuffleTestAccessor.getBlockResolver(handler3)
-
-    s3.initializeApplication(app2Data)
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
-    s3.stop()
-  }
-
-  test("get correct recovery path") {
-    // Test the case where the recovery path is set outside the shuffle service. This simulates
-    // the NM-recovery-enabled scenario, where the recovery path is set by YARN.
-    s1 = new YarnShuffleService
-    val recoveryPath = new Path(Utils.createTempDir().toURI)
-    s1.setRecoveryPath(recoveryPath)
-
-    s1.init(yarnConfig)
-    s1._recoveryPath should be (recoveryPath)
-    s1.stop()
-
-    // Test the case where the recovery path is set inside the shuffle service. This happens
-    // when NM recovery is not enabled or is unavailable (Hadoop 2.5-).
-    s2 = new YarnShuffleService
-    s2.init(yarnConfig)
-    s2._recoveryPath should be
-      (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
-    s2.stop()
-  }
-
-  test("moving recovery file from NM local dir to recovery path") {
-    // This tests that when Hadoop is upgraded to 2.5+ and NM recovery is enabled, we move the
-    // old recovery file to the new path to keep compatibility.
-
-    // Simulate s1 running on an old version of Hadoop in which the recovery file is in the NM
-    // local dir.
-    s1 = new YarnShuffleService
-    // set auth to true to test the secrets recovery
-    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
-    s1.init(yarnConfig)
-    val app1Id = ApplicationId.newInstance(0, 1)
-    val app1Data = makeAppInfo("user", app1Id)
-    s1.initializeApplication(app1Data)
-    val app2Id = ApplicationId.newInstance(0, 2)
-    val app2Data = makeAppInfo("user", app2Id)
-    s1.initializeApplication(app2Data)
-
-    assert(s1.secretManager.getSecretKey(app1Id.toString()) != null)
-    assert(s1.secretManager.getSecretKey(app2Id.toString()) != null)
-
-    val execStateFile = s1.registeredExecutorFile
-    execStateFile should not be (null)
-    val secretsFile = s1.secretsFile
-    secretsFile should not be (null)
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
-
-    val blockHandler = s1.blockHandler
-    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
-    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
-
-    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
-    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
-    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
-      be (Some(shuffleInfo1))
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
-      be (Some(shuffleInfo2))
-
-    assert(execStateFile.exists(), s"$execStateFile did not exist")
-
-    s1.stop()
-
-    // Simulate s2 running on Hadoop 2.5+ with NM recovery enabled.
-    assert(execStateFile.exists())
-    val recoveryPath = new Path(Utils.createTempDir().toURI)
-    s2 = new YarnShuffleService
-    s2.setRecoveryPath(recoveryPath)
-    s2.init(yarnConfig)
-
-    // Ensure that s2 has loaded known apps from the secrets db.
-    assert(s2.secretManager.getSecretKey(app1Id.toString()) != null)
-    assert(s2.secretManager.getSecretKey(app2Id.toString()) != null)
-
-    val execStateFile2 = s2.registeredExecutorFile
-    val secretsFile2 = s2.secretsFile
-
-    recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
-    recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
-    eventually(timeout(10 seconds), interval(5 millis)) {
-      assert(!execStateFile.exists())
-    }
-    eventually(timeout(10 seconds), interval(5 millis)) {
-      assert(!secretsFile.exists())
-    }
-
-    val handler2 = s2.blockHandler
-    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
-
-    // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
-    // during the restart
-    // Since the recovery file is picked up from the old path, the previous
-    // state should be preserved.
-    s2.initializeApplication(app1Data)
-    s2.stopApplication(new ApplicationTerminationContext(app2Id))
-    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
-    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
-
-    s2.stop()
-  }
-
-  test("service throws error if cannot start") {
-    // Set up a read-only local dir.
-    val roDir = Utils.createTempDir()
-    Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
-    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
-
-    // Try to start the shuffle service, it should fail.
-    val service = new YarnShuffleService()
-
-    try {
-      val error = intercept[ServiceStateException] {
-        service.init(yarnConfig)
-      }
-      assert(error.getCause().isInstanceOf[IOException])
-    } finally {
-      service.stop()
-      Files.setPosixFilePermissions(roDir.toPath(),
-        EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
-    }
-  }
-
-  private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
-    val secret = ByteBuffer.wrap(new Array[Byte](0))
-    new ApplicationInitializationContext(user, appId, secret)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
deleted file mode 100644
index db322cd..0000000
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.network.yarn
-
-import java.io.File
-
-/**
- * Just a cheat to get at package-visible members in tests.
- */
-object YarnTestAccessor {
-  def getShuffleServicePort: Int = {
-    YarnShuffleService.boundPort
-  }
-
-  def getShuffleServiceInstance: YarnShuffleService = {
-    YarnShuffleService.instance
-  }
-
-  def getRegisteredExecutorFile(service: YarnShuffleService): File = {
-    service.registeredExecutorFile
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
deleted file mode 100644
index 6ea7984..0000000
--- a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-
-/**
- * Test the integration with [[SchedulerExtensionServices]]
- */
-class ExtensionServiceIntegrationSuite extends SparkFunSuite
-  with LocalSparkContext with BeforeAndAfter
-  with Logging {
-
-  val applicationId = new StubApplicationId(0, 1111L)
-  val attemptId = new StubApplicationAttemptId(applicationId, 1)
-
-  /*
-   * Setup phase creates the spark context
-   */
-  before {
-    val sparkConf = new SparkConf()
-    sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
-    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
-    sc = new SparkContext(sparkConf)
-  }
-
-  test("Instantiate") {
-    val services = new SchedulerExtensionServices()
-    assertResult(Nil, "non-nil service list") {
-      services.getServices
-    }
-    services.start(SchedulerExtensionServiceBinding(sc, applicationId))
-    services.stop()
-  }
-
-  test("Contains SimpleExtensionService Service") {
-    val services = new SchedulerExtensionServices()
-    try {
-      services.start(SchedulerExtensionServiceBinding(sc, applicationId))
-      val serviceList = services.getServices
-      assert(serviceList.nonEmpty, "empty service list")
-      val (service :: Nil) = serviceList
-      val simpleService = service.asInstanceOf[SimpleExtensionService]
-      assert(simpleService.started.get, "service not started")
-      services.stop()
-      assert(!simpleService.started.get, "service not stopped")
-    } finally {
-      services.stop()
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
deleted file mode 100644
index 9b8c98c..0000000
--- a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-private[spark] class SimpleExtensionService extends SchedulerExtensionService {
-
-  /** Started flag; set in the `start()` call, cleared in `stop()`. */
-  val started = new AtomicBoolean(false)
-
-  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
-    started.set(true)
-  }
-
-  override def stop(): Unit = {
-    started.set(false)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
deleted file mode 100644
index 4b57b95..0000000
--- a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-/**
- * A stub application attempt ID; fields can be set in the constructor and/or updated later.
- * @param applicationId application ID
- * @param attempt an attempt counter
- */
-class StubApplicationAttemptId(var applicationId: ApplicationId, var attempt: Int)
-    extends ApplicationAttemptId {
-
-  override def setApplicationId(appID: ApplicationId): Unit = {
-    applicationId = appID
-  }
-
-  override def getAttemptId: Int = {
-    attempt
-  }
-
-  override def setAttemptId(attemptId: Int): Unit = {
-    attempt = attemptId
-  }
-
-  override def getApplicationId: ApplicationId = {
-    applicationId
-  }
-
-  override def build(): Unit = {
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
deleted file mode 100644
index bffa0e0..0000000
--- a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.ApplicationId
-
-/**
- * Simple Testing Application Id; ID and cluster timestamp are set in constructor
- * and cannot be updated.
- * @param id app id
- * @param clusterTimestamp timestamp
- */
-private[spark] class StubApplicationId(id: Int, clusterTimestamp: Long) extends ApplicationId {
-  override def getId: Int = {
-    id
-  }
-
-  override def getClusterTimestamp: Long = {
-    clusterTimestamp
-  }
-
-  override def setId(id: Int): Unit = {}
-
-  override def setClusterTimestamp(clusterTimestamp: Long): Unit = {}
-
-  override def build(): Unit = {}
-}

