This is an automated email from the ASF dual-hosted git repository.
attilapiros pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 50fb994ccc2f [SPARK-56448][CONNECT] Fix NPE on Spark Connect client restart due to ammonite compile cache
50fb994ccc2f is described below
commit 50fb994ccc2f2a8dbbb253f8346df0614b5df261
Author: Anupam Yadav <[email protected]>
AuthorDate: Sat May 16 09:40:05 2026 -0700
[SPARK-56448][CONNECT] Fix NPE on Spark Connect client restart due to ammonite compile cache
### What changes were proposed in this pull request?
The Spark Connect REPL uses Ammonite. Ammonite's default `Storage.Folder`
persists compiled predef classes under `~/.ammonite/<version>/cache`. On a
subsequent REPL start from the same working directory, the cached
`CodePredef` class is reloaded but its reference to the per-session
`ArgsPredef` helper is stale, producing a `NullPointerException` during
predef initialization.
This PR switches the Connect REPL's compile cache to `Storage.InMemory`
so every session starts fresh and no stale cache is carried across
restarts.
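To make the failure mode concrete, here is a deliberately simplified, self-contained Scala model of it. It is not Ammonite code: `CompiledPredef`, `CompileStore`, `PersistentStore`, `InMemoryStore`, `Session`, and `StaleCacheDemo` are hypothetical names, and the shared `disk` map only stands in for `~/.ammonite/<version>/cache`. A "compiled" predef captures a reference to the session that produced it. When a later session reuses that artifact, the captured reference points at state that is no longer initialized, which is the analogue of `CodePredef.ArgsPredef()` returning `null`.

```scala
import scala.collection.mutable

// Toy model only: none of these types correspond to real Ammonite classes.

// A "compiled predef" that captured, at compile time, a way to reach its session's helper.
final class CompiledPredef(val argsHelper: () => String)

trait CompileStore {
  def lookup(key: String): Option[CompiledPredef]
  def save(key: String, compiled: CompiledPredef): Unit
}

// Stands in for Storage.Folder: entries live in a map shared across sessions ("on disk").
final class PersistentStore(disk: mutable.Map[String, CompiledPredef]) extends CompileStore {
  def lookup(key: String): Option[CompiledPredef] = disk.get(key)
  def save(key: String, compiled: CompiledPredef): Unit = disk(key) = compiled
}

// Stands in for Storage.InMemory: each store starts empty and is discarded with its session.
final class InMemoryStore extends CompileStore {
  private val entries = mutable.Map.empty[String, CompiledPredef]
  def lookup(key: String): Option[CompiledPredef] = entries.get(key)
  def save(key: String, compiled: CompiledPredef): Unit = entries(key) = compiled
}

final class Session(store: CompileStore) {
  // Per-session state, loosely analogous to the `spark` binding behind ArgsPredef.
  private var helper: String = "spark session"

  def close(): Unit = { helper = null }

  // Reuses a cached predef if one exists; otherwise "compiles" one bound to this session.
  def startPredef(key: String): String = {
    val compiled = store.lookup(key).getOrElse {
      val fresh = new CompiledPredef(() => helper)
      store.save(key, fresh)
      fresh
    }
    compiled.argsHelper() // null if the predef was compiled by an earlier, closed session
  }
}

object StaleCacheDemo {
  // Starts and closes one session, then starts a second one with a store from `newStore`.
  def restart(newStore: () => CompileStore): Option[String] = {
    val first = new Session(newStore())
    first.startPredef("predef")
    first.close()
    val second = new Session(newStore())
    Option(second.startPredef("predef"))
  }

  def main(args: Array[String]): Unit = {
    val disk = mutable.Map.empty[String, CompiledPredef]
    println(restart(() => new PersistentStore(disk))) // prints None (stale helper)
    println(restart(() => new InMemoryStore))          // prints Some(spark session)
  }
}
```

In this model the persistent store hands the second session an artifact bound to the first, closed session, while the in-memory store forces a fresh compile that binds to the current session. That mirrors the tradeoff described below: correctness on restart in exchange for recompiling the predef every time.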
### Why are the changes needed?
The stale-cache failure is a user-visible crash on every subsequent
invocation of `bin/spark-shell --remote sc://...` from the same working
directory. Reproduction steps are on the JIRA.
### Does this PR introduce _any_ user-facing change?
There is one minor observable tradeoff: because the compile cache is
now in-memory rather than persisted, each REPL start recompiles the
predef instead of reading the cached classfiles. This adds roughly a few
hundred milliseconds to subsequent REPL startups but eliminates the
NPE. We believe this is the correct tradeoff: a small startup cost
is preferable to a hard failure.
### How was this patch tested?
Added `AmmoniteReplE2ESuite` with a test that starts
`bin/spark-shell --remote sc://...` twice and checks that both runs
exit successfully (exit code 0).
I verified the negative case locally by temporarily reverting only the
`storageBackend = new Storage.InMemory()` line and re-running the test;
it then fails with:
```
- SPARK-56448: restarting spark-shell --remote does not throw NPE *** FAILED ***
1 did not equal 0 Second spark-shell failed (exit=1): WARNING: Using incubator modules: jdk.incubator.vector
Exception in thread "main" java.lang.NullPointerException: Cannot invoke "ammonite.predef.ArgsPredef$Helper.spark()" because the return value of "ammonite.predef.CodePredef.ArgsPredef()" is null
at ammonite.predef.CodePredef$Helper.<init>((console):7)
at ammonite.predef.CodePredef$.<clinit>((console):6)
at ammonite.predef.CodePredef.$main((console))
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at ammonite.runtime.Evaluator$$anon$1.$anonfun$evalMain$1(Evaluator.scala:108)
at ammonite.util.Util$.withContextClassloader(Util.scala:21)
at ammonite.runtime.Evaluator$$anon$1.evalMain(Evaluator.scala:90)
at ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$10(Interpreter.scala:594)
at ammonite.util.Res$Success.map(Res.scala:63)
at ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$9(Interpreter.scala:594)
at scala.Option$WithFilter.map(Option.scala:242)
at ammonite.interp.Interpreter.loop$1(Interpreter.scala:574)
at ammonite.interp.Interpreter.processAllScriptBlocks(Interpreter.scala:644)
at ammonite.interp.Interpreter.$anonfun$processModule$6(Interpreter.scala:432)
at ammonite.util.Catching.flatMap(Res.scala:110)
at ammonite.interp.Interpreter.$anonfun$processModule$5(Interpreter.scala:423)
...
```
Restoring the fix makes the test pass.
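For comparison, the same "start twice, both must succeed" check can be expressed with the standard library's `scala.sys.process` instead of hand-managed reader threads. This is only a sketch and not what the suite uses: `RestartCheck`, `runOnce`, and `runTwice` are hypothetical helpers, and unlike `runSparkShell` the sketch has no timeout, so a hung process would block the caller.

```scala
import scala.sys.process.{Process, ProcessLogger}

object RestartCheck {
  // Runs `command` to completion and returns its exit code plus whatever it wrote to stderr.
  // `!` (unlike `!<`) does not hook the child up to this JVM's stdin, so an interactive
  // shell reads EOF and exits, similar to closing stdin in `runSparkShell`.
  def runOnce(command: Seq[String]): (Int, String) = {
    val stderr = new StringBuilder
    val logger = ProcessLogger(_ => (), line => stderr.append(line).append('\n'))
    val exitCode = Process(command).!(logger)
    (exitCode, stderr.toString)
  }

  // Two consecutive runs of the same command; a stale-state regression shows up in the second.
  def runTwice(command: Seq[String]): Seq[(Int, String)] =
    Seq(runOnce(command), runOnce(command))
}
```

To mirror the suite, `command` would be the same `Seq(s"$sparkHome/bin/spark-shell", "--remote", s"sc://localhost:$serverPort")`. The committed test's explicit `waitFor(60, TimeUnit.SECONDS)` followed by `destroyForcibly()` remains the safer choice for CI, where a hung REPL should fail fast rather than stall the build.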
### Was this patch authored or co-authored using generative AI tooling?
Yes
Closes #55720 from yadavay-amzn/fix/spark_SPARK-56448.
Authored-by: Anupam Yadav <[email protected]>
Signed-off-by: attilapiros <[email protected]>
(cherry picked from commit 3e83503adaa0f7d72424842d6613a4bd18d5a943)
Signed-off-by: attilapiros <[email protected]>
---
.../apache/spark/sql/application/ConnectRepl.scala | 5 ++
.../sql/application/AmmoniteReplE2ESuite.scala | 72 ++++++++++++++++++++++
2 files changed, 77 insertions(+)
diff --git a/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala b/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
index ca82381eec9e..e8b14951b68a 100644
--- a/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
+++ b/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
@@ -26,6 +26,7 @@ import ammonite.compiler.iface.CodeWrapper
import ammonite.interp.{Interpreter, Watchable}
import ammonite.main.Defaults
import ammonite.repl.Repl
+import ammonite.runtime.Storage
import ammonite.util.{Bind, Imports, Name, PredefInfo, Ref, Res, Util}
import ammonite.util.Util.newLine
@@ -102,9 +103,13 @@ Spark session available as 'spark'.
|""".stripMargin
// Please note that we make ammonite generate classes instead of objects.
// Classes tend to have superior serialization behavior when using UDFs.
+ // SPARK-56448: Use Storage.InMemory to avoid stale compile cache across restarts.
+ // The default Storage.Folder persists compiled predef classes under ~/.ammonite. On a
+ // subsequent REPL start, the cached CodePredef references a stale ArgsPredef, causing NPE.
val main = new ammonite.Main(
welcomeBanner = Option(splash.format(spark_version, spark.version)),
predefCode = predefCode,
+ storageBackend = new Storage.InMemory(),
replCodeWrapper = ExtendedCodeClassWrapper,
scriptCodeWrapper = ExtendedCodeClassWrapper,
inputStream = inputStream,
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
new file mode 100644
index 000000000000..ffa892048c77
--- /dev/null
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.application
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.sys.process.BasicIO
+
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+import org.apache.spark.tags.AmmoniteTest
+
+@AmmoniteTest
+class AmmoniteReplE2ESuite extends ConnectFunSuite with RemoteSparkSession {
+
+  private def runSparkShell(): (Int, String, String) = {
+    val sparkHome = sys.props.getOrElse(
+      "spark.test.home",
+      sys.env.getOrElse("SPARK_HOME", fail("spark.test.home or SPARK_HOME not set")))
+    val command = Seq(s"$sparkHome/bin/spark-shell", "--remote", s"sc://localhost:$serverPort")
+
+    val process = new ProcessBuilder(command: _*).start()
+    // Close stdin immediately so shell exits on EOF
+    process.getOutputStream.close()
+
+    val stdout = new ArrayBuffer[String]()
+    val stderr = new ArrayBuffer[String]()
+    val stdoutThread = new Thread() {
+      setDaemon(true)
+      override def run(): Unit = BasicIO.processFully(stdout += _)(process.getInputStream)
+    }
+    val stderrThread = new Thread() {
+      setDaemon(true)
+      override def run(): Unit = BasicIO.processFully(stderr += _)(process.getErrorStream)
+    }
+    stdoutThread.start()
+    stderrThread.start()
+
+    val exited = process.waitFor(60, TimeUnit.SECONDS)
+    if (!exited) {
+      process.destroyForcibly()
+      fail("spark-shell did not exit within 60 seconds")
+    }
+    stdoutThread.join(10000)
+    stderrThread.join(10000)
+    (process.exitValue(), stdout.mkString("\n"), stderr.mkString("\n"))
+  }
+
+  test("SPARK-56448: restarting spark-shell --remote does not throw NPE") {
+    // First invocation
+    val (exit1, _, stderr1) = runSparkShell()
+    assert(exit1 == 0, s"First spark-shell failed (exit=$exit1): $stderr1")
+
+    // Second invocation -- without the fix, this would NPE from stale Ammonite cache
+    val (exit2, _, stderr2) = runSparkShell()
+    assert(exit2 == 0, s"Second spark-shell failed (exit=$exit2): $stderr2")
+  }
+}