This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 2ab8f9e [ZEPPELIN-4852]. Add name to RemoteInterpreterProcess 2ab8f9e is described below commit 2ab8f9ecc032e3e9fbd12ce6908af29aee989cbc Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Jun 4 23:26:46 2020 +0800 [ZEPPELIN-4852]. Add name to RemoteInterpreterProcess ### What is this PR for? This is a trivial PR which adds `getInterpreterGroupId` to `InterpreterClient`, and uses `interpreterGroupId` as the identifier of RemoteInterpreterProcess because `interpreterGroupId` is also the unique identifier of InterpreterGroup. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4852 ### How should this be tested? * CI passes ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3785 from zjffdu/ZEPPELIN-4852 and squashes the following commits: 01ec89373 [Jeff Zhang] address comment 4fc92cc60 [Jeff Zhang] address comment a482faa39 [Jeff Zhang] [ZEPPELIN-4852]. 
Add name to RemoteInterpreterProcess --- .../interpreter/launcher/InterpreterClient.java | 2 ++ .../launcher/ClusterInterpreterLauncher.java | 2 ++ .../launcher/DockerInterpreterProcess.java | 5 +++++ .../launcher/K8sRemoteInterpreterProcess.java | 5 +++++ .../launcher/StandardInterpreterLauncher.java | 1 + .../recovery/FileSystemRecoveryStorage.java | 2 +- .../remote/RemoteInterpreterManagedProcess.java | 8 ++++---- .../remote/RemoteInterpreterRunningProcess.java | 20 ++++++++++++++++---- 8 files changed, 36 insertions(+), 9 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java index bfd3e44..73c8ef0 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -26,6 +26,8 @@ import java.io.IOException; */ public interface InterpreterClient { + String getInterpreterGroupId(); + String getInterpreterSettingName(); void start(String userName) throws IOException; diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index b406ec3..ff6d69a 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -78,6 +78,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, intpTserverHost, 
intpTserverPort); @@ -149,6 +150,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, intpTserverHost, intpTserverPort); diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java index 2d64898..23e4262 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java @@ -155,6 +155,11 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { } @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + + @Override public String getInterpreterSettingName() { return interpreterSettingName; } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 864f660..120500b 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -95,6 +95,11 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + + @Override public String getInterpreterSettingName() { return interpreterSettingName; } diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index d78cb2c..ff60b39 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -58,6 +58,7 @@ public class StandardInterpreterLauncher extends InterpreterLauncher { if (option.isExistingProcess()) { return new RemoteInterpreterRunningProcess( context.getInterpreterSettingName(), + context.getInterpreterGroupId(), connectTimeout, option.getHost(), option.getPort()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index bef2c8f..1b660ac 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -110,7 +110,7 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( - interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); + interpreterSettingName, groupId, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); // interpreterSettingManager may be null when this class is used when it is used // stop-interpreter.sh clients.put(groupId, client); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 69d82b6..84cab14 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -141,7 +141,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { public void stop() { if (isRunning()) { - LOGGER.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process for interpreter group: {}", getInterpreterGroupId()); try { callRemoteFunction(new RemoteFunction<Void>() { @Override @@ -157,10 +157,9 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { // Shutdown connection shutdown(); this.interpreterProcessLauncher.stop(); + this.interpreterProcessLauncher = null; + LOGGER.info("Remote process of interpreter group: {} is terminated", getInterpreterGroupId()); } - - interpreterProcessLauncher = null; - LOGGER.info("Remote process terminated"); } @Override @@ -196,6 +195,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { return interpreterSettingName; } + @Override public String getInterpreterGroupId() { return interpreterGroupId; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index c2efcf4..d78bfca 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.interpreter.remote; -import org.apache.zeppelin.helium.ApplicationEventListener; import 
org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,19 +24,23 @@ import org.slf4j.LoggerFactory; * This class connects to existing process */ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; private final int port; private final String interpreterSettingName; + private final String interpreterGroupId; public RemoteInterpreterRunningProcess( String interpreterSettingName, + String interpreterGroupId, int connectTimeout, String host, int port ) { super(connectTimeout); this.interpreterSettingName = interpreterSettingName; + this.interpreterGroupId = interpreterGroupId; this.host = host; this.port = port; } @@ -58,6 +61,11 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { } @Override + public String getInterpreterGroupId() { + return interpreterGroupId; + } + + @Override public void start(String userName) { // assume process is externally managed. nothing to do } @@ -68,7 +76,7 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. 
if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { if (isRunning()) { - logger.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId); try { callRemoteFunction(new RemoteFunction<Void>() { @Override @@ -78,8 +86,12 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { } }); } catch (Exception e) { - logger.warn("ignore the exception when shutting down interpreter process.", e); + LOGGER.warn("ignore the exception when shutting down interpreter process.", e); } + + // Shutdown connection + shutdown(); + LOGGER.info("Remote process of interpreter group: {} is terminated.", getInterpreterGroupId()); } } }