This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f6c6d646b45 [SPARK-52923][CORE] Allow ShuffleManager to control push merge during shuffle registration
8f6c6d646b45 is described below

commit 8f6c6d646b4596877bae111858227653d42ec51d
Author: gaoyajun02 <[email protected]>
AuthorDate: Sat Nov 29 21:14:51 2025 -0600

    [SPARK-52923][CORE] Allow ShuffleManager to control push merge during shuffle registration
    
    ### What changes were proposed in this pull request?
    
    This PR moves the `shuffleManager.registerShuffle()` call in `ShuffleDependency` so that it runs after `_shuffleMergeAllowed` is initialized. A `ShuffleManager` can then adjust the push-merge setting for a shuffle during registration.
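    Why the order matters: Scala runs field initializers top to bottom, so with the old order any change a `ShuffleManager` made to the merge flag inside `registerShuffle()` would be overwritten when the `_shuffleMergeAllowed` initializer ran afterwards. The sketch below is a minimal, self-contained model of that behavior only. `MergeFlag`, `MockShuffleManager`, `OldOrderDependency`, `NewOrderDependency` and `OptOutManager` are hypothetical stand-ins, not Spark classes. In Spark itself `setShuffleMergeAllowed` is `private[spark]`, so a real implementation would need access to that package-private method.

    ```scala
    // Hypothetical stand-ins for Spark's ShuffleManager / ShuffleDependency.
    // They model only Scala's top-to-bottom field initialization, not real Spark APIs.
    trait MergeFlag {
      def setShuffleMergeAllowed(allowed: Boolean): Unit
      def shuffleMergeAllowed: Boolean
    }

    trait MockShuffleManager {
      // Called from the dependency's constructor, which passes `this`.
      def registerShuffle(dep: MergeFlag): String
    }

    // Old order: registration runs first, then the flag initializer
    // overwrites whatever registerShuffle set.
    final class OldOrderDependency(manager: MockShuffleManager, pushEnabled: Boolean)
        extends MergeFlag {
      val shuffleHandle: String = manager.registerShuffle(this)
      private var _shuffleMergeAllowed: Boolean = pushEnabled
      def setShuffleMergeAllowed(allowed: Boolean): Unit = { _shuffleMergeAllowed = allowed }
      def shuffleMergeAllowed: Boolean = _shuffleMergeAllowed
    }

    // New order: the flag is initialized first, so registerShuffle's choice sticks.
    final class NewOrderDependency(manager: MockShuffleManager, pushEnabled: Boolean)
        extends MergeFlag {
      private var _shuffleMergeAllowed: Boolean = pushEnabled
      val shuffleHandle: String = manager.registerShuffle(this)
      def setShuffleMergeAllowed(allowed: Boolean): Unit = { _shuffleMergeAllowed = allowed }
      def shuffleMergeAllowed: Boolean = _shuffleMergeAllowed
    }

    // Hypothetical manager that opts every shuffle it registers out of push merge,
    // e.g. because a remote shuffle service will handle that shuffle instead.
    object OptOutManager extends MockShuffleManager {
      def registerShuffle(dep: MergeFlag): String = {
        dep.setShuffleMergeAllowed(false)
        "remote-handle"
      }
    }

    object InitOrderDemo {
      def main(args: Array[String]): Unit = {
        val oldDep = new OldOrderDependency(OptOutManager, pushEnabled = true)
        val newDep = new NewOrderDependency(OptOutManager, pushEnabled = true)
        println(s"old order: ${oldDep.shuffleMergeAllowed}") // old order: true (opt-out lost)
        println(s"new order: ${newDep.shuffleMergeAllowed}") // new order: false
      }
    }
    ```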
    
    ### Why are the changes needed?
    
    While `spark.shuffle.push.enabled` provides global control over push-based shuffle, some scenarios require more granular control:
    - Mass migration of Spark applications, where different jobs may need different shuffle strategies
    - Remote shuffle managers (e.g. celeborn/uniffle) that need shuffle-level fallback to push-based shuffle
    - Dynamic decisions based on shuffle characteristics at shuffle registration time
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is an internal refactoring that maintains backward compatibility. The default behavior remains unchanged.
    
    ### How was this patch tested?
    
    - Existing unit tests continue to pass
    - The change only affects the order of initialization, not the logic
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51629 from gaoyajun02/SPARK-52923.
    
    Authored-by: gaoyajun02 <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 core/src/main/scala/org/apache/spark/Dependency.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index c436025e06bb..8738298fed0e 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -127,15 +127,15 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 
   val shuffleId: Int = _rdd.context.newShuffleId()
 
-  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
-    shuffleId, this)
-
   private[this] val numPartitions = rdd.partitions.length
 
   // By default, shuffle merge is allowed for ShuffleDependency if push based shuffle
   // is enabled
   private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
 
+  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
+    shuffleId, this)
+
   private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {
     _shuffleMergeAllowed = shuffleMergeAllowed
   }

