This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 55789d367f [ZEPPELIN-6256] Fix resource leaks in
SparkInterpreterLauncher.detectSparkScalaVersion
55789d367f is described below
commit 55789d367fae348a750b4ff6332779f749615d4b
Author: renechoi <[email protected]>
AuthorDate: Mon Aug 25 15:47:14 2025 +0900
[ZEPPELIN-6256] Fix resource leaks in
SparkInterpreterLauncher.detectSparkScalaVersion
## What is this PR for?
This PR fixes resource leaks in the `detectSparkScalaVersion` method of
`SparkInterpreterLauncher.java` by modernizing the implementation to capture
process output directly without temporary files.
**Previous issues:**
- Resource leak from unclosed FileInputStream when reading process output
- Disk space accumulation from undeleted temporary files
**Solution:**
- Directly capture process error stream using
`IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8)`
- Eliminated temporary file creation entirely for cleaner, more efficient
implementation
## What type of PR is it?
Bug Fix / Code Improvement
## Todos
- [x] Code review
- [x] CI build verification
## What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-6256
## How should this be tested?
**Unit test added:**
- `testDetectSparkScalaVersionDirectStreamCapture`: Verifies the
modernized stream capture approach works correctly and returns valid Scala
version (2.12 or 2.13)
**Manual testing:**
1. Start Zeppelin with Spark interpreter
2. Verify Spark interpreter launches successfully
3. Confirm no temporary files created in temp directory
4. Monitor system resources - no file descriptor leaks
## Screenshots (if appropriate)
N/A
## Questions:
- **Do the license files need to be updated?** No
- **Are there breaking changes for older versions?** No
- **Does this need documentation?** No
## Implementation Details
- Modernized to capture process output directly via `IOUtils.toString()`
without intermediate temporary files
- Maintains full backward compatibility with no API changes
- Cleaner, more maintainable code following modern Java practices
- Eliminates all file system operations for output capture
Closes #5000 from renechoi/ZEPPELIN-6256-upstream-clean.
Signed-off-by: Philipp Dallig <[email protected]>
---
.../launcher/SparkInterpreterLauncher.java | 8 +++++---
.../launcher/SparkInterpreterLauncherTest.java | 22 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 8898adfbec..d131c816e0 100644
---
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -270,11 +270,13 @@ public class SparkInterpreterLauncher extends
StandardInterpreterLauncher {
LOGGER.info("Detect scala version from SPARK_HOME: {}", sparkHome);
ProcessBuilder builder = new ProcessBuilder(sparkHome +
"/bin/spark-submit", "--version");
builder.environment().putAll(env);
- File processOutputFile = File.createTempFile("zeppelin-spark", ".out");
- builder.redirectError(processOutputFile);
+
Process process = builder.start();
process.waitFor();
- String processOutput = IOUtils.toString(new
FileInputStream(processOutputFile), StandardCharsets.UTF_8);
+
+ // Capture the error stream directly without using a temp file
+ String processOutput = IOUtils.toString(process.getErrorStream(),
StandardCharsets.UTF_8);
+
Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*");
Matcher matcher = pattern.matcher(processOutput);
if (matcher.find()) {
diff --git
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 4721585724..52ac5a09b5 100644
---
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -39,6 +39,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+import java.lang.reflect.Method;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -330,6 +333,25 @@ class SparkInterpreterLauncherTest {
FileUtils.deleteDirectory(localRepoPath.toFile());
}
+ @Test
+ void testDetectSparkScalaVersionDirectStreamCapture() throws Exception {
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf,
null);
+
+ // Use reflection to access private method
+ Method detectSparkScalaVersionMethod =
SparkInterpreterLauncher.class.getDeclaredMethod(
+ "detectSparkScalaVersion", String.class, Map.class);
+ detectSparkScalaVersionMethod.setAccessible(true);
+
+ Map<String, String> env = new HashMap<>();
+
+ // Call the method
+ String scalaVersion = (String)
detectSparkScalaVersionMethod.invoke(launcher, sparkHome, env);
+
+ // Verify we got a valid result
+ assertTrue(scalaVersion.equals("2.12") || scalaVersion.equals("2.13"),
+ "Expected scala version 2.12 or 2.13 but got: " + scalaVersion);
+ }
+
@Test
void testDetectSparkScalaVersionByReplClassWithNonExistentDirectory() throws
Exception {
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf,
null);