This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

The following commit(s) were added to refs/heads/master by this push:
     new 9e35613b2f  [ZEPPELIN-6193] Refactor SparkSubmitInterpreter to Improve Readability and Error Handling
9e35613b2f is described below

commit 9e35613b2f4d4de626ebf9e4ecf166023df32b16
Author: Gyeongtae Park <67095975+parkgyeong...@users.noreply.github.com>
AuthorDate: Tue Jun 3 22:06:36 2025 +0900

    [ZEPPELIN-6193] Refactor SparkSubmitInterpreter to Improve Readability and Error Handling

    ### What is this PR for?
    - Improved Javadoc structure using `<p>` tags and consistent indentation.
    - Refined class- and method-level documentation for a better developer experience.
    - Made `sparkHome` a final field to reflect its immutable nature.
    - Enhanced the error message shown when `SPARK_HOME` is missing.
    - Removed the unused import `EnvironmentUtils`.
    - Reformatted string concatenation for better clarity.
    - Standardized formatting and styling across methods and comments.

    ### What type of PR is it?
    Refactoring

    ### Todos
    * [x] Ensure the %spark-submit command still works correctly after the refactor
    * [x] Validate that the Spark UI link is extracted and shown in the Zeppelin frontend

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-6193

    ### How should this be tested?
    1. Launch Zeppelin with the updated interpreter.
    2. Configure the interpreter setting with a valid SPARK_HOME.
    3. Run a sample %spark-submit paragraph with a valid Spark job (e.g., SparkPi); a sample invocation is sketched below, after the commit message.
    4. (Verify) The job executes successfully.
    5. (Verify) The Spark UI link appears in the Zeppelin frontend.
    6. (Verify) There are no regressions in paragraph output or error handling.

    ### Screenshots (if appropriate)
    <img width="774" alt="image" src="https://github.com/user-attachments/assets/d68c1c29-9905-43bd-bf2c-d29635c42751" />

    ### Questions:
    * Do the license files need to be updated? No.
    * Are there breaking changes for older versions? No.
    * Does this need documentation? No.

    Closes #4936 from ParkGyeongTae/spark-submit-interpreter-docs-and-safety.

    Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
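A sample paragraph for step 3 above might look like the following. This is a sketch: the examples jar path and version suffix depend on the Spark distribution under SPARK_HOME, so adjust them to the local installation.

    %spark-submit --class org.apache.spark.examples.SparkPi /path/to/spark/examples/jars/spark-examples_2.12-3.5.1.jar 100

The trailing argument is the number of partitions SparkPi uses to estimate pi.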
---
 .../spark/submit/SparkSubmitInterpreter.java       | 77 ++++++++++++++++++----
 1 file changed, 66 insertions(+), 11 deletions(-)

diff --git a/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java b/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java
index 53232ed20b..27a54ae080 100644
--- a/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java
+++ b/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.spark.submit;
 
-import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
@@ -29,7 +28,9 @@ import org.apache.zeppelin.shell.ShellInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,33 +38,82 @@ import java.util.concurrent.ConcurrentMap;
 
 /**
- * Support %spark-submit which run spark-submit command. Internally,
- * it would run shell command via ShellInterpreter.
- *
+ * Interpreter that supports the `%spark-submit` command in Apache Zeppelin.
+ * <p>
+ * This interpreter allows users to submit Spark jobs using the standard `spark-submit` CLI
+ * interface. Internally, it delegates execution to the ShellInterpreter to run `spark-submit`
+ * as a shell command.
+ * <p>
+ * Key features:
+ * - Automatically builds and executes the `spark-submit` command using the configured
+ *   SPARK_HOME path.
+ * - Extracts the Spark UI URL from logs and publishes it to the Zeppelin frontend.
+ * - Tracks the YARN Application ID from the logs, allowing the job to be cancelled via
+ *   `yarn application -kill`.
+ * - Handles both YARN and local Spark modes.
+ * <p>
+ * Required configuration:
+ * - SPARK_HOME must be set in the interpreter setting or environment variables. It should
+ *   point to the root directory of a valid Spark installation.
+ * <p>
+ * Example usage in a Zeppelin notebook:
+ * %spark-submit --class org.apache.spark.examples.SparkPi /path/to/jar spark-args
  */
 public class SparkSubmitInterpreter extends ShellInterpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitInterpreter.class);
 
-  private String sparkHome;
+  private final String sparkHome;
 
   // paragraphId --> yarnAppId
   private ConcurrentMap<String, String> yarnAppIdMap = new ConcurrentHashMap<>();
 
   public SparkSubmitInterpreter(Properties property) {
     super(property);
-    // Set time to be max integer so that the shell process won't timeout.
-    setProperty("shell.command.timeout.millisecs", Integer.MAX_VALUE + "");
+    setProperty("shell.command.timeout.millisecs", String.valueOf(Integer.MAX_VALUE));
     this.sparkHome = properties.getProperty("SPARK_HOME");
     LOGGER.info("SPARK_HOME: {}", sparkHome);
   }
 
+  /**
+   * Executes a spark-submit command based on the user's input in a Zeppelin notebook paragraph.
+   * <p>
+   * This method constructs the full spark-submit CLI command using the configured SPARK_HOME
+   * and the provided arguments. It performs validation (e.g., SPARK_HOME presence), logs the
+   * execution, and registers a listener to extract Spark UI information from the output logs.
+   * <p>
+   * If SPARK_HOME is not set, an error result is returned.
+   * After execution, any associated YARN application ID is removed from the internal tracking map.
+   *
+   * @param cmd     The spark-submit arguments entered by the user
+   *                (e.g., "--class ... /path/to/jar").
+   * @param context The interpreter context for the current paragraph execution.
+   * @return An {@link InterpreterResult} representing the outcome of the spark-submit execution.
+   */
   @Override
   public InterpreterResult internalInterpret(String cmd, InterpreterContext context) {
     if (StringUtils.isBlank(cmd)) {
       return new InterpreterResult(InterpreterResult.Code.SUCCESS);
     }
-    String sparkSubmitCommand = sparkHome + "/bin/spark-submit " + cmd.trim();
+
+    if (StringUtils.isBlank(sparkHome)) {
+      String errorMsg = "SPARK_HOME is not set. Please configure SPARK_HOME in the interpreter " +
+          "setting or environment.";
+      LOGGER.error("Failed to run spark-submit: {}", errorMsg);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, errorMsg);
+    }
+
+    File sparkSubmit = Paths.get(sparkHome, "bin", "spark-submit").toFile();
+    if (!sparkSubmit.exists()) {
+      String errorMsg = "spark-submit command does not exist at: " + sparkSubmit.getAbsolutePath();
+      LOGGER.error("Failed to run spark-submit: {}", errorMsg);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, errorMsg);
+    }
+
+    String sparkSubmitCommand = sparkSubmit.getAbsolutePath() + " " + cmd.trim();
     LOGGER.info("Run spark command: {}", sparkSubmitCommand);
     context.out.addInterpreterOutListener(new SparkSubmitOutputListener(context));
     InterpreterResult result = super.internalInterpret(sparkSubmitCommand, context);
@@ -78,7 +128,7 @@ public class SparkSubmitInterpreter extends ShellInterpreter {
     if (StringUtils.isNotBlank(yarnAppId)) {
       try {
         LOGGER.info("Try to kill yarn app: {} of code: {}", yarnAppId, context.getParagraphText());
-        Runtime.getRuntime().exec(new String[] {"yarn", "application", "-kill", yarnAppId});
+        Runtime.getRuntime().exec(new String[]{"yarn", "application", "-kill", yarnAppId});
       } catch (IOException e) {
         LOGGER.warn("Fail to kill yarn app, please check whether yarn command is on your PATH", e);
       }
@@ -88,9 +138,14 @@ public class SparkSubmitInterpreter extends ShellInterpreter {
 
   /**
-   * InterpreterOutputListener which extract spark ui link from logs.
+   * An output listener that parses logs generated by spark-submit to extract metadata such as:
+   * - Spark UI URL (for both YARN and local modes)
+   * - YARN application ID (for job cancellation)
+   * <p>
+   * The extracted information is sent back to Zeppelin frontend to enable dynamic rendering
+   * of job status and UI links.
    */
-  private class SparkSubmitOutputListener implements  InterpreterOutputListener {
+  private class SparkSubmitOutputListener implements InterpreterOutputListener {
 
     private InterpreterContext context;
     private boolean isSparkUrlSent = false;
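To sanity-check the new fail-fast behavior outside Zeppelin, the following standalone sketch mirrors the validation added above: a blank-SPARK_HOME guard followed by an existence check on bin/spark-submit. The class name and the use of System.getenv are illustrative assumptions rather than part of the commit, and String.isBlank assumes Java 11+.

import java.io.File;
import java.nio.file.Paths;

// Illustrative sketch only: mirrors the validation that this commit adds to
// SparkSubmitInterpreter.internalInterpret, runnable outside Zeppelin.
public class SparkSubmitPathCheck {

  public static void main(String[] args) {
    // The interpreter reads SPARK_HOME from its properties; the environment is
    // used here only to keep the example self-contained.
    String sparkHome = System.getenv("SPARK_HOME");

    // Guard 1: SPARK_HOME must be set (the patch uses StringUtils.isBlank).
    if (sparkHome == null || sparkHome.isBlank()) {
      System.err.println("SPARK_HOME is not set. Please configure SPARK_HOME "
          + "in the interpreter setting or environment.");
      return;
    }

    // Guard 2: bin/spark-submit must exist under SPARK_HOME.
    File sparkSubmit = Paths.get(sparkHome, "bin", "spark-submit").toFile();
    if (!sparkSubmit.exists()) {
      System.err.println("spark-submit command does not exist at: "
          + sparkSubmit.getAbsolutePath());
      return;
    }

    // At this point the interpreter would prepend the resolved binary to the
    // user's arguments and delegate execution to the shell.
    System.out.println("Would run: " + sparkSubmit.getAbsolutePath() + " "
        + String.join(" ", args).trim());
  }
}

Compiling this with javac and running it with and without SPARK_HOME set exercises both error branches added by the patch.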