Repository: zeppelin

Updated Branches:
  refs/heads/master 57601f819 -> 71bc75966
ZEPPELIN-3749. New Spark interpreter has to be restarted two times in order to work fine for different users

### What is this PR for?
This PR fixes the issue with scoped mode in the new Spark interpreter. The interpreter now closes the SparkContext only when there are no other live sessions.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3749

### How should this be tested?
* A unit test is added.

### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3166 from zjffdu/ZEPPELIN-3749 and squashes the following commits:

67ba55627 [Jeff Zhang] ZEPPELIN-3749. New Spark interpreter has to be restarted two times in order to work fine for different users

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/71bc7596
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/71bc7596
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/71bc7596

Branch: refs/heads/master
Commit: 71bc75966a7e5a18ce35b1f767c903a4b5c02eb9
Parents: 57601f8
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Aug 30 10:14:42 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Sep 4 17:10:10 2018 +0800

----------------------------------------------------------------------
 spark/interpreter/figure/null-1.png             | Bin 13599 -> 0 bytes
 spark/interpreter/figure/unnamed-chunk-1-1.png  | Bin 407541 -> 0 bytes
 .../zeppelin/spark/NewSparkInterpreterTest.java |  47 ++++++++++++++++++-
 .../spark/SparkScala210Interpreter.scala        |   3 --
 .../spark/BaseSparkScalaInterpreter.scala       |  33 ++++++++-----
 5 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
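For orientation before the diff: the fix boils down to reference-counting the live sessions that share one JVM, and tearing down the shared SparkContext only when the count reaches zero. Below is a minimal, self-contained Scala sketch of that pattern; SharedResource and DemoSession are hypothetical stand-ins for illustration, not Zeppelin classes.

    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical stand-in for the shared SparkContext.
    class SharedResource {
      @volatile var stopped = false
      def stop(): Unit = stopped = true
    }

    object DemoSession {
      // One JVM-wide counter for all sessions, mirroring
      // BaseSparkScalaInterpreter.sessionNum in the diff below.
      val sessionNum = new AtomicInteger(0)
    }

    class DemoSession(resource: SharedResource) {
      // Each session registers itself, as open() in the diff now
      // calls sessionNum.incrementAndGet().
      def open(): Unit = DemoSession.sessionNum.incrementAndGet()

      // Stop the shared resource only when the last live session
      // goes away, as in the guarded close() in the diff.
      def close(): Unit =
        if (DemoSession.sessionNum.decrementAndGet() == 0) {
          resource.stop()
        }
    }

    object ScopedModeDemo extends App {
      val shared = new SharedResource
      val s1 = new DemoSession(shared)
      val s2 = new DemoSession(shared)
      s1.open(); s2.open()

      s1.close()
      assert(!shared.stopped) // session 2 still works after session 1 closes

      s2.close()
      assert(shared.stopped)  // the last close stops the shared resource
    }

This is exactly the scenario the new testScopedMode case below exercises: two interpreters share one InterpreterGroup, and closing the first must not stop the SparkContext the second is still using.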
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/interpreter/figure/null-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/null-1.png b/spark/interpreter/figure/null-1.png
deleted file mode 100644
index 8b1ce07..0000000
Binary files a/spark/interpreter/figure/null-1.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/interpreter/figure/unnamed-chunk-1-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/unnamed-chunk-1-1.png b/spark/interpreter/figure/unnamed-chunk-1-1.png
deleted file mode 100644
index 6f03c95..0000000
Binary files a/spark/interpreter/figure/unnamed-chunk-1-1.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index ea19866..82727a1 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -34,6 +34,7 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -67,7 +68,12 @@ public class NewSparkInterpreterTest {
 
   // catch the interpreter output in onUpdate
   private InterpreterResultMessageOutput messageOutput;
 
-  private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+  private RemoteInterpreterEventClient mockRemoteEventClient;
+
+  @Before
+  public void setUp() {
+    mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+  }
 
   @Test
   public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
@@ -519,6 +525,45 @@ public class NewSparkInterpreterTest {
     verify(mockRemoteEventClient, never()).onParaInfosReceived(any(Map.class));
   }
 
+  @Test
+  public void testScopedMode() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+
+    SparkInterpreter interpreter1 = new SparkInterpreter(properties);
+    SparkInterpreter interpreter2 = new SparkInterpreter(properties);
+
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+    interpreter1.setInterpreterGroup(interpreterGroup);
+    interpreter2.setInterpreterGroup(interpreterGroup);
+
+    interpreterGroup.addInterpreterToSession(interpreter1, "session_1");
+    interpreterGroup.addInterpreterToSession(interpreter2, "session_2");
+
+    InterpreterContext.set(getInterpreterContext());
+    interpreter1.open();
+    interpreter2.open();
+
+    InterpreterContext context = getInterpreterContext();
+
+    InterpreterResult result1 = interpreter1.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result1.code());
+
+    InterpreterResult result2 = interpreter2.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+
+    // interpreter2 continue to work after interpreter1 is closed
+    interpreter1.close();
+
+    result2 = interpreter2.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+    interpreter2.close();
+  }
+
   @After
   public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
----------------------------------------------------------------------
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index 6fc8178..9d2ac83 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -87,9 +87,6 @@ class SparkScala210Interpreter(override val conf: SparkConf,
 
   override def close(): Unit = {
     super.close()
-    if (sparkILoop != null) {
-      callMethod(sparkILoop, "org$apache$spark$repl$SparkILoop$$closeInterpreter")
-    }
   }
 
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 7fe43c1..a73630a 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext}
@@ -59,6 +60,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected val interpreterOutput: InterpreterOutputStream
 
+
   protected def open(): Unit = {
     /* Required for scoped mode.
      * In scoped mode multiple scala compiler (repl) generates class in the same directory.
@@ -77,6 +79,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
      * */
     System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
+
+    BaseSparkScalaInterpreter.sessionNum.incrementAndGet()
   }
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
@@ -152,19 +156,20 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     bind(name, tpe, value, modifier.asScala.toList)
 
   protected def close(): Unit = {
-    if (sc != null) {
-      sc.stop()
-    }
-    if (sparkHttpServer != null) {
-      sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
-    }
-    sc = null
-    sqlContext = null
-    if (sparkSession != null) {
-      sparkSession.getClass.getMethod("stop").invoke(sparkSession)
-      sparkSession = null
+    if (BaseSparkScalaInterpreter.sessionNum.decrementAndGet() == 0) {
+      if (sc != null) {
+        sc.stop()
+      }
+      if (sparkHttpServer != null) {
+        sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
+      }
+      sc = null
+      sqlContext = null
+      if (sparkSession != null) {
+        sparkSession.getClass.getMethod("stop").invoke(sparkSession)
+        sparkSession = null
+      }
     }
   }
 
   protected def createSparkContext(): Unit = {
@@ -376,3 +381,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     depFiles.asScala.filter(!_.endsWith(".jar"))
   }
 }
+
+object BaseSparkScalaInterpreter {
+  val sessionNum = new AtomicInteger(0)
+}
\ No newline at end of file
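A note on the design choice: sessionNum lives in a companion object because, in scoped mode, all sessions of an interpreter group run in one JVM, and it is an AtomicInteger because open() and close() can run concurrently from different sessions. decrementAndGet() atomically returns the updated value, so exactly one caller can observe 0 and perform the teardown. The following self-contained sketch (hypothetical demo code, not part of the commit) illustrates both properties; the Runnable form keeps it valid on the old Scala versions this module targets.

    import java.util.concurrent.atomic.AtomicInteger

    object AtomicCounterDemo extends App {
      val sessionNum = new AtomicInteger(0)

      // Concurrent opens: atomic increments are never lost.
      val openers = (1 to 8).map { _ =>
        new Thread(new Runnable {
          def run(): Unit = { sessionNum.incrementAndGet() }
        })
      }
      openers.foreach(_.start())
      openers.foreach(_.join())
      assert(sessionNum.get == 8)

      // Concurrent closes: exactly one thread sees the count hit zero,
      // so the SparkContext teardown cannot run twice.
      val sawZero = new AtomicInteger(0)
      val closers = (1 to 8).map { _ =>
        new Thread(new Runnable {
          def run(): Unit = {
            if (sessionNum.decrementAndGet() == 0) sawZero.incrementAndGet()
          }
        })
      }
      closers.foreach(_.start())
      closers.foreach(_.join())
      assert(sawZero.get == 1)
    }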