This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 622fa35bb003 [SPARK-51537][CONNECT][CORE][FOLLOW-UP][TEST] Add test to ensure Spark plugins are not reloaded
622fa35bb003 is described below

commit 622fa35bb0035b3a8af9dde42df00246b513b0f3
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Fri Apr 11 08:41:24 2025 +0900

    [SPARK-51537][CONNECT][CORE][FOLLOW-UP][TEST] Add test to ensure Spark plugins are not reloaded
    
    ### What changes were proposed in this pull request?
    
    This PR adds a unit test to verify that Spark plugin JARs specified via `--jars` are not reloaded.
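
    As a quick illustration of the detection idea (a hedged sketch, not the
    committed code; `FlagHolder`, `ReloadProbe`, and `Demo` are hypothetical
    stand-ins for the test's `TestFoo` and `TestExecutorPlugin`): the plugin
    flips a static flag during `init()`, so if Spark reloaded the plugin
    classes in a fresh class loader, the flag would revert to its default
    `false`.

    ```scala
    // Minimal sketch of the static-flag reload probe.
    object FlagHolder {
      var flag: Boolean = false // resets to false whenever the class is (re)loaded
    }

    class ReloadProbe {
      def init(): Unit = FlagHolder.flag = true // what the plugin's init() does
    }

    object Demo {
      def main(args: Array[String]): Unit = {
        new ReloadProbe().init()
        // Had the classes been reloaded after init(), flag would read false here.
        assert(FlagHolder.flag, "plugin classes were reloaded")
      }
    }
    ```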
    
    ### Why are the changes needed?
    
    This PR is a follow-up of https://github.com/apache/spark/pull/50334#discussion_r2015850636
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    The added unit test passes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50526 from wbo4958/SPARK-51537-followup.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/executor/ClassLoaderIsolationSuite.scala | 112 ++++++++++++++++++++-
 1 file changed, 109 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala b/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala
index b33e92efd7ae..688419665508 100644
--- a/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala
@@ -17,10 +17,14 @@
 
 package org.apache.spark.executor
 
+import java.io.File
+import java.net.URL
+
 import scala.util.Properties
 
-import org.apache.spark.{JobArtifactSet, JobArtifactState, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.{JobArtifactSet, JobArtifactState, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
+
 
 class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext  {
 
@@ -134,7 +138,7 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext  {
     )
 
     JobArtifactSet.withActiveJobArtifactState(artifactSetWithHelloV2.state.get) {
-      sc.parallelize(1 to 1).foreach { i =>
+      sc.parallelize(1 to 1).foreach { _ =>
         val cls = Utils.classForName("com.example.Hello$")
         val module = cls.getField("MODULE$").get(null)
         val result = cls.getMethod("test").invoke(module).asInstanceOf[Int]
@@ -144,4 +148,106 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext  {
       }
     }
   }
+
+  test("SPARK-51537 Executor isolation avoids reloading plugin jars") {
+    val tempDir = Utils.createTempDir()
+
+    val testCodeBody =
+      s"""
+         | public static boolean flag = false;
+         |""".stripMargin
+
+    val compiledTestCode = TestUtils.createCompiledClass(
+      "TestFoo",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq.empty,
+      testCodeBody)
+
+    // The plugin's init() sets the static flag in TestFoo when the plugin is first
+    // loaded. If the plugin were reloaded, TestFoo.flag would revert to its default false.
+    val executorPluginCodeBody =
+      s"""
+         |@Override
+         |public void init(
+         |    org.apache.spark.api.plugin.PluginContext ctx,
+         |    java.util.Map<String, String> extraConf) {
+         |  TestFoo.flag = true;
+         |}
+      """.stripMargin
+
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin, compiledTestCode),
+      new File(tempDir, "testplugin.jar"))
+
+    def getClassLoader: MutableURLClassLoader = {
+      val loader = new MutableURLClassLoader(new Array[URL](0),
+        Thread.currentThread.getContextClassLoader)
+      Thread.currentThread.setContextClassLoader(loader)
+      loader
+    }
+    // SparkContext does not add plugin jars specified by the `spark.jars` configuration
+    // to the classpath, causing a ClassNotFoundException when initializing plugins
+    // in SparkContext. We manually add the jars to the ClassLoader to resolve this.
+    val loader = getClassLoader
+    loader.addURL(jarUrl)
+
+    sc = new SparkContext(new SparkConf()
+      .setAppName("avoid-reloading-plugins")
+      .setMaster("local-cluster[1, 1, 1024]")
+      .set("spark.jars", jarUrl.toString)
+      .set("spark.plugins", "TestSparkPlugin"))
+
+    val jobArtifactSet = new JobArtifactSet(
+      Some(JobArtifactState(uuid = "avoid-reloading-plugins", replClassDirUri = None)),
+      jars = Map.empty,
+      files = Map.empty,
+      archives = Map.empty
+    )
+
+    JobArtifactSet.withActiveJobArtifactState(jobArtifactSet.state.get) {
+      sc.parallelize(1 to 1).foreach { _ =>
+        val cls1 = Utils.classForName("TestFoo")
+        val z = cls1.getField("flag").getBoolean(null)
+        // If the plugin has been reloaded, TestFoo.flag will be false.
+        if (!z) {
+          throw new RuntimeException("The Spark plugin was reloaded")
+        }
+      }
+    }
+  }
 }

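For context on the class-loader workaround in the test above, here is a hedged
sketch of the pattern (`LoaderSketch` and the jar path are illustrative, not from
the commit): jars passed through `spark.jars` are shipped to executors but are
not appended to the driver's classpath, so plugin classes must be made visible
to the thread's context class loader before SparkContext initializes plugins.
The test uses Spark's MutableURLClassLoader, which can append URLs after
construction; a plain URLClassLoader shows the same idea.

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

object LoaderSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative jar path; the test points at the jar it just compiled.
    val jarUrl: URL = new File("/tmp/testplugin.jar").toURI.toURL
    val loader = new URLClassLoader(Array(jarUrl),
      Thread.currentThread.getContextClassLoader)
    // Make plugin classes resolvable when SparkContext initializes plugins.
    Thread.currentThread.setContextClassLoader(loader)
  }
}
```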
