This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ab8f9e  [ZEPPELIN-4852]. Add name to RemoteInterpreterProcess
2ab8f9e is described below

commit 2ab8f9ecc032e3e9fbd12ce6908af29aee989cbc
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Jun 4 23:26:46 2020 +0800

    [ZEPPELIN-4852]. Add name to RemoteInterpreterProcess
    
    ### What is this PR for?
    
    This is a trivial PR which adds `getInterpreterGroupId` to 
`InterpreterClient`, and uses `interpreterGroupId` as the identifier of 
RemoteInterpreterProcess because `interpreterGroupId` is also the unique 
identifier of InterpreterGroup.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4852
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3785 from zjffdu/ZEPPELIN-4852 and squashes the following commits:
    
    01ec89373 [Jeff Zhang] address comment
    4fc92cc60 [Jeff Zhang] address comment
    a482faa39 [Jeff Zhang] [ZEPPELIN-4852]. Add name to RemoteInterpreterProcess
---
 .../interpreter/launcher/InterpreterClient.java      |  2 ++
 .../launcher/ClusterInterpreterLauncher.java         |  2 ++
 .../launcher/DockerInterpreterProcess.java           |  5 +++++
 .../launcher/K8sRemoteInterpreterProcess.java        |  5 +++++
 .../launcher/StandardInterpreterLauncher.java        |  1 +
 .../recovery/FileSystemRecoveryStorage.java          |  2 +-
 .../remote/RemoteInterpreterManagedProcess.java      |  8 ++++----
 .../remote/RemoteInterpreterRunningProcess.java      | 20 ++++++++++++++++----
 8 files changed, 36 insertions(+), 9 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
index bfd3e44..73c8ef0 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
@@ -26,6 +26,8 @@ import java.io.IOException;
  */
 public interface InterpreterClient {
 
+  String getInterpreterGroupId();
+
   String getInterpreterSettingName();
 
   void start(String userName) throws IOException;
diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index b406ec3..ff6d69a 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -78,6 +78,7 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
 
             return new RemoteInterpreterRunningProcess(
                 context.getInterpreterSettingName(),
+                context.getInterpreterGroupId(),
                 connectTimeout,
                 intpTserverHost,
                 intpTserverPort);
@@ -149,6 +150,7 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
 
             return new RemoteInterpreterRunningProcess(
                 context.getInterpreterSettingName(),
+                context.getInterpreterGroupId(),
                 connectTimeout,
                 intpTserverHost,
                 intpTserverPort);
diff --git 
a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
 
b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 2d64898..23e4262 100644
--- 
a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -155,6 +155,11 @@ public class DockerInterpreterProcess extends 
RemoteInterpreterProcess {
   }
 
   @Override
+  public String getInterpreterGroupId() {
+    return interpreterGroupId;
+  }
+
+  @Override
   public String getInterpreterSettingName() {
     return interpreterSettingName;
   }
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 864f660..120500b 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -95,6 +95,11 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
   }
 
   @Override
+  public String getInterpreterGroupId() {
+    return interpreterGroupId;
+  }
+
+  @Override
   public String getInterpreterSettingName() {
     return interpreterSettingName;
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index d78cb2c..ff60b39 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -58,6 +58,7 @@ public class StandardInterpreterLauncher extends 
InterpreterLauncher {
     if (option.isExistingProcess()) {
       return new RemoteInterpreterRunningProcess(
           context.getInterpreterSettingName(),
+          context.getInterpreterGroupId(),
           connectTimeout,
           option.getHost(),
           option.getPort());
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
index bef2c8f..1b660ac 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
@@ -110,7 +110,7 @@ public class FileSystemRecoveryStorage extends 
RecoveryStorage {
           int connectTimeout =
               
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
           RemoteInterpreterRunningProcess client = new 
RemoteInterpreterRunningProcess(
-              interpreterSettingName, connectTimeout, hostPort[0], 
Integer.parseInt(hostPort[1]));
+              interpreterSettingName, groupId, connectTimeout, hostPort[0], 
Integer.parseInt(hostPort[1]));
           // interpreterSettingManager may be null when this class is used 
when it is used
           // stop-interpreter.sh
           clients.put(groupId, client);
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 69d82b6..84cab14 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -141,7 +141,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
 
   public void stop() {
     if (isRunning()) {
-      LOGGER.info("Kill interpreter process");
+      LOGGER.info("Kill interpreter process for interpreter group: {}", 
getInterpreterGroupId());
       try {
         callRemoteFunction(new RemoteFunction<Void>() {
           @Override
@@ -157,10 +157,9 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
       // Shutdown connection
       shutdown();
       this.interpreterProcessLauncher.stop();
+      this.interpreterProcessLauncher = null;
+      LOGGER.info("Remote process of interpreter group: {} is terminated", 
getInterpreterGroupId());
     }
-
-    interpreterProcessLauncher = null;
-    LOGGER.info("Remote process terminated");
   }
 
   @Override
@@ -196,6 +195,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
     return interpreterSettingName;
   }
 
+  @Override
   public String getInterpreterGroupId() {
     return interpreterGroupId;
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index c2efcf4..d78bfca 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -16,7 +16,6 @@
  */
 package org.apache.zeppelin.interpreter.remote;
 
-import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,19 +24,23 @@ import org.slf4j.LoggerFactory;
  * This class connects to existing process
  */
 public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
-  private final Logger logger = 
LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+
   private final String host;
   private final int port;
   private final String interpreterSettingName;
+  private final String interpreterGroupId;
 
   public RemoteInterpreterRunningProcess(
       String interpreterSettingName,
+      String interpreterGroupId,
       int connectTimeout,
       String host,
       int port
   ) {
     super(connectTimeout);
     this.interpreterSettingName = interpreterSettingName;
+    this.interpreterGroupId = interpreterGroupId;
     this.host = host;
     this.port = port;
   }
@@ -58,6 +61,11 @@ public class RemoteInterpreterRunningProcess extends 
RemoteInterpreterProcess {
   }
 
   @Override
+  public String getInterpreterGroupId() {
+    return interpreterGroupId;
+  }
+
+  @Override
   public void start(String userName) {
     // assume process is externally managed. nothing to do
   }
@@ -68,7 +76,7 @@ public class RemoteInterpreterRunningProcess extends 
RemoteInterpreterProcess {
     // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
     if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
       if (isRunning()) {
-        logger.info("Kill interpreter process");
+        LOGGER.info("Kill interpreter process of interpreter group: {}", 
interpreterGroupId);
         try {
           callRemoteFunction(new RemoteFunction<Void>() {
             @Override
@@ -78,8 +86,12 @@ public class RemoteInterpreterRunningProcess extends 
RemoteInterpreterProcess {
             }
           });
         } catch (Exception e) {
-          logger.warn("ignore the exception when shutting down interpreter 
process.", e);
+          LOGGER.warn("ignore the exception when shutting down interpreter 
process.", e);
         }
+
+        // Shutdown connection
+        shutdown();
+        LOGGER.info("Remote process of interpreter group: {} is terminated.", 
getInterpreterGroupId());
       }
     }
   }

Reply via email to