This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db36f74f8fb0 [SPARK-54815][CONNECT] Do not close the class loader of the session state if session is still in use
db36f74f8fb0 is described below

commit db36f74f8fb0e34e366daec6fad2510089be7dc2
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Dec 23 15:46:18 2025 +0800

    [SPARK-54815][CONNECT] Do not close the class loader of the session state if session is still in use
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up to https://github.com/apache/spark/pull/53233. When a session state is
    evicted from `Executor#isolatedSessionCache`, the session may still be in use by running
    tasks. This PR adds reference counting and defers closing the class loader (and deleting the
    session files) until the last task using the session releases it.
    
    ### Why are the changes needed?
    
    Closing the class loader while tasks are still using it may break those tasks.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, long-running tasks could fail if their session was evicted while they were still running; this is now fixed.
    
    ### How was this patch tested?
    
    Existing tests. It is hard to construct a long-running task that exercises the class loader behavior, but the fix is straightforward.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    cursor 2.2.20
    
    Closes #53569 from cloud-fan/cache.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/executor/Executor.scala | 93 +++++++++++++++-------
 1 file changed, 66 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index aa17f954a7d9..edab354b9607 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,7 +24,7 @@ import java.net.{URI, URL, URLClassLoader}
 import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 
@@ -59,13 +59,63 @@ import org.apache.spark.util._
 import org.apache.spark.util.ArrayImplicits._
 
 private[spark] class IsolatedSessionState(
-  val sessionUUID: String,
-  var urlClassLoader: MutableURLClassLoader,
-  var replClassLoader: ClassLoader,
-  val currentFiles: HashMap[String, Long],
-  val currentJars: HashMap[String, Long],
-  val currentArchives: HashMap[String, Long],
-  val replClassDirUri: Option[String])
+    val sessionUUID: String,
+    var urlClassLoader: MutableURLClassLoader,
+    var replClassLoader: ClassLoader,
+    val currentFiles: HashMap[String, Long],
+    val currentJars: HashMap[String, Long],
+    val currentArchives: HashMap[String, Long],
+    val replClassDirUri: Option[String]) extends Logging {
+
+  // Reference count for the number of running tasks using this session.
+  private val refCount: AtomicInteger = new AtomicInteger(0)
+
+  // Whether this session has been evicted from the cache.
+  @volatile private var evicted: Boolean = false
+
+  // Ensures cleanup() runs at most once. markEvicted() and the last release() can race:
+  // both may observe `evicted == true` and `refCount == 0` and would otherwise both clean up
+  // (closing the class loader and deleting the session directory twice).
+  private val cleanedUp: AtomicBoolean = new AtomicBoolean(false)
+
+  /**
+   * Increment the reference count, indicating a task is using this session.
+   *
+   * NOTE(review): a task that obtained this state from the cache just before it was evicted
+   * (with no other users) can acquire it after cleanup() has already closed the class loader.
+   * This is only reported here; fully closing the gap requires the caller to re-check or
+   * re-fetch the state after acquiring.
+   */
+  def acquire(): Unit = {
+    refCount.incrementAndGet()
+    if (cleanedUp.get()) {
+      logWarning(log"Session ${MDC(SESSION_ID, sessionUUID)} acquired after its resources " +
+        log"were cleaned up; its class loader is already closed")
+    }
+  }
+
+  /** Decrement the reference count. If evicted and no more tasks, clean up. */
+  def release(): Unit = {
+    if (refCount.decrementAndGet() == 0 && evicted) {
+      cleanup()
+    }
+  }
+
+  /** Mark this session as evicted. If no tasks are using it, clean up immediately. */
+  def markEvicted(): Unit = {
+    evicted = true
+    // Read the count once so the value that is checked is the value that is logged.
+    val inUse = refCount.get()
+    if (inUse == 0) {
+      cleanup()
+    } else {
+      logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} evicted but still in use by " +
+        log"${MDC(LogKeys.COUNT, inUse)} task(s), deferring cleanup")
+    }
+  }
+
+  /**
+   * Close the session's class loader and delete its file directory. Idempotent: only the
+   * first call does any work, so concurrent callers cannot release resources twice.
+   */
+  private def cleanup(): Unit = {
+    if (cleanedUp.compareAndSet(false, true)) {
+      // Close the urlClassLoader to release resources.
+      try {
+        urlClassLoader match {
+          case cl: URLClassLoader =>
+            cl.close()
+            logInfo(log"Closed urlClassLoader for session ${MDC(SESSION_ID, sessionUUID)}")
+          case _ =>
+        }
+      } catch {
+        case NonFatal(e) =>
+          logWarning(log"Failed to close urlClassLoader for session " +
+            log"${MDC(SESSION_ID, sessionUUID)}", e)
+      }
+      // Delete session files.
+      val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), sessionUUID)
+      if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
+        Utils.deleteRecursively(sessionBasedRoot)
+      }
+      logInfo(log"Session cleaned up: ${MDC(SESSION_ID, sessionUUID)}")
+    }
+  }
+}
 
 /**
  * Spark executor, backed by a threadpool to run tasks.
@@ -220,25 +270,9 @@ private[spark] class Executor(
         val state = notification.getValue
         // Cache is always used for isolated sessions.
         assert(!isDefaultState(state.sessionUUID))
-        // Close the urlClassLoader to release resources.
-        try {
-          state.urlClassLoader match {
-            case urlClassLoader: URLClassLoader =>
-              urlClassLoader.close()
-              logInfo(log"Closed urlClassLoader (URLClassLoader) for evicted 
session " +
-                log"${MDC(SESSION_ID, state.sessionUUID)}")
-            case _ =>
-          }
-        } catch {
-          case NonFatal(e) =>
-            logWarning(log"Failed to close urlClassLoader for session " +
-              log"${MDC(SESSION_ID, state.sessionUUID)}", e)
-        }
-        val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), 
state.sessionUUID)
-        if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
-          Utils.deleteRecursively(sessionBasedRoot)
-        }
-        logInfo(log"Session evicted: ${MDC(SESSION_ID, state.sessionUUID)}")
+        // Mark evicted - cleanup will happen immediately if no tasks are 
using it,
+        // or when the last task releases it.
+        state.markEvicted()
       }
     })
     .build[String, IsolatedSessionState]
@@ -600,6 +634,9 @@ private[spark] class Executor(
         case _ => defaultSessionState
       }
 
+      // Pin the session to prevent its class loader from being closed while 
this task is running.
+      isolatedSession.acquire()
+
       setMDCForTask(taskName, mdcProperties)
       threadId = Thread.currentThread.getId
       Thread.currentThread.setName(threadName)
@@ -905,6 +942,8 @@ private[spark] class Executor(
           // are known, and metricsPoller.onTaskStart was called.
           metricsPoller.onTaskCompletion(taskId, task.stageId, 
task.stageAttemptId)
         }
+        // Release the session reference. If evicted and this was the last 
task, cleanup happens.
+        isolatedSession.release()
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to