Repository: zeppelin
Updated Branches:
  refs/heads/master 9d40013a9 -> 0aea2416d


[HOTFIX][ZEPPELIN-2294]. Interpreter fail exception is not propagated to 
frontend

### What is this PR for?

This PR addresses the issue that an interpreter failure exception is not 
propagated to the frontend. The regression was introduced by the earlier fix 
for the restart button issue. This change makes referenceCount the number of 
sessions attached to the interpreter process, and opens and closes all the 
interpreters in one session together. I don't think there is any scenario 
that requires closing a single interpreter for now.

### What type of PR is it?
[Bug Fix | Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2294

### How should this be tested?
I don't have time to write a test; just verify the zombie process issue 
manually in the following scenarios:
* Per User + Isolated
* Per Note + Isolated
* Per User + Scoped
* Per Note + Scoped

### Screenshots (if appropriate)
Before
![2017-03-22_1538](https://cloud.githubusercontent.com/assets/164491/24198052/227d42e6-0f3f-11e7-9918-bf9827e44f92.png)

After
![2017-03-22_1523](https://cloud.githubusercontent.com/assets/164491/24198062/31043cc0-0f3f-11e7-8ab3-87938e3918ce.png)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2175 from zjffdu/ZEPPELIN-2294 and squashes the following commits:

a3f8aa0b [Jeff Zhang] [ZEPPELIN-2294]. Interpreter fail exception is not 
propagated to frontend


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/0aea2416
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/0aea2416
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/0aea2416

Branch: refs/heads/master
Commit: 0aea2416dc60fb6e2e181ee1c593d9d2c575bcb7
Parents: 9d40013
Author: Jeff Zhang <zjf...@apache.org>
Authored: Wed Mar 22 15:19:42 2017 +0800
Committer: Jongyoul Lee <jongy...@apache.org>
Committed: Thu Mar 23 22:56:24 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/interpreter/InterpreterGroup.java  |  1 +
 .../interpreter/LazyOpenInterpreter.java        | 13 ++----
 .../interpreter/remote/RemoteInterpreter.java   | 48 ++++++++++++++++++--
 .../remote/RemoteInterpreterProcess.java        |  2 +
 .../remote/RemoteInterpreterTest.java           |  6 +--
 .../interpreter/InterpreterSetting.java         | 18 --------
 .../interpreter/InterpreterSettingManager.java  |  1 -
 7 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 7367588..5cbab6b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -182,6 +182,7 @@ public class InterpreterGroup extends 
ConcurrentHashMap<String, List<Interpreter
 
   public void close(final Map<String, InterpreterGroup> interpreterGroupRef,
       final String processKey, final String sessionKey) {
+    LOGGER.info("Close interpreter group " + getId() + " for session: " + 
sessionKey);
     close(interpreterGroupRef, processKey, sessionKey, this.get(sessionKey));
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
index 0340632..ad85ded 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -75,11 +75,11 @@ public class LazyOpenInterpreter
 
   @Override
   public void close() {
-    // TODO(jl): Remove this trick!!
-    // intp.close() should be called to reduce referenceCount
-    if (isOpen() || intp instanceof RemoteInterpreter) {
-      intp.close();
-      opened = false;
+    synchronized (intp) {
+      if (opened == true) {
+        intp.close();
+        opened = false;
+      }
     }
   }
 
@@ -103,9 +103,6 @@ public class LazyOpenInterpreter
 
   @Override
   public FormType getFormType() {
-    // RemoteInterpreter's this method calls init() internally, and which 
cause to increase the
-    // number of referenceCount and it affects incorrectly
-    open();
     return intp.getFormType();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index c751dcf..aae50ae 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -204,7 +204,7 @@ public class RemoteInterpreter extends Interpreter {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
 
     final InterpreterGroup interpreterGroup = getInterpreterGroup();
-    interpreterProcess.reference(interpreterGroup, userName, 
isUserImpersonate);
+
     interpreterProcess.setMaxPoolSize(
         Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
     String groupId = interpreterGroup.getId();
@@ -260,6 +260,11 @@ public class RemoteInterpreter extends Interpreter {
       // other interpreters doesn't do anything because those LazyInterpreters 
aren't open.
       // But for now, we have to initialise all of interpreters for some 
reasons.
       // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+      if (!initialized) {
+        // reference per session
+        interpreterProcess.reference(interpreterGroup, userName, 
isUserImpersonate);
+      }
       for (Interpreter intp : new ArrayList<>(interpreters)) {
         Interpreter p = intp;
         while (p instanceof WrappedInterpreter) {
@@ -278,8 +283,43 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public void close() {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      // close all interpreters in this session
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). 
It, however,
+      // initializes all of interpreters with same sessionKey. But 
LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while 
running intp.close()
+      // In case of Spark, this method initializes all of interpreters and 
init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this 
interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters 
aren't open.
+      // But for now, we have to initialise all of interpreters for some 
reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      if (initialized) {
+        // dereference per session
+        getInterpreterProcess().dereference();
+      }
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).closeInterpreter();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from 
interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
 
+  public void closeInterpreter() {
+    if (this.initialized == false) {
+      return;
+    }
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     Client client = null;
     boolean broken = false;
     try {
@@ -296,7 +336,7 @@ public class RemoteInterpreter extends Interpreter {
       if (client != null) {
         interpreterProcess.releaseClient(client, broken);
       }
-      getInterpreterProcess().dereference();
+      this.initialized = false;
     }
   }
 
@@ -388,7 +428,7 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public FormType getFormType() {
-    init();
+    open();
 
     if (formType != null) {
       return formType;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index c53d907..1d48a1e 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class RemoteInterpreterProcess {
   private static final Logger logger = 
LoggerFactory.getLogger(RemoteInterpreterProcess.class);
+
+  // number of sessions that are attached to this process
   private final AtomicInteger referenceCount;
 
   private GenericObjectPool<Client> clientPool;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 51c18f7..2914bb4 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -142,7 +142,7 @@ public class RemoteInterpreterTest {
     intpA.open(); // initializa all interpreters in the same group
     assertTrue(process.isRunning());
     assertEquals(1, process.getNumIdleClient());
-    assertEquals(2, process.referenceCount());
+    assertEquals(1, process.referenceCount());
 
     intpA.interpret("1",
         new InterpreterContext(
@@ -159,10 +159,10 @@ public class RemoteInterpreterTest {
             new LinkedList<InterpreterContextRunner>(), null));
 
     intpB.open();
-    assertEquals(2, process.referenceCount());
+    assertEquals(1, process.referenceCount());
 
     intpA.close();
-    assertEquals(1, process.referenceCount());
+    assertEquals(0, process.referenceCount());
     intpB.close();
     assertEquals(0, process.referenceCount());
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 317efbd..2efba48 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -226,24 +226,6 @@ public class InterpreterSetting {
     }
   }
 
-  void closeAndRemoveInterpreterGroupByNoteId(String noteId) {
-    String processKey = getInterpreterProcessKey("", noteId);
-    List<InterpreterGroup> closeToGroupList = new LinkedList<>();
-    InterpreterGroup groupKey;
-    for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
-      if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
-        interpreterGroupWriteLock.lock();
-        groupKey = interpreterGroupRef.remove(intpKey);
-        interpreterGroupWriteLock.unlock();
-        closeToGroupList.add(groupKey);
-      }
-    }
-
-    for (InterpreterGroup groupToRemove : closeToGroupList) {
-      groupToRemove.close();
-    }
-  }
-
   void closeAndRemoveInterpreterGroup(String noteId, String user) {
     if (user.equals("anonymous")) {
       user = "";

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 98cfb08..32db89b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -926,7 +926,6 @@ public class InterpreterSettingManager {
   public void restart(String settingId, String noteId, String user) {
     InterpreterSetting intpSetting = interpreterSettings.get(settingId);
     Preconditions.checkNotNull(intpSetting);
-
     synchronized (interpreterSettings) {
       intpSetting = interpreterSettings.get(settingId);
       // Check if dependency in specified path is changed

Reply via email to