This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 30ff2c77043 [SPARK-42689][CORE][SHUFFLE] Allow ShuffleDriverComponent
to declare if shuffle data is reliably stored
30ff2c77043 is described below
commit 30ff2c77043e53e98468844cefdb6c8c8454c967
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Thu Mar 9 01:09:54 2023 -0600
[SPARK-42689][CORE][SHUFFLE] Allow ShuffleDriverComponent to declare if
shuffle data is reliably stored
### What changes were proposed in this pull request?
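Currently, if there is an executor node loss, we assume the shuffle data on
that node is also lost. This is not necessarily the case if there is a shuffle
component managing the shuffle data and reliably maintaining it (for example,
in a distributed filesystem or in a disaggregated shuffle cluster).
This change adds `supportsReliableStorage()` to `ShuffleDriverComponents` so
that such a component can declare it; when it returns true, the loss of an
executor is no longer treated as a loss of its shuffle output.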
### Why are the changes needed?
Downstream projects have patches to Apache Spark in order to work around
this issue, for example Apache Celeborn has
[this](https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch).
### Does this PR introduce _any_ user-facing change?
Enhances the `ShuffleDriverComponents` API, but defaults to current
behavior.
### How was this patch tested?
Existing unit tests, and added more tests.
Closes #40307 from mridulm/support-hook-for-reliable-shuffle.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../spark/shuffle/api/ShuffleDriverComponents.java | 9 ++++
.../apache/spark/ExecutorAllocationManager.scala | 5 +-
.../main/scala/org/apache/spark/SparkContext.scala | 25 +++++-----
.../org/apache/spark/scheduler/DAGScheduler.scala | 3 +-
.../apache/spark/scheduler/TaskSetManager.scala | 1 +
.../spark/ExecutorAllocationManagerSuite.scala | 3 +-
.../scala/org/apache/spark/SparkContextSuite.scala | 8 ++++
.../TestShuffleDataIOWithMockedComponents.scala | 53 ++++++++++++++++++++++
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 23 ++++++++++
9 files changed, 115 insertions(+), 15 deletions(-)
diff --git
a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
index b4cec17b85b..5c4c1eff9f5 100644
---
a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
+++
b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
@@ -61,4 +61,13 @@ public interface ShuffleDriverComponents {
* @param blocking Whether this call should block on the deletion of the
data.
*/
default void removeShuffle(int shuffleId, boolean blocking) {}
+
+ /**
+ * Does this shuffle component support reliable storage - external to the
lifecycle of the
+ * executor host ? For example, writing shuffle data to a distributed
filesystem or
+ * persisting it in a remote shuffle service.
+ */
+ default boolean supportsReliableStorage() {
+ return false;
+ }
}
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f06312c15cf..187125a66c9 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -103,7 +103,8 @@ private[spark] class ExecutorAllocationManager(
conf: SparkConf,
cleaner: Option[ContextCleaner] = None,
clock: Clock = new SystemClock(),
- resourceProfileManager: ResourceProfileManager)
+ resourceProfileManager: ResourceProfileManager,
+ reliableShuffleStorage: Boolean)
extends Logging {
allocationManager =>
@@ -203,7 +204,7 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be >
0!")
}
- if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+ if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !reliableShuffleStorage) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logInfo("Dynamic allocation is enabled without a shuffle service.")
} else if (decommissionEnabled &&
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bb1d0a1c98d..43573894748 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -551,11 +551,6 @@ class SparkContext(config: SparkConf) extends Logging {
executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
}
- _shuffleDriverComponents =
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
- _shuffleDriverComponents.initializeApplication().asScala.foreach { case
(k, v) =>
- _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
- }
-
// We need to register "HeartbeatReceiver" before "createTaskScheduler"
because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
@@ -596,6 +591,13 @@ class SparkContext(config: SparkConf) extends Logging {
_conf.set(APP_ATTEMPT_ID, attemptId)
_env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
}
+
+ // initialize after application id and attempt id has been initialized
+ _shuffleDriverComponents =
ShuffleDataIOUtils.loadShuffleDataIO(_conf).driver()
+ _shuffleDriverComponents.initializeApplication().asScala.foreach { case
(k, v) =>
+ _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
+ }
+
if (_conf.get(UI_REVERSE_PROXY)) {
val proxyUrl =
_conf.get(UI_REVERSE_PROXY_URL).getOrElse("").stripSuffix("/")
System.setProperty("spark.ui.proxyBase", proxyUrl + "/proxy/" +
_applicationId)
@@ -635,7 +637,8 @@ class SparkContext(config: SparkConf) extends Logging {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient],
listenerBus, _conf,
- cleaner = cleaner, resourceProfileManager =
resourceProfileManager))
+ cleaner = cleaner, resourceProfileManager =
resourceProfileManager,
+ reliableShuffleStorage =
_shuffleDriverComponents.supportsReliableStorage()))
case _ =>
None
}
@@ -2149,16 +2152,16 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
- if (_heartbeater != null) {
+ if (_shuffleDriverComponents != null) {
Utils.tryLogNonFatalError {
- _heartbeater.stop()
+ _shuffleDriverComponents.cleanupApplication()
}
- _heartbeater = null
}
- if (_shuffleDriverComponents != null) {
+ if (_heartbeater != null) {
Utils.tryLogNonFatalError {
- _shuffleDriverComponents.cleanupApplication()
+ _heartbeater.stop()
}
+ _heartbeater = null
}
if (env != null && _heartbeatReceiver != null) {
Utils.tryLogNonFatalError {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9ce77ad7609..1a1f0cbba7f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2530,7 +2530,8 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was
lost, then
// we know to unregister shuffle output. (Note that "worker" specifically
refers to the process
// from a Standalone cluster, where the shuffle service lives in the
Worker.)
- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
+ val fileLost = !sc.shuffleDriverComponents.supportsReliableStorage() &&
+ (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 124a27502fe..20a1943fa69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1054,6 +1054,7 @@ private[spark] class TaskSetManager(
// can destroy the whole host). The reason is the next stage wouldn't be
able to fetch the
// data from this dead executor so we would need to rerun these tasks on
other executors.
val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+ !sched.sc.shuffleDriverComponents.supportsReliableStorage() &&
(reason.isInstanceOf[ExecutorDecommission] ||
!env.blockManager.externalShuffleServiceEnabled)
if (maybeShuffleMapOutputLoss && !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
diff --git
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 1cb913b248f..41d674f39a0 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1809,9 +1809,10 @@ class ExecutorAllocationManagerSuite extends
SparkFunSuite {
conf: SparkConf,
clock: Clock = new SystemClock()): ExecutorAllocationManager = {
ResourceProfile.reInitDefaultProfile(conf)
+
rpManager = new ResourceProfileManager(conf, listenerBus)
val manager = new ExecutorAllocationManager(client, listenerBus, conf,
clock = clock,
- resourceProfileManager = rpManager)
+ resourceProfileManager = rpManager, reliableShuffleStorage = false)
managers += manager
manager.start()
manager
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index f9869d35382..7bfcbc4b0d8 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1399,6 +1399,14 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
}
sc.stop()
}
+
+ test("SPARK-42689: ShuffleDataIO initialized after application id has been
configured") {
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+ // TestShuffleDataIO will validate if application id has been configured
in its constructor
+ conf.set(SHUFFLE_IO_PLUGIN_CLASS.key,
classOf[TestShuffleDataIOWithMockedComponents].getName)
+ sc = new SparkContext(conf)
+ sc.stop()
+ }
}
object SparkContextSuite {
diff --git
a/core/src/test/scala/org/apache/spark/TestShuffleDataIOWithMockedComponents.scala
b/core/src/test/scala/org/apache/spark/TestShuffleDataIOWithMockedComponents.scala
new file mode 100644
index 00000000000..1d5ed9f2590
--- /dev/null
+++
b/core/src/test/scala/org/apache/spark/TestShuffleDataIOWithMockedComponents.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.{Collections => JCollections}
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doNothing, mock, when}
+
+import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents,
ShuffleExecutorComponents}
+
+/**
+ * A test shuffle data IO implementation, which allows both executor and
driver component to
+ * be mocked.
+ * Note: cannot intercept initialization of executor/driver currently.
+ *
+ */
+class TestShuffleDataIOWithMockedComponents(val conf: SparkConf) extends
ShuffleDataIO {
+
+ // ShuffleDataIO must be initialized only after spark.app.id has been
configured
+ assert(conf.getOption("spark.app.id").isDefined)
+
+ private val executorMock = {
+ val m = mock(classOf[ShuffleExecutorComponents])
+ doNothing().when(m).initializeExecutor(any(), any(), any())
+ m
+ }
+
+ private val driverMock = {
+ val m = mock(classOf[ShuffleDriverComponents])
+ when(m.initializeApplication()).thenReturn(JCollections.emptyMap[String,
String]())
+ m
+ }
+
+ def executor: ShuffleExecutorComponents = executorMock
+
+ def driver: ShuffleDriverComponents = driverMock
+}
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 31f2ed0a915..cc6562ef017 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -985,6 +985,29 @@ class DAGSchedulerSuite extends SparkFunSuite with
TempLocalSparkContext with Ti
}
}
+ test("SPARK-42689: worker loss with reliable shuffle storage") {
+ // This is a variant of "worker lost without shuffle service" case from
shuffleFileLossTests
+ // except that spark is using a shuffle driver component which stores
shuffle data reliably
+ // outside of the executor - hence loss of executor does not result in any
cleanup
+
+ conf.set(config.SHUFFLE_IO_PLUGIN_CLASS.key,
+ classOf[TestShuffleDataIOWithMockedComponents].getName)
+ when(sc.shuffleDriverComponents.supportsReliableStorage()).thenReturn(true)
+
+ val event = ExecutorProcessLost("", Some("hostA"))
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new
HashPartitioner(1))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker =
mapOutputTracker)
+ submit(reduceRdd, Array(0))
+ completeShuffleMapStageSuccessfully(0, 0, 1)
+ runEvent(ExecutorLost("hostA-exec", event))
+ verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
+ verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("hostA-exec")
+ assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId,
0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+ }
+
test("SPARK-28967 properties must be cloned before posting to listener bus
for 0 partition") {
val properties = new Properties()
val func = (context: TaskContext, it: Iterator[(_)]) => 1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]