This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new db36f74f8fb0 [SPARK-54815][CONNECT] Do not close the class loader of
the session state if session is still in use
db36f74f8fb0 is described below
commit db36f74f8fb0e34e366daec6fad2510089be7dc2
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Dec 23 15:46:18 2025 +0800
[SPARK-54815][CONNECT] Do not close the class loader of the session state
if session is still in use
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/53233. When a session state is
evicted from `Executor#isolatedSessionCache`, the session may still be in use by running
tasks. This PR adds reference counting and defers closing the class loader until no running
task is using the session anymore.
### Why are the changes needed?
Closing the class loader while tasks are still using the session may break those tasks.
### Does this PR introduce _any_ user-facing change?
Yes. Previously, long-running tasks could fail if their session was evicted while they were running; this is now fixed.
### How was this patch tested?
Existing tests. It is hard to construct a long-running task that exercises the
class loader behavior, but the fix is straightforward.
### Was this patch authored or co-authored using generative AI tooling?
cursor 2.2.20
Closes #53569 from cloud-fan/cache.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/executor/Executor.scala | 93 +++++++++++++++-------
1 file changed, 66 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index aa17f954a7d9..edab354b9607 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,7 +24,7 @@ import java.net.{URI, URL, URLClassLoader}
import java.nio.ByteBuffer
import java.util.{Locale, Properties}
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy
@@ -59,13 +59,63 @@ import org.apache.spark.util._
import org.apache.spark.util.ArrayImplicits._
+/**
+ * Per-session executor state for an isolated session: its class loaders, the files/jars/archives
+ * fetched for it, and its REPL class directory URI.
+ *
+ * The executor may evict this state from its session cache while tasks are still using it. Tasks
+ * pin the state with [[acquire]] / [[release]], and eviction calls [[markEvicted]]. The class
+ * loader is closed and the session directory deleted only once the state has been evicted AND no
+ * task holds a reference. Cleanup runs at most once.
+ */
private[spark] class IsolatedSessionState(
- val sessionUUID: String,
- var urlClassLoader: MutableURLClassLoader,
- var replClassLoader: ClassLoader,
- val currentFiles: HashMap[String, Long],
- val currentJars: HashMap[String, Long],
- val currentArchives: HashMap[String, Long],
- val replClassDirUri: Option[String])
+    val sessionUUID: String,
+    var urlClassLoader: MutableURLClassLoader,
+    var replClassLoader: ClassLoader,
+    val currentFiles: HashMap[String, Long],
+    val currentJars: HashMap[String, Long],
+    val currentArchives: HashMap[String, Long],
+    val replClassDirUri: Option[String]) extends Logging {
+
+  // Number of running tasks currently holding a reference to this session.
+  private val refCount: AtomicInteger = new AtomicInteger(0)
+
+  // Whether this session has been evicted from the cache.
+  @volatile private var evicted: Boolean = false
+
+  // Ensures cleanup() runs at most once. release() and markEvicted() can race: a task may
+  // decrement the count to 0 and then observe `evicted == true`, while the eviction path has
+  // just set `evicted` and observed a zero count. Both threads would then call cleanup().
+  private val cleanedUp: AtomicBoolean = new AtomicBoolean(false)
+
+  /** Increment the reference count, indicating a task is using this session. */
+  def acquire(): Unit = {
+    refCount.incrementAndGet()
+    if (cleanedUp.get()) {
+      // NOTE(review): presumably the state was evicted and cleaned up between the caller's lookup
+      // and this call, so its class loader is already closed. Surface this instead of letting the
+      // task fail obscurely later.
+      logWarning(log"Session ${MDC(SESSION_ID, sessionUUID)} was acquired after it had " +
+        log"already been cleaned up; its class loader is closed")
+    }
+  }
+
+  /** Decrement the reference count. If evicted and no more tasks, clean up. */
+  def release(): Unit = {
+    val remaining = refCount.decrementAndGet()
+    if (remaining == 0 && evicted) {
+      cleanup()
+    } else if (remaining < 0) {
+      // An unbalanced release keeps the count from ever returning to 0, so cleanup would
+      // silently never happen. Make that visible.
+      logWarning(log"Session ${MDC(SESSION_ID, sessionUUID)} was released more times than " +
+        log"it was acquired")
+    }
+  }
+
+  /** Mark this session as evicted. If no tasks are using it, clean up immediately. */
+  def markEvicted(): Unit = {
+    evicted = true
+    // Read the count once so the check and the log message agree.
+    val inUse = refCount.get()
+    if (inUse == 0) {
+      cleanup()
+    } else {
+      logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} evicted but still in use by " +
+        log"${MDC(LogKeys.COUNT, inUse)} task(s), deferring cleanup")
+    }
+  }
+
+  /**
+   * Close the class loader and delete the session's file directory, at most once.
+   *
+   * This may run on a task thread (via release()), so all failures are logged, not thrown.
+   */
+  private def cleanup(): Unit = {
+    if (cleanedUp.compareAndSet(false, true)) {
+      // Close the urlClassLoader to release resources.
+      try {
+        urlClassLoader match {
+          case cl: URLClassLoader =>
+            cl.close()
+            logInfo(log"Closed urlClassLoader for session ${MDC(SESSION_ID, sessionUUID)}")
+          case _ =>
+        }
+      } catch {
+        case NonFatal(e) =>
+          logWarning(log"Failed to close urlClassLoader for session " +
+            log"${MDC(SESSION_ID, sessionUUID)}", e)
+      }
+      // Delete session files.
+      try {
+        val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), sessionUUID)
+        if (sessionBasedRoot.isDirectory) {
+          Utils.deleteRecursively(sessionBasedRoot)
+        }
+      } catch {
+        case NonFatal(e) =>
+          logWarning(log"Failed to delete files for session " +
+            log"${MDC(SESSION_ID, sessionUUID)}", e)
+      }
+      logInfo(log"Session cleaned up: ${MDC(SESSION_ID, sessionUUID)}")
+    }
+  }
+}
/**
* Spark executor, backed by a threadpool to run tasks.
@@ -220,25 +270,9 @@ private[spark] class Executor(
val state = notification.getValue
// Cache is always used for isolated sessions.
assert(!isDefaultState(state.sessionUUID))
- // Close the urlClassLoader to release resources.
- try {
- state.urlClassLoader match {
- case urlClassLoader: URLClassLoader =>
- urlClassLoader.close()
- logInfo(log"Closed urlClassLoader (URLClassLoader) for evicted
session " +
- log"${MDC(SESSION_ID, state.sessionUUID)}")
- case _ =>
- }
- } catch {
- case NonFatal(e) =>
- logWarning(log"Failed to close urlClassLoader for session " +
- log"${MDC(SESSION_ID, state.sessionUUID)}", e)
- }
- val sessionBasedRoot = new File(SparkFiles.getRootDirectory(),
state.sessionUUID)
- if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
- Utils.deleteRecursively(sessionBasedRoot)
- }
- logInfo(log"Session evicted: ${MDC(SESSION_ID, state.sessionUUID)}")
+ // Mark evicted - cleanup will happen immediately if no tasks are
using it,
+ // or when the last task releases it.
+ state.markEvicted()
}
})
.build[String, IsolatedSessionState]
@@ -600,6 +634,9 @@ private[spark] class Executor(
case _ => defaultSessionState
}
+ // Pin the session to prevent its class loader from being closed while
this task is running.
+ isolatedSession.acquire()
+
setMDCForTask(taskName, mdcProperties)
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
@@ -905,6 +942,8 @@ private[spark] class Executor(
// are known, and metricsPoller.onTaskStart was called.
metricsPoller.onTaskCompletion(taskId, task.stageId,
task.stageAttemptId)
}
+ // Release the session reference. If evicted and this was the last
task, cleanup happens.
+ isolatedSession.release()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]