This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b31cb6  [ZEPPELIN-4932]. Close binded interpreter when note is removed
5b31cb6 is described below

commit 5b31cb61f605659bffec797f1d9ef46a2bbc4474
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Jul 2 16:20:04 2020 +0800

    [ZEPPELIN-4932]. Close binded interpreter when note is removed
    
    ### What is this PR for?
    
    This PR tries to close the interpreters bound to a note when the note is 
removed, so that interpreter processes are not leaked.
    
    ### What type of PR is it?
    [Bug Fix ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4932
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3837 from zjffdu/ZEPPELIN-4932 and squashes the following commits:
    
    243932544 [Jeff Zhang] [ZEPPELIN-4932]. Close binded interpreter when note 
is removed
---
 .../interpreter/remote/RemoteClientFactory.java    |  7 ++++-
 zeppelin-server/conf/notebook-authorization.json   | 16 -----------
 .../src/test/resources/log4j.properties            |  4 +--
 .../interpreter/InterpreterSettingManager.java     | 14 +++++++++
 .../remote/RemoteInterpreterManagedProcess.java    |  1 -
 .../org/apache/zeppelin/notebook/NotebookTest.java | 33 ++++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  2 +-
 7 files changed, 56 insertions(+), 21 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
index 4a5ea02..2abac21 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
@@ -21,16 +21,21 @@ import org.apache.commons.pool2.BasePooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.TServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Factory class for creating thrift socket client.
  */
 public class RemoteClientFactory<T extends TServiceClient> extends 
BasePooledObjectFactory<T>{
 
-  private Set<T> clientSockets = new HashSet<>();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteClientFactory.class);
+
+  private Set<T> clientSockets = ConcurrentHashMap.newKeySet();
   private SupplierWithIO<T> supplier;
 
   public RemoteClientFactory(SupplierWithIO<T> supplier) {
diff --git a/zeppelin-server/conf/notebook-authorization.json 
b/zeppelin-server/conf/notebook-authorization.json
deleted file mode 100644
index 1e3767c..0000000
--- a/zeppelin-server/conf/notebook-authorization.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-  "authInfo": {
-    "2F8GB7HN7": {
-      "readers": [],
-      "owners": [],
-      "writers": [],
-      "runners": []
-    },
-    "2F9W6X5GY": {
-      "readers": [],
-      "owners": [],
-      "writers": [],
-      "runners": []
-    }
-  }
-}
\ No newline at end of file
diff --git a/zeppelin-server/src/test/resources/log4j.properties 
b/zeppelin-server/src/test/resources/log4j.properties
index 292305d..efc4b3e 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -43,5 +43,5 @@ log4j.logger.DataNucleus.Datastore=ERROR
 log4j.logger.org.hibernate.type=ALL
 log4j.logger.org.apache.hadoop=WARN
 
-log4j.logger.org.apache.zeppelin.interpreter=INFO
-log4j.logger.org.apache.zeppelin.scheduler=INFO
+log4j.logger.org.apache.zeppelin.interpreter=WARN
+log4j.logger.org.apache.zeppelin.scheduler=WARN
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 4df049a..5fb9ad0 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -1068,6 +1068,20 @@ public class InterpreterSettingManager implements 
NoteEventListener, ClusterEven
 
   @Override
   public void onNoteRemove(Note note, AuthenticationInfo subject) throws 
IOException {
+    // stop all associated interpreters
+    for (Paragraph paragraph : note.getParagraphs()) {
+      try {
+        Interpreter interpreter = paragraph.getBindedInterpreter();
+        InterpreterSetting interpreterSetting =
+                ((ManagedInterpreterGroup) 
interpreter.getInterpreterGroup()).getInterpreterSetting();
+        restart(interpreterSetting.getId(), subject.getUser(), note.getId());
+      } catch (InterpreterNotFoundException e) {
+
+      } catch (InterpreterException e) {
+        LOGGER.warn("Fail to stop interpreter setting", e);
+      }
+    }
+
     // remove from all interpreter instance's angular object registry
     for (InterpreterSetting settings : interpreterSettings.values()) {
       InterpreterGroup interpreterGroup = settings.getInterpreterGroup(
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 79130a4..a4a5df3 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -22,7 +22,6 @@ import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zeppelin.interpreter.YarnAppMonitor;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.interpreter.util.ProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index ad99ca7..a6d647f 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -436,6 +436,39 @@ public class NotebookTest extends AbstractInterpreterTest 
implements ParagraphJo
   }
 
   @Test
+  public void testRemoveNote() throws IOException, InterruptedException {
+    try {
+      LOGGER.info("--------------- Test testRemoveNote ---------------");
+      // create a note and a paragraph
+      Note note = notebook.createNote("note1", anonymous);
+      int mock1ProcessNum = 
interpreterSettingManager.getByName("mock1").getAllInterpreterGroups().size();
+      Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      Map config = new HashMap<>();
+      p.setConfig(config);
+      p.setText("%mock1 sleep 100000");
+      p.execute(false);
+      // wait until it is running
+      while (!p.isRunning()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      assertEquals(mock1ProcessNum + 1, 
interpreterSettingManager.getByName("mock1").getAllInterpreterGroups().size());
+      LOGGER.info("--------------- Finish Test testRemoveNote 
---------------");
+      notebook.removeNote(note, anonymous);
+      // stop interpreter process is async, so we wait for 5 seconds here.
+      Thread.sleep(5 * 1000);
+      assertEquals(mock1ProcessNum, 
interpreterSettingManager.getByName("mock1").getAllInterpreterGroups().size());
+
+      LOGGER.info("--------------- Finish Test testRemoveNote 
---------------");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
   public void testInvalidInterpreter() throws IOException, 
InterruptedException {
     Note note = notebook.createNote("note1", anonymous);
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties 
b/zeppelin-zengine/src/test/resources/log4j.properties
index 4550ccd..b888064 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 #log4j.appender.stdout.layout.ConversionPattern=
 #%5p [%t] (%F:%L) - %m%n
 #%-4r [%t] %-5p %c %x - %m%n

Reply via email to