This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 01a2db317b90 [SPARK-53581][CORE] Fix potential thread-safety issue for mapTaskIds.add()
01a2db317b90 is described below

commit 01a2db317b903d1f9a14aeea57181f86892764aa
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Tue Sep 16 15:16:06 2025 -0700

    [SPARK-53581][CORE] Fix potential thread-safety issue for mapTaskIds.add()
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up to https://github.com/apache/spark/pull/47037. This PR
    wraps each invocation of `OpenHashSet.add()` in `IndexShuffleBlockResolver`
    in a `synchronized` block.
    
    ### Why are the changes needed?
    
    `OpenHashSet` is not thread-safe. We should hold a `synchronized` lock on
    the set when invoking `add()` to ensure thread safety.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52337 from Ngone51/fix-thread-safety.
    
    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 78aba006b836dc466da28700fa72d13ff4a9e3a6)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index bf3117a9a9b1..dfb3e637a843 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -310,13 +310,13 @@ private[spark] class IndexShuffleBlockResolver(
             val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
               shuffleId, _ => new OpenHashSet[Long](8)
             )
-            mapTaskIds.add(mapId)
+            mapTaskIds.synchronized { mapTaskIds.add(mapId) }
 
           case ShuffleDataBlockId(shuffleId, mapId, _) =>
             val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
               shuffleId, _ => new OpenHashSet[Long](8)
             )
-            mapTaskIds.add(mapId)
+            mapTaskIds.synchronized { mapTaskIds.add(mapId) }
 
           case _ => // Unreachable
         }

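The locking pattern in the diff above can be sketched outside Spark as follows. This is a minimal illustration under stated assumptions, not the code from `IndexShuffleBlockResolver`: `OpenHashSet` is `private[spark]`, so `scala.collection.mutable.HashSet` (also not thread-safe) stands in for it, and the names `SynchronizedAddSketch`, `taskIdMaps`, `recordMapTask`, and `sizeOf` are hypothetical.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.mutable

object SynchronizedAddSketch {
  // Hypothetical stand-in for taskIdMapsForShuffle. The map itself is
  // thread-safe, but the mutable.HashSet values (replacing OpenHashSet) are not.
  private val taskIdMaps = new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

  // computeIfAbsent guarantees a single set per shuffleId; the synchronized
  // block serializes mutation of that set across threads.
  def recordMapTask(shuffleId: Int, mapId: Long): Unit = {
    val mapTaskIds = taskIdMaps.computeIfAbsent(shuffleId, _ => new mutable.HashSet[Long]())
    mapTaskIds.synchronized { mapTaskIds.add(mapId) }
  }

  // Readers take the same lock so they never observe a set mid-resize.
  def sizeOf(shuffleId: Int): Int = {
    val ids = taskIdMaps.get(shuffleId)
    if (ids == null) 0 else ids.synchronized { ids.size }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    // Four tasks each add 1000 distinct ids to the set for shuffle 0.
    (0 until 4).foreach { t =>
      pool.execute(() => (0 until 1000).foreach(i => recordMapTask(0, t * 1000L + i)))
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    println(sizeOf(0)) // 4000: every add is retained
  }
}
```

Locking on the set instance rather than on the whole map keeps contention limited to writers of the same shuffle. Without the `synchronized` block, concurrent adds to the same unsynchronized set could lose elements or corrupt its internal table during a resize.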
