This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 68db395867a [SPARK-45057][CORE] Avoid acquire read lock when
keepReadLock is false
68db395867a is described below
commit 68db395867a3292e9261dd8a3dc191754e1645ef
Author: Warren Zhu <[email protected]>
AuthorDate: Thu Sep 28 18:51:33 2023 -0500
[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When
`keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the
read lock or triggering a potential deadlock.
When 2 tasks try to compute the same RDD with a replication level of 2 while
running on only 2 executors, a deadlock will happen. For details, refer to [SPARK-45057].
The task thread holds the write lock while waiting for replication to the remote
executor, while the shuffle server thread handling the block upload request
waits on `lockForReading` in
[BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24)
### Why are the changes needed?
This saves an unnecessary read lock acquisition and avoids the deadlock issue
mentioned above.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added UT in BlockInfoManagerSuite
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43067 from warrenzhu25/deadlock.
Authored-by: Warren Zhu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f)
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../scala/org/apache/spark/storage/BlockInfoManager.scala | 11 +++++++----
.../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +-----
.../org/apache/spark/storage/BlockInfoManagerSuite.scala | 14 ++++++++++++++
3 files changed, 22 insertions(+), 9 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 9eb1418fd16..d89e6682adf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging {
* then just go ahead and acquire the write lock. Otherwise, if another
thread is already
* writing the block, then we wait for the write to finish before acquiring
the read lock.
*
- * @return true if the block did not already exist, false otherwise. If this
returns false, then
- * a read lock on the existing block will be held. If this returns
true, a write lock on
- * the new block will be held.
+ * @return true if the block did not already exist, false otherwise.
+ * If this returns true, a write lock on the new block will be held.
+ * If this returns false then a read lock will be held iff
keepReadLock == true.
*/
def lockNewBlockForWriting(
blockId: BlockId,
- newBlockInfo: BlockInfo): Boolean = {
+ newBlockInfo: BlockInfo,
+ keepReadLock: Boolean = true): Boolean = {
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
// Get the lock that will be associated with the to-be written block and
lock it for the entire
// duration of this operation. This way we prevent race conditions when
two threads try to write
@@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging {
val result = lockForWriting(blockId, blocking = false)
assert(result.isDefined)
return true
+ } else if (!keepReadLock) {
+ return false
} else {
// Block already exists. This could happen if another thread races
with us to compute
// the same block. In this case we try to acquire a read lock, if
the locking succeeds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 06c2e615fbc..389fbeb90f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1444,14 +1444,10 @@ private[spark] class BlockManager(
val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
- if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+ if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo,
keepReadLock)) {
newInfo
} else {
logWarning(s"Block $blockId already exists on this machine; not
re-adding it")
- if (!keepReadLock) {
- // lockNewBlockForWriting returned a read lock on the existing
block, so we must free it:
- releaseLock(blockId)
- }
return None
}
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index a1cd1dbc9be..6dfd3168996 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -166,6 +166,20 @@ class BlockInfoManagerSuite extends SparkFunSuite {
assert(blockInfoManager.get("block").get.readerCount === 1)
}
+ test("lockNewBlockForWriting should not block when keepReadLock is false") {
+ withTaskId(0) {
+ assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
+ }
+ val lock1Future = Future {
+ withTaskId(1) {
+ blockInfoManager.lockNewBlockForWriting("block", newBlockInfo(), false)
+ }
+ }
+
+ assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds))
+ assert(blockInfoManager.get("block").get.readerCount === 0)
+ }
+
test("read locks are reentrant") {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]