This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 622fa35bb003 [SPARK-51537][CONNECT][CORE][FOLLOW-UP][TEST] Add test to ensure Spark plugins are not reloaded 622fa35bb003 is described below commit 622fa35bb0035b3a8af9dde42df00246b513b0f3 Author: Bobby Wang <wbo4...@gmail.com> AuthorDate: Fri Apr 11 08:41:24 2025 +0900 [SPARK-51537][CONNECT][CORE][FOLLOW-UP][TEST] Add test to ensure Spark plugins are not reloaded ### What changes were proposed in this pull request? This PR adds a unit test to verify that Spark plugin JARs specified via `--jars` are not reloaded. ### Why are the changes needed? This PR is a followup of https://github.com/apache/spark/pull/50334#discussion_r2015850636 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The newly added unit test passes. ### Was this patch authored or co-authored using generative AI tooling? No Closes #50526 from wbo4958/SPARK-51537-followup. Authored-by: Bobby Wang <wbo4...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/executor/ClassLoaderIsolationSuite.scala | 112 ++++++++++++++++++++- 1 file changed, 109 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala b/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala index b33e92efd7ae..688419665508 100644 --- a/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala @@ -17,10 +17,14 @@ package org.apache.spark.executor +import java.io.File +import java.net.URL + import scala.util.Properties -import org.apache.spark.{JobArtifactSet, JobArtifactState, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.util.Utils +import org.apache.spark.{JobArtifactSet, JobArtifactState, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils} +import 
org.apache.spark.util.{MutableURLClassLoader, Utils} + class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext { @@ -134,7 +138,7 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext { ) JobArtifactSet.withActiveJobArtifactState(artifactSetWithHelloV2.state.get) { - sc.parallelize(1 to 1).foreach { i => + sc.parallelize(1 to 1).foreach { _ => val cls = Utils.classForName("com.example.Hello$") val module = cls.getField("MODULE$").get(null) val result = cls.getMethod("test").invoke(module).asInstanceOf[Int] @@ -144,4 +148,106 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext { } } } + + test("SPARK-51537 Executor isolation avoids reloading plugin jars") { + val tempDir = Utils.createTempDir() + + val testCodeBody = + s""" + | public static boolean flag = false; + |""".stripMargin + + val compiledTestCode = TestUtils.createCompiledClass( + "TestFoo", + tempDir, + "", + null, + Seq.empty, + Seq.empty, + testCodeBody) + + // Initialize the static variable flag in TestFoo when loading plugin at the first time. + // If the plugin is reloaded, the TestFoo.flag will be set to false by default. 
+ val executorPluginCodeBody = + s""" + |@Override + |public void init( + | org.apache.spark.api.plugin.PluginContext ctx, + | java.util.Map<String, String> extraConf) { + | TestFoo.flag = true; + |} + """.stripMargin + + val thisClassPath = + sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL) + + val compiledExecutorPlugin = TestUtils.createCompiledClass( + "TestExecutorPlugin", + tempDir, + "", + null, + Seq(tempDir.toURI.toURL) ++ thisClassPath, + Seq("org.apache.spark.api.plugin.ExecutorPlugin"), + executorPluginCodeBody) + + val sparkPluginCodeBody = + """ + |@Override + |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() { + | return new TestExecutorPlugin(); + |} + | + |@Override + |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; } + """.stripMargin + + val compiledSparkPlugin = TestUtils.createCompiledClass( + "TestSparkPlugin", + tempDir, + "", + null, + Seq(tempDir.toURI.toURL) ++ thisClassPath, + Seq("org.apache.spark.api.plugin.SparkPlugin"), + sparkPluginCodeBody) + + val jarUrl = TestUtils.createJar( + Seq(compiledSparkPlugin, compiledExecutorPlugin, compiledTestCode), + new File(tempDir, "testplugin.jar")) + + def getClassLoader: MutableURLClassLoader = { + val loader = new MutableURLClassLoader(new Array[URL](0), + Thread.currentThread.getContextClassLoader) + Thread.currentThread.setContextClassLoader(loader) + loader + } + // SparkContext does not add plugin jars specified by `spark.jars` configuration + // to the classpath, causing ClassNotFoundException when initializing plugins + // in SparkContext. We manually add the jars to the ClassLoader to resolve this. 
+ val loader = getClassLoader + loader.addURL(jarUrl) + + sc = new SparkContext(new SparkConf() + .setAppName("avoid-reloading-plugins") + .setMaster("local-cluster[1, 1, 1024]") + .set("spark.jars", jarUrl.toString) + .set("spark.plugins", "TestSparkPlugin")) + + val jobArtifactSet = new JobArtifactSet( + Some(JobArtifactState(uuid = "avoid-reloading-plugins", replClassDirUri = None)), + jars = Map.empty, + files = Map.empty, + archives = Map.empty + ) + + JobArtifactSet.withActiveJobArtifactState(jobArtifactSet.state.get) { + sc.parallelize(1 to 1).foreach { _ => + val cls1 = Utils.classForName("TestFoo") + val z = cls1.getField("flag").getBoolean(null) + // If the plugin has been reloaded, the TestFoo.flag will be false. + if (!z) { + throw new RuntimeException("The spark plugin is reloaded") + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org