Repository: zeppelin
Updated Branches:
  refs/heads/master 57601f819 -> 71bc75966


ZEPPELIN-3749. New Spark interpreter has to be restarted two times in order to 
work fine for different users

### What is this PR for?
This PR fixes the scoped-mode issue in the new Spark interpreter. The 
SparkContext is now closed only when there are no other live sessions.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3749

### How should this be tested?
* Unit test is added

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3166 from zjffdu/ZEPPELIN-3749 and squashes the following commits:

67ba55627 [Jeff Zhang] ZEPPELIN-3749. New Spark interpreter has to be restarted 
two times in order to work fine for different users


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/71bc7596
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/71bc7596
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/71bc7596

Branch: refs/heads/master
Commit: 71bc75966a7e5a18ce35b1f767c903a4b5c02eb9
Parents: 57601f8
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Aug 30 10:14:42 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Sep 4 17:10:10 2018 +0800

----------------------------------------------------------------------
 spark/interpreter/figure/null-1.png             | Bin 13599 -> 0 bytes
 spark/interpreter/figure/unnamed-chunk-1-1.png  | Bin 407541 -> 0 bytes
 .../zeppelin/spark/NewSparkInterpreterTest.java |  47 ++++++++++++++++++-
 .../spark/SparkScala210Interpreter.scala        |   3 --
 .../spark/BaseSparkScalaInterpreter.scala       |  33 ++++++++-----
 5 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/interpreter/figure/null-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/null-1.png 
b/spark/interpreter/figure/null-1.png
deleted file mode 100644
index 8b1ce07..0000000
Binary files a/spark/interpreter/figure/null-1.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/interpreter/figure/unnamed-chunk-1-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/unnamed-chunk-1-1.png 
b/spark/interpreter/figure/unnamed-chunk-1-1.png
deleted file mode 100644
index 6f03c95..0000000
Binary files a/spark/interpreter/figure/unnamed-chunk-1-1.png and /dev/null 
differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index ea19866..82727a1 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -34,6 +34,7 @@ import 
org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -67,7 +68,12 @@ public class NewSparkInterpreterTest {
   // catch the interpreter output in onUpdate
   private InterpreterResultMessageOutput messageOutput;
 
-  private RemoteInterpreterEventClient mockRemoteEventClient = 
mock(RemoteInterpreterEventClient.class);
+  private RemoteInterpreterEventClient mockRemoteEventClient;
+
+  @Before
+  public void setUp() {
+    mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+  }
 
   @Test
   public void testSparkInterpreter() throws IOException, InterruptedException, 
InterpreterException {
@@ -519,6 +525,45 @@ public class NewSparkInterpreterTest {
     verify(mockRemoteEventClient, never()).onParaInfosReceived(any(Map.class));
   }
 
+  @Test
+  public void testScopedMode() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+
+    SparkInterpreter interpreter1 = new SparkInterpreter(properties);
+    SparkInterpreter interpreter2 = new SparkInterpreter(properties);
+
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+    interpreter1.setInterpreterGroup(interpreterGroup);
+    interpreter2.setInterpreterGroup(interpreterGroup);
+
+    interpreterGroup.addInterpreterToSession(interpreter1, "session_1");
+    interpreterGroup.addInterpreterToSession(interpreter2, "session_2");
+
+    InterpreterContext.set(getInterpreterContext());
+    interpreter1.open();
+    interpreter2.open();
+
+    InterpreterContext context = getInterpreterContext();
+
+    InterpreterResult result1 = interpreter1.interpret("sc.range(1, 10).sum", 
context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result1.code());
+
+    InterpreterResult result2 = interpreter2.interpret("sc.range(1, 10).sum", 
context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+
+    // interpreter2 continue to work after interpreter1 is closed
+    interpreter1.close();
+
+    result2 = interpreter2.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+    interpreter2.close();
+  }
+
   @After
   public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
----------------------------------------------------------------------
diff --git 
a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
 
b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index 6fc8178..9d2ac83 100644
--- 
a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ 
b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -87,9 +87,6 @@ class SparkScala210Interpreter(override val conf: SparkConf,
 
   override def close(): Unit = {
     super.close()
-    if (sparkILoop != null) {
-      callMethod(sparkILoop, 
"org$apache$spark$repl$SparkILoop$$closeInterpreter")
-    }
   }
 
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/71bc7596/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
 
b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 7fe43c1..a73630a 100644
--- 
a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ 
b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark
 
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext}
@@ -59,6 +60,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected val interpreterOutput: InterpreterOutputStream
 
+
   protected def open(): Unit = {
     /* Required for scoped mode.
      * In scoped mode multiple scala compiler (repl) generates class in the 
same directory.
@@ -77,6 +79,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
      *
      */
     System.setProperty("scala.repl.name.line", ("$line" + 
this.hashCode).replace('-', '0'))
+
+    BaseSparkScalaInterpreter.sessionNum.incrementAndGet()
   }
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult 
= {
@@ -152,19 +156,20 @@ abstract class BaseSparkScalaInterpreter(val conf: 
SparkConf,
     bind(name, tpe, value, modifier.asScala.toList)
 
   protected def close(): Unit = {
-    if (sc != null) {
-      sc.stop()
-    }
-    if (sparkHttpServer != null) {
-      sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
-    }
-    sc = null
-    sqlContext = null
-    if (sparkSession != null) {
-      sparkSession.getClass.getMethod("stop").invoke(sparkSession)
-      sparkSession = null
+    if (BaseSparkScalaInterpreter.sessionNum.decrementAndGet() == 0) {
+      if (sc != null) {
+        sc.stop()
+      }
+      if (sparkHttpServer != null) {
+        sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
+      }
+      sc = null
+      sqlContext = null
+      if (sparkSession != null) {
+        sparkSession.getClass.getMethod("stop").invoke(sparkSession)
+        sparkSession = null
+      }
     }
-
   }
 
   protected def createSparkContext(): Unit = {
@@ -376,3 +381,7 @@ abstract class BaseSparkScalaInterpreter(val conf: 
SparkConf,
     depFiles.asScala.filter(!_.endsWith(".jar"))
   }
 }
+
+object BaseSparkScalaInterpreter {
+  val sessionNum = new AtomicInteger(0)
+}
\ No newline at end of file

Reply via email to