This is an automated email from the ASF dual-hosted git repository. liuxun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 8f97164 [ZEPPELIN-4236] Cluster mode support interpreter running in docker 8f97164 is described below commit 8f971641769b3cafcb9b8d08b158a7f8ba18451a Author: Xun Liu <liu...@apache.org> AuthorDate: Mon Jul 15 14:07:23 2019 +0800 [ZEPPELIN-4236] Cluster mode support interpreter running in docker ### What is this PR for? Now that zeppelin is set up in cluster mode, the interpreter will not work in docker mode. We need to mix these two modes together. This will allow more users to use the interpreter container. ### What type of PR is it? [Improvement] ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4236 ### How should this be tested? * [CI pass](https://travis-ci.org/liuxunorg/zeppelin/builds/557797483) ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Xun Liu <liu...@apache.org> Closes #3405 from liuxunorg/ZEPPELIN-4236 and squashes the following commits: 0a9b34ba6 [Xun Liu] Add `Docker Mode` and `Local Mode` test case. b26195d9d [Xun Liu] [ZEPPELIN-4236] Cluster mode support interpreter running in docker --- .../zeppelin/conf/ZeppelinConfiguration.java | 5 +++ .../interpreter/launcher/InterpreterLauncher.java | 2 +- zeppelin-plugins/launcher/cluster/pom.xml | 5 +++ .../launcher/ClusterInterpreterCheckThread.java | 22 ++++++++---- .../launcher/ClusterInterpreterLauncher.java | 42 +++++++++++++++++----- .../launcher/ClusterInterpreterProcess.java | 3 -- .../launcher/ClusterInterpreterLauncherTest.java | 38 ++++++++++++++++++++ 7 files changed, 99 insertions(+), 18 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 2818313..28fefcd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -722,6 +722,11 @@ public class ZeppelinConfiguration extends XMLConfiguration { } } + @VisibleForTesting + public void setRunMode(RUN_MODE runMode) { + properties.put(ConfVars.ZEPPELIN_RUN_MODE.getVarName(), runMode.name()); + } + public boolean getK8sPortForward() { return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index e505595..fe105c6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -44,7 +44,7 @@ public abstract class InterpreterLauncher { protected int getConnectTimeout() { int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - if (properties.containsKey( + if (properties != null && properties.containsKey( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())) { connectTimeout = Integer.parseInt(properties.getProperty( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())); diff --git a/zeppelin-plugins/launcher/cluster/pom.xml b/zeppelin-plugins/launcher/cluster/pom.xml index 80ca0b3..bf24546 100644 --- a/zeppelin-plugins/launcher/cluster/pom.xml +++ b/zeppelin-plugins/launcher/cluster/pom.xml @@ -46,6 +46,11 @@ <artifactId>launcher-standard</artifactId> <version>0.9.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>launcher-docker</artifactId> + <version>0.9.0-SNAPSHOT</version> + </dependency> </dependencies> <build> diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java index fb7e41f..a389135 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java @@ -33,10 +33,16 @@ public class ClusterInterpreterCheckThread extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterCheckThread.class); - private ClusterInterpreterProcess intpProcess; + private InterpreterClient intpProcess; + private String intpGroupId; + private int connectTimeout; - ClusterInterpreterCheckThread(ClusterInterpreterProcess intpProcess) { + ClusterInterpreterCheckThread(InterpreterClient intpProcess, + String intpGroupId, + int connectTimeout) { this.intpProcess = intpProcess; + this.intpGroupId = intpGroupId; + this.connectTimeout = connectTimeout; } @Override @@ -45,11 +51,8 @@ public class ClusterInterpreterCheckThread extends Thread { ClusterManagerServer clusterServer = ClusterManagerServer.getInstance(); - String intpGroupId = intpProcess.getInterpreterGroupId(); - HashMap<String, Object> intpMeta = clusterServer .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId); - int connectTimeout = intpProcess.getConnectTimeout(); int MAX_RETRY_GET_META = connectTimeout / ClusterInterpreterLauncher.CHECK_META_INTERVAL; int retryGetMeta = 0; @@ -71,7 +74,14 @@ public class ClusterInterpreterCheckThread extends Thread { int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT); LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort); - intpProcess.processStarted(intpPort, intpHost); + if (intpProcess instanceof DockerInterpreterProcess) { + ((DockerInterpreterProcess) intpProcess).processStarted(intpPort, intpHost); + } else if (intpProcess instanceof ClusterInterpreterProcess) { + ((ClusterInterpreterProcess) intpProcess).processStarted(intpPort, intpHost); + } else { + LOGGER.error("Unknown type !"); + } + break; } } diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java index 2b2ac61..14bbce2 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java @@ -27,6 +27,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterRunner; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; @@ -52,7 +53,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher implements ClusterEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncher.class); - public static final int CHECK_META_INTERVAL = 500; // ms + public static final int CHECK_META_INTERVAL = 2000; // ms private InterpreterLaunchContext context; private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance(); @@ -175,8 +176,8 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG); InterpreterLaunchContext context = gson.fromJson( eventMsg, new TypeToken<InterpreterLaunchContext>() {}.getType()); - ClusterInterpreterProcess clusterInterpreterProcess = createInterpreterProcess(context); - clusterInterpreterProcess.start(context.getUserName()); + InterpreterClient clusterOrDockerIntpProcess = createInterpreterProcess(context); + clusterOrDockerIntpProcess.start(context.getUserName()); break; default: LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg); @@ -187,10 +188,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher } } - private ClusterInterpreterProcess createInterpreterProcess(InterpreterLaunchContext context) { - ClusterInterpreterProcess clusterInterpreterProcess = null; + private RemoteInterpreterProcess createClusterIntpProcess() { + ClusterInterpreterProcess clusterIntpProcess = null; try { - this.properties = context.getProperties(); InterpreterOption option = context.getOption(); InterpreterRunner runner = context.getRunner(); String intpSetGroupName = context.getInterpreterSettingGroup(); @@ -199,7 +199,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + context.getInterpreterSettingId(); - clusterInterpreterProcess = new ClusterInterpreterProcess( + clusterIntpProcess = new ClusterInterpreterProcess( runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), @@ -215,6 +215,32 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher LOGGER.error(e.getMessage(), e); } - return clusterInterpreterProcess; + return clusterIntpProcess; + } + + private InterpreterClient createInterpreterProcess(InterpreterLaunchContext context) + throws IOException { + this.context = context; + this.properties = context.getProperties(); + int connectTimeout = getConnectTimeout(); + + InterpreterClient remoteIntpProcess = null; + if (isRunningOnDocker(zConf)) { + DockerInterpreterLauncher dockerIntpLauncher = new DockerInterpreterLauncher(zConf, null); + dockerIntpLauncher.setProperties(context.getProperties()); + remoteIntpProcess = dockerIntpLauncher.launch(context); + } else { + remoteIntpProcess = createClusterIntpProcess(); + } + + ClusterInterpreterCheckThread intpCheckThread = new ClusterInterpreterCheckThread( + remoteIntpProcess, context.getInterpreterGroupId(), connectTimeout); + intpCheckThread.start(); + + return remoteIntpProcess; + } + + private boolean isRunningOnDocker(ZeppelinConfiguration zconf) { + return zconf.getRunMode() == ZeppelinConfiguration.RUN_MODE.DOCKER; } } diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java index 8f0fcc7..744e880 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java @@ -36,9 +36,6 @@ public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess { @Override public void start(String userName) throws IOException { - ClusterInterpreterCheckThread interpreterCheckThread = new ClusterInterpreterCheckThread(this); - interpreterCheckThread.start(); - super.start(userName); } diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java index 2b2b172..7d83f07 100644 --- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java @@ -99,4 +99,42 @@ public class ClusterInterpreterLauncherTest extends ClusterMockTest { assertTrue(interpreterProcess.getEnv().size() >= 1); assertEquals(true, interpreterProcess.isUserImpersonated()); } + + @Test + public void testCreateIntpProcessDockerMode() throws IOException { + zconf.setRunMode(ZeppelinConfiguration.RUN_MODE.DOCKER); + + ClusterInterpreterLauncher launcher + = new ClusterInterpreterLauncher(zconf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "1000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, + "user1", "intpGroupId3", "groupId3", + "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + + assertTrue(client instanceof DockerInterpreterProcess); + } + + @Test + public void testCreateIntpProcessLocalMode() throws IOException { + zconf.setRunMode(ZeppelinConfiguration.RUN_MODE.LOCAL); + + ClusterInterpreterLauncher launcher + = new ClusterInterpreterLauncher(zconf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "1000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, + "user1", "intpGroupId4", "groupId4", + "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + + assertTrue(client instanceof ClusterInterpreterProcess); + } }