This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ee6ed43aa046 [SPARK-46733][CORE] Simplify the BlockManager by making the exit operation depend only on thread interruption
ee6ed43aa046 is described below
commit ee6ed43aa04682fa0b4c59a6b28c211b7e4fb184
Author: beliefer <[email protected]>
AuthorDate: Wed Jan 24 01:21:08 2024 -0600
[SPARK-46733][CORE] Simplify the BlockManager by making the exit operation depend only on thread interruption
### What changes were proposed in this pull request?
This PR proposes to simplify the `BlockManager`.
### Why are the changes needed?
Currently, closing or destroying the `BlockManager` depends on both interrupting the cleaning thread and the volatile variable `stopped`.
In fact, we can replace `stopped` with a local variable on the thread's stack and let the close operation of `BlockManager` depend only on interrupting the thread.
To that end, this PR uses a local `running` flag instead of `stopped`.
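As an illustration only, here is a hedged sketch of the resulting pattern (names such as `CleanupExample` are invented for this example, not the actual `BlockManager` code): the cleaning loop keeps a `running` flag on its own stack and clears it when `stop()` interrupts the thread, so no shared `@volatile` state is needed.

```scala
import java.lang.ref.ReferenceQueue
import scala.util.control.NonFatal

// Hypothetical stand-in for the cleaning-thread pattern after this change:
// no shared @volatile flag; the loop exits only via InterruptedException.
class CleanupExample {
  private val referenceQueue = new ReferenceQueue[AnyRef]
  private val POLL_TIMEOUT = 1000

  private val cleaningThread = new Thread(() => keepCleaning())
  cleaningThread.setDaemon(true)
  cleaningThread.setName("cleanup-example")
  cleaningThread.start()

  def stop(): Unit = {
    // Interrupting the thread is now the only shutdown signal.
    cleaningThread.interrupt()
    cleaningThread.join()
  }

  private def keepCleaning(): Unit = {
    var running = true // lives on this thread's stack, so no volatile is needed
    while (running) {
      try {
        Option(referenceQueue.remove(POLL_TIMEOUT)).foreach { ref =>
          // clean up whatever resource the dequeued reference tracked
          println(s"Cleaning up reference: $ref")
        }
      } catch {
        case _: InterruptedException =>
          running = false // stop() interrupted us; leave the loop
        case NonFatal(e) =>
          println(s"Error in cleaning thread: $e") // stands in for logError
      }
    }
  }
}
```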
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
GA tests.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #44732 from beliefer/simplify-ContextCleaner.
Authored-by: beliefer <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e996444d7846..42bbd025177b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2209,7 +2209,6 @@ private[spark] object BlockManager {
new ConcurrentHashMap)
private val POLL_TIMEOUT = 1000
- @volatile private var stopped = false
private val cleaningThread = new Thread() { override def run(): Unit = { keepCleaning() } }
cleaningThread.setDaemon(true)
@@ -2235,13 +2234,13 @@ private[spark] object BlockManager {
}
def stop(): Unit = {
- stopped = true
cleaningThread.interrupt()
cleaningThread.join()
}
private def keepCleaning(): Unit = {
- while (!stopped) {
+ var running = true
+ while (running) {
try {
Option(referenceQueue.remove(POLL_TIMEOUT))
.map(_.asInstanceOf[ReferenceWithCleanup])
@@ -2251,7 +2250,7 @@ private[spark] object BlockManager {
}
} catch {
case _: InterruptedException =>
- // no-op
+ running = false
case NonFatal(e) =>
logError("Error in cleaning thread", e)
}
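
For reference, a hypothetical usage of the `CleanupExample` sketch above, showing that shutdown now relies solely on interrupting and joining the daemon cleaning thread:

```scala
// Builds on the CleanupExample sketch shown earlier in this message.
object CleanupExampleDemo {
  def main(args: Array[String]): Unit = {
    val cleaner = new CleanupExample // the constructor starts the daemon cleaning thread
    Thread.sleep(50)                 // let the loop block in referenceQueue.remove()
    cleaner.stop()                   // interrupt + join; returns once keepCleaning() exits
    println("Cleaning thread stopped via interruption only")
  }
}
```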
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]