This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a4ea84f9928 [SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle
3a4ea84f9928 is described below

commit 3a4ea84f9928f5ecae5ce02979dbb4d65ecbd0c7
Author: sychen <[email protected]>
AuthorDate: Fri Sep 6 10:50:18 2024 -0500

    [SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle
    
    ### What changes were proposed in this pull request?
    This PR aims to avoid an NPE in `SparkEnv.get.shuffleManager.unregisterShuffle` by checking that the shuffle manager is non-null before calling it.
    
    ### Why are the changes needed?
    After SPARK-45762, the shuffle manager is initialized after the block manager. As a result, when the driver cleans up a shuffle, the shuffle manager may not have been initialized yet, which causes an NPE.
    
    ```
    24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None)
    24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29
    java.lang.NullPointerException
            at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61)
            at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
            at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
            at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
            at scala.util.Success.$anonfun$map$1(Try.scala:255)
            at scala.util.Success.map(Try.scala:213)
            at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
            at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
            at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
            at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GA (GitHub Actions CI)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47977 from cxzl25/SPARK-49502.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 core/src/main/scala/org/apache/spark/SparkEnv.scala               | 2 +-
 .../org/apache/spark/storage/BlockManagerStorageEndpoint.scala    | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f8b7cdcf7a8b..6b7fc1b0804b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -73,7 +73,7 @@ class SparkEnv (
 
   // We initialize the ShuffleManager later in SparkContext and Executor to allow
   // user jars to define custom ShuffleManagers.
-  private var _shuffleManager: ShuffleManager = _
+  @volatile private var _shuffleManager: ShuffleManager = _
 
   def shuffleManager: ShuffleManager = _shuffleManager
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index 686ac1eb786e..f29e8778da03 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -60,7 +60,13 @@ class BlockManagerStorageEndpoint(
         if (mapOutputTracker != null) {
           mapOutputTracker.unregisterShuffle(shuffleId)
         }
-        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
+        val shuffleManager = SparkEnv.get.shuffleManager
+        if (shuffleManager != null) {
+          shuffleManager.unregisterShuffle(shuffleId)
+        } else {
+          logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
+          true
+        }
       }
 
     case DecommissionBlockManager =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to