This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c8628c9 Revert "[SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes"
c8628c9 is described below
commit c8628c943cd12bbad7561bdc297cea9ff23becc7
Author: HyukjinKwon <[email protected]>
AuthorDate: Wed Feb 10 08:00:03 2021 +0900
Revert "[SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes"
This reverts commit 50641d2e3d659f51432aa2c0e6b9af76d71a5796.
---
.../apache/spark/ExecutorAllocationClient.scala | 6 ---
.../org/apache/spark/internal/config/package.scala | 19 +-------
.../org/apache/spark/scheduler/HealthTracker.scala | 35 +++-----------
.../cluster/CoarseGrainedClusterMessage.scala | 3 --
.../cluster/CoarseGrainedSchedulerBackend.scala | 56 ++--------------------
.../spark/scheduler/HealthTrackerSuite.scala | 45 -----------------
.../k8s/integrationtest/DecommissionSuite.scala | 32 -------------
.../k8s/integrationtest/KubernetesSuite.scala | 5 +-
8 files changed, 14 insertions(+), 187 deletions(-)
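For readers tracking the reverted feature: the change being backed out had added two opt-in settings on top of the existing spark.excludeOnFailure.killExcludedExecutors flag. A minimal sketch of how they would have been enabled (illustrative only; after this revert the two new keys are unknown to Spark and are simply ignored if set):

    import org.apache.spark.SparkConf

    // Sketch of the reverted configuration surface; values are placeholders.
    val conf = new SparkConf()
      // Existing flag: kill executors/hosts that get excluded on failure.
      .set("spark.excludeOnFailure.killExcludedExecutors", "true")
      // Reverted by this commit: decommission excluded executors instead of killing them.
      .set("spark.excludeOnFailure.killExcludedExecutors.decommission", "true")
      // Reverted by this commit: force-kill executors still decommissioning after this long.
      .set("spark.executor.decommission.forceKillTimeout", "120s")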
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5b587d7..cdba1c4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -129,12 +129,6 @@ private[spark] trait ExecutorAllocationClient {
decommissionedExecutors.nonEmpty &&
decommissionedExecutors(0).equals(executorId)
}
- /**
- * Request that the cluster manager decommission every executor on the specified host.
- *
- * @return whether the request is acknowledged by the cluster manager.
- */
- def decommissionExecutorsOnHost(host: String): Boolean
/**
* Request that the cluster manager kill every executor on the specified host.
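With the hunk above applied, only the kill-based host operation remains on the trait. A simplified stand-in of the surviving contract (the real trait is the private[spark] org.apache.spark.ExecutorAllocationClient and carries more members):

    // Simplified stand-in; not the actual Spark trait.
    trait HostExecutorOps {
      /** Request that the cluster manager kill every executor on the specified host. */
      def killExecutorsOnHost(host: String): Boolean
      // Removed by this revert:
      // def decommissionExecutorsOnHost(host: String): Boolean
    }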
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3101bb6..7aeb51d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -827,13 +827,6 @@ package object config {
.booleanConf
.createWithDefault(false)
- private[spark] val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
- ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors.decommission")
- .doc("Attempt decommission of excluded nodes instead of going directly
to kill")
- .version("3.2.0")
- .booleanConf
- .createWithDefault(false)
-
private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime")
.internal()
@@ -1965,8 +1958,7 @@ package object config {
private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
ConfigBuilder("spark.executor.decommission.killInterval")
- .doc("Duration after which a decommissioned executor will be killed
forcefully " +
- "*by an outside* (e.g. non-spark) service. " +
+ .doc("Duration after which a decommissioned executor will be killed
forcefully." +
"This config is useful for cloud environments where we know in advance
when " +
"an executor is going to go down after decommissioning signal i.e.
around 2 mins " +
"in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is
currently " +
@@ -1975,15 +1967,6 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createOptional
- private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
- ConfigBuilder("spark.executor.decommission.forceKillTimeout")
- .doc("Duration after which a Spark will force a decommissioning executor
to exit." +
- " this should be set to a high value in most situations as low values
will prevent " +
- " block migrations from having enough time to complete.")
- .version("3.2.0")
- .timeConf(TimeUnit.SECONDS)
- .createOptional
-
private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
ConfigBuilder("spark.executor.decommission.signal")
.doc("The signal that used to trigger the executor to start
decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index 6bd5668..c6b8dca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -40,7 +40,6 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
* stage, but still many failures over the entire application
* * "flaky" executors -- they don't fail every task, but are still faulty
enough to merit
* excluding
- * * missing shuffle files -- may trigger fetch failures on healthy executors.
*
* See the design doc on SPARK-8425 for a more in-depth discussion. Note SPARK-32037 renamed
* the feature.
@@ -65,8 +64,6 @@ private[scheduler] class HealthTracker (
val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS =
HealthTracker.getExludeOnFailureTimeout(conf)
private val EXCLUDE_FETCH_FAILURE_ENABLED =
conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
- private val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
- conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)
/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -157,21 +154,11 @@ private[scheduler] class HealthTracker (
}
private def killExecutor(exec: String, msg: String): Unit = {
- val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
- s"${msg} (actually decommissioning)"
- } else {
- msg
- }
allocationClient match {
case Some(a) =>
- logInfo(fullMsg)
- if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
- a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg),
- adjustTargetNumExecutors = false)
- } else {
- a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
- force = true)
- }
+ logInfo(msg)
+ a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+ force = true)
case None =>
logInfo(s"Not attempting to kill excluded executor id $exec " +
s"since allocation client is not defined.")
@@ -195,18 +182,10 @@ private[scheduler] class HealthTracker (
if (conf.get(config.EXCLUDE_ON_FAILURE_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
- if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
- logInfo(s"Decommissioning all executors on excluded host $node " +
- s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
- if (!a.decommissionExecutorsOnHost(node)) {
- logError(s"Decommissioning executors on $node failed.")
- }
- } else {
- logInfo(s"Killing all executors on excluded host $node " +
- s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
- if (!a.killExecutorsOnHost(node)) {
- logError(s"Killing executors on node $node failed.")
- }
+ logInfo(s"Killing all executors on excluded host $node " +
+ s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
+ if (a.killExecutorsOnHost(node) == false) {
+ logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on excluded host $node
" +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index a6f52f9..2f17143 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -49,9 +49,6 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage
- case class DecommissionExecutorsOnHost(host: String)
- extends CoarseGrainedClusterMessage
-
case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b44f677..ccb5eb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster
-import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import javax.annotation.concurrent.GuardedBy
@@ -115,11 +115,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
- private val cleanupService: Option[ScheduledExecutorService] =
- conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { _ =>
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
- }
-
class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -181,20 +176,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
case KillExecutorsOnHost(host) =>
- scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
- killExecutors(execs.toSeq, adjustTargetNumExecutors = false, countFailures = false,
+ scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
+ killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
force = true)
}
- case DecommissionExecutorsOnHost(host) =>
- val reason = ExecutorDecommissionInfo(s"Decommissioning all executors on $host.")
- scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
- val execsWithReasons = execs.map(exec => (exec, reason)).toArray
-
- decommissionExecutors(execsWithReasons, adjustTargetNumExecutors = false,
- triggeredByExecutor = false)
- }
-
case UpdateDelegationTokens(newDelegationTokens) =>
updateDelegationTokens(newDelegationTokens)
@@ -520,21 +506,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}
- conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { cleanupInterval =>
- val cleanupTask = new Runnable() {
- override def run(): Unit = Utils.tryLogNonFatalError {
- val stragglers = CoarseGrainedSchedulerBackend.this.synchronized {
- executorsToDecommission.filter(executorsPendingDecommission.contains)
- }
- if (stragglers.nonEmpty) {
- logInfo(s"${stragglers.toList} failed to decommission in
${cleanupInterval}, killing.")
- killExecutors(stragglers, false, false, true)
- }
- }
- }
- cleanupService.map(_.schedule(cleanupTask, cleanupInterval, TimeUnit.SECONDS))
- }
-
executorsToDecommission
}
@@ -577,7 +548,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def stop(): Unit = {
reviveThread.shutdownNow()
- cleanupService.foreach(_.shutdownNow())
stopExecutors()
delegationTokenManager.foreach(_.stop())
try {
@@ -881,29 +851,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Future.successful(false)
/**
- * Request that the cluster manager decommissions all executors on a given host.
- * @return whether the decommission request is acknowledged.
- */
- final override def decommissionExecutorsOnHost(host: String): Boolean = {
- logInfo(s"Requesting to kill any and all executors on host $host")
- // A potential race exists if a new executor attempts to register on a host
- // that is on the exclude list and is no longer valid. To avoid this race,
- // all executor registration and decommissioning happens in the event loop. This way, either
- // an executor will fail to register, or will be decommed when all executors on a host
- // are decommed.
- // Decommission all the executors on this host in an event loop to ensure serialization.
- driverEndpoint.send(DecommissionExecutorsOnHost(host))
- true
- }
-
- /**
* Request that the cluster manager kill all executors on a given host.
* @return whether the kill request is acknowledged.
*/
final override def killExecutorsOnHost(host: String): Boolean = {
- logInfo(s"Requesting to kill any and all executors on host $host")
+ logInfo(s"Requesting to kill any and all executors on host ${host}")
// A potential race exists if a new executor attempts to register on a host
- // that is on the exclude list and is no longer valid. To avoid this race,
+ // that is on the exclude list and is no no longer valid. To avoid this race,
// all executor registration and killing happens in the event loop. This way, either
// an executor will fail to register, or will be killed when all executors on a host
// are killed.
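The largest removal above is the "cleanup-decommission-execs" watchdog: a one-shot task scheduled when spark.executor.decommission.forceKillTimeout was set, which force-killed any executor still pending decommission after the timeout. A self-contained sketch of that idea (the hooks for the pending set and the kill action are placeholders, not Spark APIs):

    import java.util.concurrent.{Executors, TimeUnit}

    def scheduleForceKill(
        forceKillTimeoutSec: Long,
        pendingDecommission: () => Set[String],      // placeholder: executors still decommissioning
        killExecutors: Seq[String] => Unit): Unit = { // placeholder: force-kill hook
      val cleanup = Executors.newSingleThreadScheduledExecutor()
      val task = new Runnable {
        override def run(): Unit = {
          val stragglers = pendingDecommission().toSeq
          if (stragglers.nonEmpty) {
            println(s"${stragglers.toList} failed to decommission in ${forceKillTimeoutSec}s, killing.")
            killExecutors(stragglers)
          }
        }
      }
      cleanup.schedule(task, forceKillTimeoutSec, TimeUnit.SECONDS)
    }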
diff --git a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
index 5710be1..7ecc1f5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
@@ -554,51 +554,6 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
verify(allocationClientMock).killExecutorsOnHost("hostA")
}
- test("excluding decommission and kills executors when enabled") {
- val allocationClientMock = mock[ExecutorAllocationClient]
-
- // verify we decommission when configured
- conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
- conf.set(config.DECOMMISSION_ENABLED.key, "true")
- conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
- conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
- conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
- healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
-
- // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
- // application.
- val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
- (0 until 4).foreach { partition =>
- taskSetExclude2.updateExcludedForFailedTask(
- "hostA", exec = "1", index = partition, failureReason = "testing")
- }
- healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
-
- val msg1 =
- "Killing excluded executor id 1 since
spark.excludeOnFailure.killExcludedExecutors is set." +
- " (actually decommissioning)"
-
- verify(allocationClientMock).decommissionExecutor(
- "1", ExecutorDecommissionInfo(msg1), false)
-
- val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
- // Fail 4 tasks in one task set on executor 2, so that executor gets excluded for the whole
- // application. Since that's the second executor that is excluded on the same node, we also
- // exclude that node.
- (0 until 4).foreach { partition =>
- taskSetExclude3.updateExcludedForFailedTask(
- "hostA", exec = "2", index = partition, failureReason = "testing")
- }
- healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude3.execToFailures)
-
- val msg2 =
- "Killing excluded executor id 2 since
spark.excludeOnFailure.killExcludedExecutors is set." +
- " (actually decommissioning)"
- verify(allocationClientMock).decommissionExecutor(
- "2", ExecutorDecommissionInfo(msg2), false, false)
- verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
- }
-
test("fetch failure excluding kills executors, configured by
EXCLUDE_ON_FAILURE_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
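The deleted test exercised the decommission path; the kill-based verification style that remains in the suite looks roughly like the sketch below (plain Mockito against a local stand-in trait, not the full test fixture):

    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.{mock, verify, when}

    // Stand-in for the private[spark] ExecutorAllocationClient.
    trait AllocationClient {
      def killExecutors(ids: Seq[String], adjust: Boolean, countFailures: Boolean, force: Boolean): Seq[String]
      def killExecutorsOnHost(host: String): Boolean
    }

    val client = mock(classOf[AllocationClient])
    when(client.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
    when(client.killExecutorsOnHost(any())).thenReturn(true)

    // The code under test would call the client after enough failures; here we
    // drive it directly just to show the verification pattern.
    client.killExecutorsOnHost("hostA")
    verify(client).killExecutorsOnHost("hostA")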
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 56a23ab..92f6a32 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -116,38 +116,6 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
executorPatience = None,
decommissioningTest = false)
}
-
- test("Test decommissioning timeouts", k8sTestTag) {
- sparkAppConf
- .set(config.DECOMMISSION_ENABLED.key, "true")
- .set("spark.kubernetes.container.image", pyImage)
- .set(config.STORAGE_DECOMMISSION_ENABLED.key, "true")
- .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key, "true")
- .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key, "true")
- // Ensure we have somewhere to migrate our data too
- .set("spark.executor.instances", "3")
- // Set super high so the timeout is triggered
- .set("spark.storage.decommission.replicationReattemptInterval",
"8640000")
- // Set super low so the timeout is triggered
- .set(config.EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL.key, "10")
-
- runSparkApplicationAndVerifyCompletion(
- appResource = PYSPARK_DECOMISSIONING,
- mainClass = "",
- expectedDriverLogOnCompletion = Seq(
- "Finished waiting, stopping Spark",
- "Decommission executors",
- "failed to decommission in 10, killing",
- "killed by driver."),
- appArgs = Array.empty[String],
- driverPodChecker = doBasicDriverPyPodCheck,
- executorPodChecker = doBasicExecutorPyPodCheck,
- appLocator = appLocator,
- isJVM = false,
- pyFiles = None,
- executorPatience = None,
- decommissioningTest = true)
- }
}
private[spark] object DecommissionSuite {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 9f1bcf7..494c825 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -345,11 +345,8 @@ class KubernetesSuite extends SparkFunSuite
}
// Delete the pod to simulate cluster scale down/migration.
// This will allow the pod to remain up for the grace period
- // We set an intentionally long grace period to test that Spark
- // exits once the blocks are done migrating and doesn't wait for the
- // entire grace period if it does not need to.
kubernetesTestComponents.kubernetesClient.pods()
- .withName(name).withGracePeriod(Int.MaxValue).delete()
+ .withName(name).delete()
logDebug(s"Triggered pod decom/delete: $name deleted")
// Make sure this pod is deleted
Eventually.eventually(TIMEOUT, INTERVAL) {
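In the K8s suite, the revert also drops the intentionally huge grace period on pod deletion; post-revert the test deletes the pod with the default grace period. A minimal sketch of the two fabric8 calls seen in the hunk above (client construction and pod name are placeholders):

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    // Sketch only; assumes a kubeconfig-resolvable cluster and a placeholder pod name.
    val client = new DefaultKubernetesClient()
    val name = "spark-executor-placeholder"

    // Post-revert behaviour: plain delete, default grace period.
    client.pods().withName(name).delete()

    // Reverted variant (for comparison): keep the pod up for the grace period so
    // Spark's early exit after block migration could be observed before the pod is reaped.
    // client.pods().withName(name).withGracePeriod(Int.MaxValue).delete()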