This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7c146c925b3 [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
7c146c925b3 is described below
commit 7c146c925b363fc67eedc7411068f24dd780b583
Author: Alessandro Bellina <[email protected]>
AuthorDate: Thu Nov 16 21:06:06 2023 -0600
[SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
### What changes were proposed in this pull request?
As reported in https://issues.apache.org/jira/browse/SPARK-45762,
`ShuffleManager` implementations defined in a user jar cannot be used in all
cases unless they are also specified in `extraClassPath`. We would like to
avoid requiring extra configuration when the implementation is already
included in a jar passed via `--jars`.
Proposed changes:
Refactor the code so that the `ShuffleManager` is initialized later, after jars
have been localized. This is especially necessary in the executor, where the
initialization must be deferred until after the `replClassLoader` has been
updated with the jars passed via `--jars`.
Before this change, the `ShuffleManager` is instantiated at `SparkEnv` creation.
Instantiating the `ShuffleManager` this early doesn't work, because user jars
have not been localized in all scenarios, and we will fail to load a
`ShuffleManager` defined in `--jars`. We propose moving the `ShuffleManager`
instantiation to `SparkContext` on the driver and to `Executor` on the executors.
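To make the new ordering concrete, here is a minimal, self-contained sketch of
the deferred-initialization pattern being applied; `EnvLike`,
`ShuffleManagerLike`, and `com.example.DefaultShuffleManager` are illustrative
placeholders rather than Spark's actual API, and the real wiring lives in
`SparkContext`, `Executor`, and `SparkEnv` in the diff below:
```scala
// Illustrative sketch only (placeholder names, not Spark's API): the shuffle
// manager is created lazily, through the context class loader, only after
// user jars have been added to that loader.
trait ShuffleManagerLike {
  def stop(): Unit
}

final class EnvLike(conf: Map[String, String]) {
  // Left unset at construction time; filled in later by the driver
  // (SparkContext) or the executor (Executor) once user jars are visible
  // to the context class loader.
  private var _shuffleManager: ShuffleManagerLike = _

  def shuffleManager: ShuffleManagerLike = _shuffleManager

  def initializeShuffleManager(): Unit = {
    require(_shuffleManager == null, "Shuffle manager already initialized")
    val className =
      conf.getOrElse("spark.shuffle.manager", "com.example.DefaultShuffleManager")
    // Resolving through the context class loader is what allows a class that
    // only exists in a --jars jar to be found here.
    _shuffleManager = Thread.currentThread().getContextClassLoader
      .loadClass(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[ShuffleManagerLike]
  }

  def stop(): Unit = {
    // The manager may never have been created if startup failed early.
    if (_shuffleManager != null) _shuffleManager.stop()
  }
}
```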
### Why are the changes needed?
This is not a new API but a change in startup order. These changes are needed
to improve the user experience by reducing the extra configuration required,
depending on how a Spark application is launched.
### Does this PR introduce _any_ user-facing change?
Yes, but it is backwards compatible. Users no longer need to specify a
`ShuffleManager` jar in `extraClassPath`, but they may still do so if they wish.
This change is not binary compatible with Spark 3.5.0 (see the MiMa comments
below). I have added a rule to MimaExcludes to handle it:
https://github.com/apache/spark/pull/43627/commits/970bff4edc6ba14d8de78aa175415e204d6a627b
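For illustration, mirroring the new `SparkSubmitSuite` test below, a launch
would now look roughly like this; `com.example.MyApp`,
`com.example.MyShuffleManager`, `my-shuffle.jar`, and `my-app.jar` are
hypothetical placeholder names:
```scala
// Hypothetical spark-submit arguments (placeholder names, not shipped
// artifacts): the custom ShuffleManager is selected by class name via
// spark.shuffle.manager and shipped via --jars; no extraClassPath entries
// are required for it anymore.
val submitArgs = Seq(
  "--class", "com.example.MyApp",
  "--master", "local-cluster[1,1,1024]",
  "--conf", "spark.shuffle.manager=com.example.MyShuffleManager",
  "--jars", "/path/to/my-shuffle.jar",
  "/path/to/my-app.jar")
```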
### How was this patch tested?
Added a unit test (using local-cluster mode) showing that a test
`ShuffleManager` is available when its jar is passed via `--jars`, but not
otherwise. Tested manually with standalone mode, local-cluster mode, YARN
client and cluster mode, and Kubernetes.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43627 from abellina/shuffle_manager_initialization_order.
Authored-by: Alessandro Bellina <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../main/scala/org/apache/spark/SparkContext.scala | 1 +
.../src/main/scala/org/apache/spark/SparkEnv.scala | 38 ++++++-----
.../scala/org/apache/spark/executor/Executor.scala | 13 +++-
.../org/apache/spark/shuffle/ShuffleManager.scala | 26 +++++++-
.../org/apache/spark/storage/BlockManager.scala | 14 +++-
.../spark/storage/BlockManagerMasterEndpoint.scala | 9 ++-
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 77 ++++++++++++++++++++++
.../apache/spark/deploy/SparkSubmitTestUtils.scala | 6 +-
project/MimaExcludes.scala | 4 +-
9 files changed, 160 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73dcaffa6ce..ed00baa01d6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -577,6 +577,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Initialize any plugins before the task scheduler is initialized.
_plugins = PluginContainer(this, _resources.asJava)
+ _env.initializeShuffleManager()
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3277f86e367..94a4debd026 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -18,13 +18,13 @@
package org.apache.spark
import java.io.File
-import java.util.Locale
import scala.collection.concurrent
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Properties
+import com.google.common.base.Preconditions
import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration
@@ -63,7 +63,6 @@ class SparkEnv (
val closureSerializer: Serializer,
val serializerManager: SerializerManager,
val mapOutputTracker: MapOutputTracker,
- val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val securityManager: SecurityManager,
@@ -72,6 +71,12 @@ class SparkEnv (
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
+ // We initialize the ShuffleManager later in SparkContext and Executor to allow
+ // user jars to define custom ShuffleManagers.
+ private var _shuffleManager: ShuffleManager = _
+
+ def shuffleManager: ShuffleManager = _shuffleManager
+
@volatile private[spark] var isStopped = false
/**
@@ -100,7 +105,9 @@ class SparkEnv (
isStopped = true
pythonWorkers.values.foreach(_.stop())
mapOutputTracker.stop()
- shuffleManager.stop()
+ if (shuffleManager != null) {
+ shuffleManager.stop()
+ }
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
@@ -186,6 +193,12 @@ class SparkEnv (
releasePythonWorker(
pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, worker)
}
+
+ private[spark] def initializeShuffleManager(): Unit = {
+ Preconditions.checkState(null == _shuffleManager,
+ "Shuffle manager already initialized to %s", _shuffleManager)
+ _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
+ }
}
object SparkEnv extends Logging {
@@ -356,16 +369,6 @@ object SparkEnv extends Logging {
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
- // Let the user specify short names for shuffle managers
- val shortShuffleMgrNames = Map(
- "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
- "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
- val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
- val shuffleMgrClass =
- shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
- val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
- shuffleMgrClass, conf, isDriver)
-
val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
val blockManagerPort = if (isDriver) {
@@ -403,7 +406,7 @@ object SparkEnv extends Logging {
None
}, blockManagerInfo,
mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
- shuffleManager,
+ _shuffleManager = null,
isDriver)),
registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
@@ -416,6 +419,10 @@ object SparkEnv extends Logging {
advertiseAddress, blockManagerPort, numUsableCores,
blockManagerMaster.driverEndpoint)
// NB: blockManager is not valid until initialize() is called later.
+ // SPARK-45762 introduces a change where the ShuffleManager is initialized later
+ // in the SparkContext and Executor, to allow for custom ShuffleManagers defined
+ // in user jars. The BlockManager uses a lazy val to obtain the
+ // shuffleManager from the SparkEnv.
val blockManager = new BlockManager(
executorId,
rpcEnv,
@@ -424,7 +431,7 @@ object SparkEnv extends Logging {
conf,
memoryManager,
mapOutputTracker,
- shuffleManager,
+ _shuffleManager = null,
blockTransferService,
securityManager,
externalShuffleClient)
@@ -463,7 +470,6 @@ object SparkEnv extends Logging {
closureSerializer,
serializerManager,
mapOutputTracker,
- shuffleManager,
broadcastManager,
blockManager,
securityManager,
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e340667173b..f2a65aab1ba 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -330,14 +330,21 @@ private[spark] class Executor(
}
updateDependencies(initialUserFiles, initialUserJars, initialUserArchives,
defaultSessionState)
- // Plugins need to load using a class loader that includes the executor's user classpath.
- // Plugins also needs to be initialized after the heartbeater started
- // to avoid blocking to send heartbeat (see SPARK-32175).
+ // Plugins and shuffle managers need to load using a class loader that includes the executor's
+ // user classpath. Plugins also needs to be initialized after the heartbeater started
+ // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762).
private val plugins: Option[PluginContainer] =
Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
PluginContainer(env, resources.asJava)
}
+ // Skip local mode because the ShuffleManager is already initialized
+ if (!isLocal) {
+ Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
+ env.initializeShuffleManager()
+ }
+ }
+
metricsPoller.start()
private[executor] def numRunningTasks: Int = runningTasks.size()
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 4e2183451c2..8e4636cfefb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -17,7 +17,11 @@
package org.apache.spark.shuffle
-import org.apache.spark.{ShuffleDependency, TaskContext}
+import java.util.Locale
+
+import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
+import org.apache.spark.internal.config
+import org.apache.spark.util.Utils
/**
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
@@ -94,3 +98,23 @@ private[spark] trait ShuffleManager {
/** Shut down this ShuffleManager. */
def stop(): Unit
}
+
+/**
+ * Utility companion object to create a ShuffleManager given a spark configuration.
+ */
+private[spark] object ShuffleManager {
+ def create(conf: SparkConf, isDriver: Boolean): ShuffleManager = {
+ Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
+ getShuffleManagerClassName(conf), conf, isDriver)
+ }
+
+ def getShuffleManagerClassName(conf: SparkConf): String = {
+ val shortShuffleMgrNames = Map(
+ "sort" ->
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
+ "tungsten-sort" ->
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
+
+ val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
+ shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
+ }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index da48af90a9c..e64c33382dc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -187,12 +187,17 @@ private[spark] class BlockManager(
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
- shuffleManager: ShuffleManager,
+ private val _shuffleManager: ShuffleManager,
val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
externalBlockStoreClient: Option[ExternalBlockStoreClient])
extends BlockDataManager with BlockEvictionHandler with Logging {
+ // We initialize the ShuffleManager later in SparkContext and Executor, to allow
+ // user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here
+ // (except for tests) and we ask for the instance from the SparkEnv.
+ private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+
// same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
private[spark] val externalShuffleServiceEnabled: Boolean =
externalBlockStoreClient.isDefined
private val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
@@ -587,12 +592,15 @@ private[spark] class BlockManager(
private def registerWithExternalShuffleServer(): Unit = {
logInfo("Registering executor with local external shuffle service.")
+ // we obtain the class name from the configuration, instead of the ShuffleManager
+ // instance because the ShuffleManager has not been created at this point.
+ val shuffleMgrClass = ShuffleManager.getShuffleManagerClassName(conf)
val shuffleManagerMeta =
if (Utils.isPushBasedShuffleEnabled(conf, isDriver = isDriver, checkSerializer = false)) {
- s"${shuffleManager.getClass.getName}:" +
+ s"${shuffleMgrClass}:" +
s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}"
} else {
- shuffleManager.getClass.getName
+ shuffleMgrClass
}
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirsString,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 0d52b66a400..17cd0891532 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
import com.google.common.cache.CacheBuilder
-import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED
@@ -55,10 +55,15 @@ class BlockManagerMasterEndpoint(
externalBlockStoreClient: Option[ExternalBlockStoreClient],
blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
mapOutputTracker: MapOutputTrackerMaster,
- shuffleManager: ShuffleManager,
+ private val _shuffleManager: ShuffleManager,
isDriver: Boolean)
extends IsolatedThreadSafeRpcEndpoint with Logging {
+ // We initialize the ShuffleManager later in SparkContext and Executor, to allow
+ // user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here
+ // (except for tests) and we ask for the instance from the SparkEnv.
+ private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+
// Mapping from executor id to the block manager's local disk directories.
private val executorIdToLocalDirs =
CacheBuilder
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7ebb0165e62..a032e9aa16b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1414,6 +1414,83 @@ class SparkSubmitSuite
runSparkSubmit(args)
}
+ test("SPARK-45762: The ShuffleManager plugin to use can be defined in a user
jar") {
+ val shuffleManagerBody = """
+ |@Override
+ |public <K, V, C> org.apache.spark.shuffle.ShuffleHandle registerShuffle(
+ | int shuffleId,
+ | org.apache.spark.ShuffleDependency<K, V, C> dependency) {
+ | throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+ |}
+ |
+ |@Override
+ |public <K, V> org.apache.spark.shuffle.ShuffleWriter<K, V> getWriter(
+ | org.apache.spark.shuffle.ShuffleHandle handle,
+ | long mapId,
+ | org.apache.spark.TaskContext context,
+ | org.apache.spark.shuffle.ShuffleWriteMetricsReporter metrics) {
+ | throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+ |}
+ |
+ |@Override
+ |public <K, C> org.apache.spark.shuffle.ShuffleReader<K, C> getReader(
+ | org.apache.spark.shuffle.ShuffleHandle handle,
+ | int startMapIndex,
+ | int endMapIndex,
+ | int startPartition,
+ | int endPartition,
+ | org.apache.spark.TaskContext context,
+ | org.apache.spark.shuffle.ShuffleReadMetricsReporter metrics) {
+ | throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+ |}
+ |
+ |@Override
+ |public boolean unregisterShuffle(int shuffleId) {
+ | throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+ |}
+ |
+ |@Override
+ |public org.apache.spark.shuffle.ShuffleBlockResolver shuffleBlockResolver() {
+ | throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+ |}
+ |
+ |@Override
+ |public void stop() {
+ |}
+ """.stripMargin
+
+ val tempDir = Utils.createTempDir()
+ val compiledShuffleManager = TestUtils.createCompiledClass(
+ "TestShuffleManager",
+ tempDir,
+ "",
+ null,
+ Seq.empty,
+ Seq("org.apache.spark.shuffle.ShuffleManager"),
+ shuffleManagerBody)
+
+ val jarUrl = TestUtils.createJar(
+ Seq(compiledShuffleManager),
+ new File(tempDir, "testplugin.jar"))
+
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val argsBase = Seq(
+ "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local-cluster[1,1,1024]",
+ "--conf", "spark.shuffle.manager=TestShuffleManager",
+ "--conf", "spark.ui.enabled=false")
+
+ val argsError = argsBase :+ unusedJar.toString
+ // check process error exit code
+ assertResult(1)(runSparkSubmit(argsError, expectFailure = true))
+
+ val argsSuccess = (argsBase ++ Seq("--jars", jarUrl.toString)) :+ unusedJar.toString
+ // check process success exit code
+ assertResult(0)(
+ runSparkSubmit(argsSuccess, expectFailure = false))
+ }
+
private def testRemoteResources(
enableHttpFs: Boolean,
forceDownloadSchemes: Seq[String] = Nil): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
index 932e972374c..e38be4cefc0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
@@ -42,7 +42,8 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
args: Seq[String],
sparkHomeOpt: Option[String] = None,
timeout: Span = defaultSparkSubmitTimeout,
- isSparkTesting: Boolean = true): Unit = {
+ isSparkTesting: Boolean = true,
+ expectFailure: Boolean = false): Int = {
val sparkHome = sparkHomeOpt.getOrElse(
sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
val history = ArrayBuffer.empty[String]
@@ -77,7 +78,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
try {
val exitCode = failAfter(timeout) { process.waitFor() }
- if (exitCode != 0) {
+ if (exitCode != 0 && !expectFailure) {
// include logs in output. Note that logging is async and may not have completed
// at the time this exception is raised
Thread.sleep(1000)
@@ -90,6 +91,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
""".stripMargin
}
}
+ exitCode
} catch {
case to: TestFailedDueToTimeoutException =>
val historyLog = history.mkString("\n")
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c0275e16272..d080b16fdc5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -52,7 +52,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.QueryContext.callSite"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.QueryContext.summary"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI$default$3"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI")
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI"),
+ // [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this")
)
// Default exclude rules