This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

The following commit(s) were added to refs/heads/master by this push:
     new f3b16ad  [ZEPPELIN-5337] Spark scoped mode is broken
f3b16ad is described below

commit f3b16ad5e3a60e1fcc682df504ba72f712995622
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon May 3 00:00:06 2021 +0800

    [ZEPPELIN-5337] Spark scoped mode is broken

### What is this PR for?

Spark scoped mode is broken because each Spark Scala shell uses its own REPL class output directory, so interpreter sessions that share one SparkContext cannot resolve each other's generated classes. This PR fixes the issue by using a single output folder shared by all Scala shell instances.
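The heart of the fix is one process-wide REPL output directory created in a static initializer, rather than one temp directory per interpreter instance. A minimal sketch of that pattern, independent of Zeppelin (the class and method names here are illustrative, not taken from the patch):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SharedReplOutputDir {

  // One output dir per JVM: every REPL session in this process writes its
  // generated classes into the same folder, so sessions backed by the same
  // SparkContext can all resolve each other's classes.
  private static final File SHARED_OUTPUT_DIR;

  static {
    try {
      SHARED_OUTPUT_DIR = Files
          .createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "spark")
          .toFile();
      SHARED_OUTPUT_DIR.deleteOnExit();
    } catch (IOException e) {
      throw new RuntimeException("Failed to create shared REPL output dir", e);
    }
  }

  public static File get() {
    return SHARED_OUTPUT_DIR;
  }
}
```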
### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5337

### How should this be tested?
* A unit test is added.

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #4105 from zjffdu/ZEPPELIN-5337 and squashes the following commits:

1cf75223c [Jeff Zhang] [ZEPPELIN-5337] Spark scoped mode is broken
---
 .../apache/zeppelin/spark/SparkInterpreter.java    | 19 ++++++-
 .../zeppelin/spark/SparkScala210Interpreter.scala  |  7 ++-
 .../zeppelin/spark/SparkScala211Interpreter.scala  |  9 ++--
 .../zeppelin/spark/SparkScala212Interpreter.scala  |  8 ++-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |  2 -
 .../zeppelin/integration/SparkIntegrationTest.java | 59 +++++++++++++++++++---
 6 files changed, 79 insertions(+), 25 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 5e10e14..7b1460a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -33,8 +33,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,6 +52,18 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class SparkInterpreter extends AbstractInterpreter {
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class);
+  private static File scalaShellOutputDir;
+
+  static {
+    try {
+      // scala shell output is shared between multiple spark scala shells, so use a static field
+      scalaShellOutputDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "spark")
+              .toFile();
+      scalaShellOutputDir.deleteOnExit();
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create scala shell output dir", e);
+    }
+  }
 
   private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
   private AbstractSparkScalaInterpreter innerInterpreter;
 
@@ -158,8 +173,8 @@ public class SparkInterpreter extends AbstractInterpreter {
     String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
     Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
     return (AbstractSparkScalaInterpreter)
-        clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class)
-            .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader);
+        clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
+            .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader, scalaShellOutputDir);
   }
 
   @Override
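The reflective construction above threads the shared directory into each Scala-version-specific interpreter, whose constructor gains a trailing `File` parameter. Reduced to its essentials, the call looks roughly like this (the `ScalaShell` interface and factory below are hypothetical stand-ins, not types from the patch):

```java
import java.io.File;
import java.lang.reflect.Constructor;

// Hypothetical stand-in for the Scala-version-specific interpreter classes.
interface ScalaShell {
  void open();
}

public class ReflectiveShellFactory {

  // Load a version-specific implementation by name and hand it the shared
  // output dir as an extra constructor argument, mirroring the File.class
  // parameter added in the patch above.
  public static ScalaShell create(ClassLoader loader, String className, File sharedOutputDir)
      throws ReflectiveOperationException {
    Class<?> clazz = loader.loadClass(className);
    Constructor<?> ctor = clazz.getConstructor(File.class);
    return (ScalaShell) ctor.newInstance(sharedOutputDir);
  }
}
```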
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index c093636..34d69c0 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala210Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
                                override val properties: Properties,
                                override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader)
+                               override val sparkInterpreterClassLoader: URLClassLoader,
+                               val outputDir: File)
   extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
@@ -57,9 +58,7 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     if (InterpreterContext.get() != null) {
       interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
     }
-    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
-    outputDir.deleteOnExit()
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
 
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 3c3943b..6f531d2 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala211Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
                                override val properties: Properties,
                                override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader)
+                               override val sparkInterpreterClassLoader: URLClassLoader,
+                               val outputDir: File)
   extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   import SparkScala211Interpreter._
@@ -56,12 +57,10 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     if (sparkMaster == "yarn-client") {
       System.setProperty("SPARK_YARN_MODE", "true")
     }
-    // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
-    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
 
-    outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+    // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     startHttpServer(outputDir).foreach { case (server, uri) =>
       sparkHttpServer = server
       conf.set("spark.repl.class.uri", uri)
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index e467336..e9c127d 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
                                override val properties: Properties,
                                override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader)
+                               override val sparkInterpreterClassLoader: URLClassLoader,
+                               val outputDir: File)
   extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
@@ -54,11 +55,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     if (sparkMaster == "yarn-client") {
       System.setProperty("SPARK_YARN_MODE", "true")
     }
-    // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
-    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
 
-    outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
 
     val settings = new Settings()
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index eb99040..df3ca6d 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -63,8 +63,6 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected var sparkSession: Object = _
 
-  protected var outputDir: File = _
-
   protected var userJars: Seq[String] = _
 
   protected var sparkHttpServer: Object = _
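With the mutable `outputDir` field removed from the base class, each REPL adapter now only logs the injected directory and publishes it to Spark. The wiring boils down to a single SparkConf property, sketched here in Java (assumes spark-core on the classpath; `withReplOutputDir` is an illustrative helper, not a method in the patch):

```java
import java.io.File;

import org.apache.spark.SparkConf;

public class ReplConfWiring {

  // Publish the shared REPL class directory before the SparkContext is
  // created; the driver then serves classes compiled by any REPL session
  // in this process to the executors from this single path.
  public static SparkConf withReplOutputDir(SparkConf conf, File sharedOutputDir) {
    return conf.set("spark.repl.class.outputDir", sharedOutputDir.getAbsolutePath());
  }
}
```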
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 890d5a3..5fefa6b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
+import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
@@ -47,6 +48,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -253,14 +255,57 @@ public abstract class SparkIntegrationTest {
 
   @Test
   public void testSparkSubmit() throws InterpreterException {
-    InterpreterSetting sparkSubmitInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark-submit");
-    sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
-    // test SparkSubmitInterpreter
-    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
-    Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit", new ExecutionContext("user1", "note1", "test"));
-    InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+    try {
+      InterpreterSetting sparkSubmitInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark-submit");
+      sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+      // test SparkSubmitInterpreter
+      InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+      Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit", new ExecutionContext("user1", "note1", "test"));
+      InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+
+      assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    } finally {
+      interpreterSettingManager.close();
+    }
+  }
 
-    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+  @Test
+  public void testScopedMode() throws InterpreterException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    try {
+      sparkInterpreterSetting.setProperty("spark.master", "local[*]");
+      sparkInterpreterSetting.setProperty("spark.submit.deployMode", "client");
+      sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+      sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+      sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+      sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+      sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
+      sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+      sparkInterpreterSetting.getOption().setPerNote(InterpreterOption.SCOPED);
+
+      Interpreter sparkInterpreter1 = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note1", "test"));
+
+      InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+      InterpreterResult interpreterResult = sparkInterpreter1.interpret("sc.range(1,10).map(e=>e+1).sum()", context);
+      assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+      assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("54"));
+
+      Interpreter sparkInterpreter2 = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note2", "test"));
+      assertNotEquals(sparkInterpreter1, sparkInterpreter2);
+
+      context = new InterpreterContext.Builder().setNoteId("note2").setParagraphId("paragraph_1").build();
+      interpreterResult = sparkInterpreter2.interpret("sc.range(1,10).map(e=>e+1).sum()", context);
+      assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+      assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("54"));
+    } finally {
+      interpreterSettingManager.close();
+
+      if (sparkInterpreterSetting != null) {
+        // reset the InterpreterOption so that it won't affect other tests
+        sparkInterpreterSetting.getOption().setPerNote(InterpreterOption.SHARED);
+      }
+    }
   }
 
   private boolean isSpark2() {
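In scoped mode, `note1` and `note2` receive distinct interpreter instances (hence the `assertNotEquals`) on top of one shared SparkContext, so the two `interpret` calls verify that classes generated by both REPL sessions resolve against the shared output directory. The asserted value is plain arithmetic; a self-contained check of what the Scala snippet computes:

```java
public class ExpectedSum {
  public static void main(String[] args) {
    // sc.range(1, 10) yields 1..9 (end-exclusive); map(e => e + 1) yields 2..10.
    long sum = 0;
    for (long e = 1; e < 10; e++) {
      sum += e + 1;
    }
    System.out.println(sum); // prints 54, the value both assertions look for
  }
}
```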