This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3a4ea84f9928 [SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle
3a4ea84f9928 is described below
commit 3a4ea84f9928f5ecae5ce02979dbb4d65ecbd0c7
Author: sychen <[email protected]>
AuthorDate: Fri Sep 6 10:50:18 2024 -0500
[SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle
### What changes were proposed in this pull request?
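This PR aims to avoid an NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`.
It null-checks the shuffle manager in `BlockManagerStorageEndpoint` before calling
`unregisterShuffle`, and marks `SparkEnv._shuffleManager` as `@volatile`.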
### Why are the changes needed?
After SPARK-45762, the shuffle manager is initialized after the block
manager. As a result, when the driver cleans up a shuffle, the shuffle
manager may not have been initialized yet, and calling `unregisterShuffle`
on it throws a NullPointerException (NPE).
```
24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None)
24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29
java.lang.NullPointerException
at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA (GitHub Actions CI)
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47977 from cxzl25/SPARK-49502.
Authored-by: sychen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +-
.../org/apache/spark/storage/BlockManagerStorageEndpoint.scala | 8 +++++++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f8b7cdcf7a8b..6b7fc1b0804b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -73,7 +73,7 @@ class SparkEnv (
// We initialize the ShuffleManager later in SparkContext and Executor to allow
// user jars to define custom ShuffleManagers.
- private var _shuffleManager: ShuffleManager = _
+ @volatile private var _shuffleManager: ShuffleManager = _
def shuffleManager: ShuffleManager = _shuffleManager
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index 686ac1eb786e..f29e8778da03 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -60,7 +60,13 @@ class BlockManagerStorageEndpoint(
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
- SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
+ val shuffleManager = SparkEnv.get.shuffleManager
+ if (shuffleManager != null) {
+ shuffleManager.unregisterShuffle(shuffleId)
+ } else {
+ logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
+ true
+ }
}
case DecommissionBlockManager =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]