This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 01a2db317b90 [SPARK-53581][CORE] Fix potential thread-safety issue for mapTaskIds.add()
01a2db317b90 is described below

commit 01a2db317b903d1f9a14aeea57181f86892764aa
Author: Yi Wu <yi...@databricks.com>
AuthorDate: Tue Sep 16 15:16:06 2025 -0700

    [SPARK-53581][CORE] Fix potential thread-safety issue for mapTaskIds.add()

    ### What changes were proposed in this pull request?

    This is a followup for https://github.com/apache/spark/pull/47037. This PR
    wraps each invocation of `OpenHashSet.add()` in `IndexShuffleBlockResolver`
    in a `synchronized` block.

    ### Why are the changes needed?

    `OpenHashSet` is not thread safe. We should enforce a synchronized lock when
    invoking the add function to ensure thread safety.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    N/A.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #52337 from Ngone51/fix-thread-safety.

    Authored-by: Yi Wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 78aba006b836dc466da28700fa72d13ff4a9e3a6)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index bf3117a9a9b1..dfb3e637a843 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -310,13 +310,13 @@ private[spark] class IndexShuffleBlockResolver(
         val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
           shuffleId, _ => new OpenHashSet[Long](8)
         )
-        mapTaskIds.add(mapId)
+        mapTaskIds.synchronized { mapTaskIds.add(mapId) }
 
       case ShuffleDataBlockId(shuffleId, mapId, _) =>
         val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
           shuffleId, _ => new OpenHashSet[Long](8)
         )
-        mapTaskIds.add(mapId)
+        mapTaskIds.synchronized { mapTaskIds.add(mapId) }
 
       case _ => // Unreachable
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
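
Note on the pattern in this commit: `computeIfAbsent` on a concurrent map is atomic with respect to the map, but the set it returns is shared by every caller, so mutations of that set still need their own lock. The following is a minimal standalone sketch of that idea, not the Spark code itself. It assumes `taskIdMapsForShuffle` is a `java.util.concurrent.ConcurrentHashMap` (the diff does not show its declaration), and it substitutes `scala.collection.mutable.HashSet` for `OpenHashSet`, which is internal to Spark. The names `SynchronizedAddSketch`, `register`, `size`, and `runDemo` are hypothetical.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.mutable

object SynchronizedAddSketch {
  // Hypothetical stand-in for taskIdMapsForShuffle. mutable.HashSet replaces
  // Spark's OpenHashSet here; neither is thread safe on its own.
  private val taskIdsByShuffle =
    new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

  // Mirrors the patched call site: look up (or create) the per-shuffle set,
  // then mutate it only while holding the set's own monitor.
  def register(shuffleId: Int, mapId: Long): Unit = {
    val mapTaskIds = taskIdsByShuffle.computeIfAbsent(
      shuffleId, _ => mutable.HashSet.empty[Long]
    )
    mapTaskIds.synchronized { mapTaskIds.add(mapId) }
  }

  // Reads take the same lock so they never observe a set mid-resize.
  def size(shuffleId: Int): Int = {
    val ids = taskIdsByShuffle.get(shuffleId)
    if (ids == null) 0 else ids.synchronized { ids.size }
  }

  // Registers threads * idsPerThread distinct ids from several threads and
  // returns how many ended up in the set for the given shuffle.
  def runDemo(shuffleId: Int, threads: Int, idsPerThread: Int): Int = {
    val pool = Executors.newFixedThreadPool(threads)
    for (t <- 0 until threads) {
      pool.execute(() => {
        for (i <- 0 until idsPerThread) {
          register(shuffleId, t.toLong * idsPerThread + i)
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    size(shuffleId)
  }
}
```

Calling `SynchronizedAddSketch.runDemo(7, 4, 1000)` should report every distinct id. Without the `synchronized` block around `add`, concurrent inserts into the same unsynchronized set may lose elements or corrupt its internal table.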