This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 50fb994ccc2f [SPARK-56448][CONNECT] Fix NPE on Spark Connect client restart due to ammonite compile cache
50fb994ccc2f is described below

commit 50fb994ccc2f2a8dbbb253f8346df0614b5df261
Author: Anupam Yadav <[email protected]>
AuthorDate: Sat May 16 09:40:05 2026 -0700

    [SPARK-56448][CONNECT] Fix NPE on Spark Connect client restart due to ammonite compile cache
    
    ### What changes were proposed in this pull request?
    
    The Spark Connect REPL uses Ammonite. Ammonite's default `Storage.Folder`
    persists compiled predef classes under `~/.ammonite/<version>/cache`. On a
    subsequent REPL start from the same working directory, the cached
    `CodePredef` class is reloaded but its reference to the per-session
    `ArgsPredef` helper is stale, producing a `NullPointerException` during
    predef initialization.
    
    This PR switches the Connect REPL's compile cache to `Storage.InMemory`
    so every session starts fresh and no stale cache is carried across
    restarts.
    
    ### Why are the changes needed?
    
    The stale-cache failure is a user-visible crash on every subsequent
    invocation of `bin/spark-shell --remote sc://...` from the same working
    directory. Reproduction steps are in the JIRA ticket.
    
    ### Does this PR introduce _any_ user-facing change?
    
    There is one minor observable tradeoff: because the compile cache is
    now in-memory rather than persisted, each REPL start recompiles the
    predef instead of reading the cached class files. This adds roughly a
    few hundred milliseconds to subsequent REPL startups but eliminates the
    NPE. We believe this is the correct tradeoff — a small startup cost
    is preferable to a hard failure.
    
    ### How was this patch tested?
    
    Added `AmmoniteReplE2ESuite` with a test that starts
    `bin/spark-shell --remote sc://...` twice and checks that both runs
    succeed.
    
    I verified the negative case locally by temporarily reverting only the
    `storageBackend = new Storage.InMemory()` line and re-running the test;
    it fails with:
    ```
    - SPARK-56448: restarting spark-shell --remote does not throw NPE *** FAILED ***
      1 did not equal 0 Second spark-shell failed (exit=1): WARNING: Using incubator modules: jdk.incubator.vector
      Exception in thread "main" java.lang.NullPointerException: Cannot invoke "ammonite.predef.ArgsPredef$Helper.spark()" because the return value of "ammonite.predef.CodePredef.ArgsPredef()" is null
            at ammonite.predef.CodePredef$Helper.<init>((console):7)
            at ammonite.predef.CodePredef$.<clinit>((console):6)
            at ammonite.predef.CodePredef.$main((console))
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:569)
            at ammonite.runtime.Evaluator$$anon$1.$anonfun$evalMain$1(Evaluator.scala:108)
            at ammonite.util.Util$.withContextClassloader(Util.scala:21)
            at ammonite.runtime.Evaluator$$anon$1.evalMain(Evaluator.scala:90)
            at ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$10(Interpreter.scala:594)
            at ammonite.util.Res$Success.map(Res.scala:63)
            at ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$9(Interpreter.scala:594)
            at scala.Option$WithFilter.map(Option.scala:242)
            at ammonite.interp.Interpreter.loop$1(Interpreter.scala:574)
            at ammonite.interp.Interpreter.processAllScriptBlocks(Interpreter.scala:644)
            at ammonite.interp.Interpreter.$anonfun$processModule$6(Interpreter.scala:432)
            at ammonite.util.Catching.flatMap(Res.scala:110)
            at ammonite.interp.Interpreter.$anonfun$processModule$5(Interpreter.scala:423)
    ...
    ```
    
    Restoring the fix makes the test pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    Closes #55720 from yadavay-amzn/fix/spark_SPARK-56448.
    
    Authored-by: Anupam Yadav <[email protected]>
    Signed-off-by: attilapiros <[email protected]>
    (cherry picked from commit 3e83503adaa0f7d72424842d6613a4bd18d5a943)
    Signed-off-by: attilapiros <[email protected]>
---
 .../apache/spark/sql/application/ConnectRepl.scala |  5 ++
 .../sql/application/AmmoniteReplE2ESuite.scala     | 72 ++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git a/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala b/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
index ca82381eec9e..e8b14951b68a 100644
--- a/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
+++ b/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
@@ -26,6 +26,7 @@ import ammonite.compiler.iface.CodeWrapper
 import ammonite.interp.{Interpreter, Watchable}
 import ammonite.main.Defaults
 import ammonite.repl.Repl
+import ammonite.runtime.Storage
 import ammonite.util.{Bind, Imports, Name, PredefInfo, Ref, Res, Util}
 import ammonite.util.Util.newLine
 
@@ -102,9 +103,13 @@ Spark session available as 'spark'.
         |""".stripMargin
     // Please note that we make ammonite generate classes instead of objects.
     // Classes tend to have superior serialization behavior when using UDFs.
+    // SPARK-56448: Use Storage.InMemory to avoid stale compile cache across 
restarts.
+    // The default Storage.Folder persists compiled predef classes under 
~/.ammonite. On a
+    // subsequent REPL start, the cached CodePredef references a stale 
ArgsPredef, causing NPE.
     val main = new ammonite.Main(
       welcomeBanner = Option(splash.format(spark_version, spark.version)),
       predefCode = predefCode,
+      storageBackend = new Storage.InMemory(),
       replCodeWrapper = ExtendedCodeClassWrapper,
       scriptCodeWrapper = ExtendedCodeClassWrapper,
       inputStream = inputStream,
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
new file mode 100644
index 000000000000..ffa892048c77
--- /dev/null
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.application
+
+import java.io.InputStream
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.sys.process.BasicIO
+
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+import org.apache.spark.tags.AmmoniteTest
+
+/**
+ * End-to-end tests that launch `bin/spark-shell --remote` as a separate process against the
+ * [[RemoteSparkSession]] server started for this suite.
+ */
+@AmmoniteTest
+class AmmoniteReplE2ESuite extends ConnectFunSuite with RemoteSparkSession {
+
+  /**
+   * Runs `bin/spark-shell --remote sc://localhost:<serverPort>` with stdin closed, so the REPL
+   * exits once it reads EOF.
+   *
+   * @param timeoutSeconds how long to wait for the shell to exit before killing it
+   * @return (exit code, captured stdout, captured stderr); output lines are joined with "\n"
+   */
+  private def runSparkShell(timeoutSeconds: Long = 60): (Int, String, String) = {
+    val sparkHome = sys.props.getOrElse(
+      "spark.test.home",
+      sys.env.getOrElse("SPARK_HOME", fail("spark.test.home or SPARK_HOME not set")))
+    val command = Seq(s"$sparkHome/bin/spark-shell", "--remote", s"sc://localhost:$serverPort")
+
+    val process = new ProcessBuilder(command: _*).start()
+    // Close stdin immediately so the shell exits on EOF.
+    process.getOutputStream.close()
+
+    // Drain both pipes concurrently so the child never blocks on a full pipe buffer.
+    val stdout = new ArrayBuffer[String]()
+    val stderr = new ArrayBuffer[String]()
+    val stdoutThread = startDrainer(process.getInputStream, stdout)
+    val stderrThread = startDrainer(process.getErrorStream, stderr)
+
+    if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
+      process.destroyForcibly()
+      process.waitFor(10, TimeUnit.SECONDS)
+      fail(s"spark-shell did not exit within $timeoutSeconds seconds; stderr:\n" +
+        snapshot(stderr))
+    }
+    // The drainers stop at EOF; the bounded join keeps a pipe that stays open (e.g. held by a
+    // leftover child process) from hanging the test, in which case the output may be partial.
+    stdoutThread.join(10000)
+    stderrThread.join(10000)
+    (process.exitValue(), snapshot(stdout), snapshot(stderr))
+  }
+
+  /** Starts a daemon thread appending each line read from `in` to `sink` (under its lock). */
+  private def startDrainer(in: InputStream, sink: ArrayBuffer[String]): Thread = {
+    val thread = new Thread() {
+      setDaemon(true)
+      override def run(): Unit = BasicIO.processFully { line =>
+        sink.synchronized { sink += line }
+        ()
+      }(in)
+    }
+    thread.start()
+    thread
+  }
+
+  /** Returns the captured lines joined by "\n", read under the buffer's lock. */
+  private def snapshot(sink: ArrayBuffer[String]): String = sink.synchronized {
+    sink.mkString("\n")
+  }
+
+  test("SPARK-56448: restarting spark-shell --remote does not throw NPE") {
+    // First invocation.
+    val (exit1, _, stderr1) = runSparkShell()
+    assert(exit1 == 0, s"First spark-shell failed (exit=$exit1): $stderr1")
+
+    // Second invocation from the same working directory: before the fix, Ammonite reloaded the
+    // cached predef classes from the first run and threw an NPE during predef initialization.
+    val (exit2, _, stderr2) = runSparkShell()
+    assert(exit2 == 0, s"Second spark-shell failed (exit=$exit2): $stderr2")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to