This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 752e8a1a7ee0 [SPARK-53807][SPARK-50771][CORE] Fix race condition 
issues between `unlock` and `releaseAllLocksForTask` in `BlockInfoManager`
752e8a1a7ee0 is described below

commit 752e8a1a7ee030ab8d0879c569754edce7b0b0f4
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Jan 6 14:39:32 2026 +0900

    [SPARK-53807][SPARK-50771][CORE] Fix race condition issues between `unlock` 
and `releaseAllLocksForTask` in `BlockInfoManager`
    
    ### What changes were proposed in this pull request?
    This PR fixes race condition issues between `unlock` and 
`releaseAllLocksForTask` in `BlockInfoManager`.
    
    If read locks for a block acquired by a task are released by `unlock`
and `releaseAllLocksForTask` concurrently, an assertion error can occur.
    The reason is that calling `entry.getCount` in `releaseAllLocksForTask` can
return a stale value even after the count in the entry has been decreased via
`countsForTask.remove` on another thread. As a result, `info.readerCount -= lockCount`
can produce a negative number, triggering the assertion error.
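    The failing interleaving can be replayed deterministically. The sketch below is a
minimal, single-threaded model (plain Scala collections stand in for
`ConcurrentHashMultiset` and `BlockInfo`; `replayReadLockRace` is a hypothetical helper,
not Spark code) showing how subtracting a stale snapshot of the count over-releases,
while re-reading the live count does not.

    ```scala
    // Minimal single-threaded replay of the read-lock race (simplified model,
    // not Spark's actual BlockInfoManager; names mirror the fields above).
    import scala.collection.mutable

    def replayReadLockRace(): (Int, Int) = {
      var readerCount = 2                          // the task holds 2 read locks
      val countsForTask = mutable.Map("block" -> 2)

      // releaseAllLocksForTask snapshots the count, like entry.getCount.
      val staleLockCount = countsForTask("block")  // 2

      // Meanwhile, unlock releases one lock on another thread.
      countsForTask("block") -= 1
      readerCount -= 1                             // now 1

      // Buggy release: subtracts the stale snapshot, 1 - 2 = -1.
      val buggy = readerCount - staleLockCount

      // Fixed release: re-reads the live count, like readLocks.count(blockId).
      val fixed = readerCount - countsForTask("block")

      (buggy, fixed)
    }

    val (buggy, fixed) = replayReadLockRace()
    println(s"buggy=$buggy fixed=$fixed")  // buggy=-1 fixed=0
    ```

    The stale path leaves the counter at -1, which is exactly the value that trips
`assert(info.readerCount >= 0)`.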
    
    This issue can be reproduced by inserting sleeps into `unlock` and
`releaseAllLocksForTask` as follows.
    
    * unlock
    ```
   // reader counts. We need to check if the readLocksByTask per tasks are present,
   // if they are not then we know releaseAllLocksForTask has already cleaned up
   // the read lock.
       val countsForTask = readLocksByTask.get(taskAttemptId)
    +  Thread.sleep(5)
       if (countsForTask != null) {
     assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
         info.readerCount -= 1
    ```
    
    * releaseAllLocksForTask
    ```
    +  Thread.sleep(5)
       val readLocks = Option(readLocksByTask.remove(taskAttemptId))
         .getOrElse(ImmutableMultiset.of[BlockId])
       readLocks.entrySet().forEach { entry =>
         val blockId = entry.getElement
         val lockCount = entry.getCount
    +    Thread.sleep(5)
         blocksWithReleasedLocks += blockId
    ```
    
    Then run the test as follows.
    
    ```
    $ build/sbt 'core/testOnly org.apache.spark.storage.BlockInfoManagerSuite -- -z SPARK-38675'
    ```
    
    The Javadoc for
[ConcurrentHashMultiset#entrySet](https://guava.dev/releases/33.4.0-jre/api/docs/com/google/common/collect/ConcurrentHashMultiset.html)
says:
    
    ```
    However, multiset changes may or may not be reflected in any Entry instances
    already retrieved from the entry set (this is implementation-dependent)
    ```
    
    So this PR calculates `lockCount` by calling `readLocks.count` to get the
latest count, and places that call within the `blockInfo` block so it executes
exclusively.
    
    Similar to read locks, a race condition issue can occur for write locks.
    During `writeLocks.forEach` in `releaseAllLocksForTask`, a `blockId` can be
removed from `writeLocks` by `writeLocksByTask.get(taskAttemptId).remove(blockId)`
in `unlock` on another thread.
    This issue can be reproduced with the new test added in this PR.
    This PR fixes it by checking the existence of the `blockId` with
`writeLocks.contains(blockId)` within the `blockInfo` block.
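    The write-lock case can be replayed the same way. Below is a hedged,
single-threaded sketch (a `mutable.Set` stands in for the task's write-lock set;
`replayWriteLockRace` is a hypothetical helper, not Spark code): once `unlock` has
removed the `blockId`, the `contains` check makes `releaseAllLocksForTask` skip the
block instead of asserting on a writer it no longer holds.

    ```scala
    // Minimal single-threaded replay of the write-lock race (simplified model,
    // not Spark's actual classes).
    import scala.collection.mutable

    def replayWriteLockRace(): Long = {
      val NO_WRITER = -1L
      val taskAttemptId = 0L
      var writerTask = taskAttemptId            // the task holds the write lock
      val writeLocks = mutable.Set("block")     // the task's write-locked blocks

      // unlock wins the race: it clears the writer and removes the blockId.
      writerTask = NO_WRITER
      writeLocks.remove("block")

      // releaseAllLocksForTask still reaches "block" in its iteration. Without
      // the contains check it would assert writerTask == taskAttemptId and fail;
      // with the check it skips the already-released block.
      if (writeLocks.contains("block")) {
        assert(writerTask == taskAttemptId)
        writerTask = NO_WRITER
      }
      writerTask
    }

    println(replayWriteLockRace())  // -1
    ```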
    
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Confirmed that `SPARK-38675 - concurrent unlock and releaseAllLocksForTask calls
should not fail` passes even when sleeps are inserted into `unlock` and
`releaseAllLocksForTask` as follows.
    
    * unlock
    ```
     val countsForTask = readLocksByTask.get(taskAttemptId)
     if (countsForTask != null) {
    +  Thread.sleep(5)
       assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
       info.readerCount -= 1
       val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    ```
    
    * releaseAllLocksForTask
    ```
    +  Thread.sleep(5)
       val readLocks = Option(readLocksByTask.remove(taskAttemptId))
         .getOrElse(ImmutableMultiset.of[BlockId])
       readLocks.entrySet().forEach { entry =>
         val blockId = entry.getElement
    ```
    ```
  // Using readLocks.count instead of entry.getCount is intentional.
  // See discussion in SPARK-50771.
  val lockCount = readLocks.count(blockId)
    + Thread.sleep(5)
    
  // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
      if (lockCount > 0) {
        blocksWithReleasedLocks += blockId
        info.readerCount -= lockCount
    ```
    
    Also new test for write locks is added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52524 from sarutak/fix-race-blockinfomanager.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../apache/spark/storage/BlockInfoManager.scala    | 33 +++++++++++++++-------
 .../spark/storage/BlockInfoManagerSuite.scala      | 20 +++++++++++++
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 322e4b18c5d4..69250e247573 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -397,7 +397,10 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != BlockInfo.NO_WRITER) {
         info.writerTask = BlockInfo.NO_WRITER
-        writeLocksByTask.get(taskAttemptId).remove(blockId)
+        val blockIds = writeLocksByTask.get(taskAttemptId)
+        if (blockIds != null) {
+          blockIds.remove(blockId)
+        }
       } else {
        // There can be a race between unlock and releaseAllLocksForTask which causes negative
        // reader counts. We need to check if the readLocksByTask per tasks are present, if they
@@ -489,23 +492,33 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(util.Set.of())
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
-        assert(info.writerTask == taskAttemptId)
-        info.writerTask = BlockInfo.NO_WRITER
-        condition.signalAll()
+        // Check the existence of `blockId` because `unlock` may have already removed it
+        // concurrently.
+        if (writeLocks.contains(blockId)) {
+          blocksWithReleasedLocks += blockId
+          assert(info.writerTask == taskAttemptId)
+          info.writerTask = BlockInfo.NO_WRITER
+          condition.signalAll()
+        }
       }
-      blocksWithReleasedLocks += blockId
     }
 
     val readLocks = Option(readLocksByTask.remove(taskAttemptId))
       .getOrElse(ImmutableMultiset.of[BlockId])
     readLocks.entrySet().forEach { entry =>
       val blockId = entry.getElement
-      val lockCount = entry.getCount
-      blocksWithReleasedLocks += blockId
       blockInfo(blockId) { (info, condition) =>
-        info.readerCount -= lockCount
-        assert(info.readerCount >= 0)
-        condition.signalAll()
+        // Calculating lockCount by readLocks.count instead of entry.getCount is intentional.
+        // See discussion in SPARK-50771 and the corresponding PR.
+        val lockCount = readLocks.count(blockId)
+
+        // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
+        if (lockCount > 0) {
+          blocksWithReleasedLocks += blockId
+          info.readerCount -= lockCount
+          assert(info.readerCount >= 0)
+          condition.signalAll()
+        }
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index f7c7ca2bd936..4b34c13706ad 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -403,4 +403,24 @@ class BlockInfoManagerSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("SPARK-53807 - concurrent unlock and releaseAllLocksForTask for write should not fail") {
+    val blockId = TestBlockId("block")
+    assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
+    blockInfoManager.unlock(blockId)
+
+    // Without the fix the block below almost always fails.
+    (0 to 10).foreach { task =>
+      withTaskId(task) {
+        blockInfoManager.registerTask(task)
+
+        assert(blockInfoManager.lockForWriting(blockId).isDefined)
+
+        val future = Future(blockInfoManager.unlock(blockId, Option(task)))
+        blockInfoManager.releaseAllLocksForTask(task)
+
+        ThreadUtils.awaitReady(future, 100.millis)
+      }
+    }
+  }
 }

