This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new e7e45a5  [ZEPPELIN-4480]. Move the ipython code into a general jupyter kernel bridge
e7e45a5 is described below

commit e7e45a5e1396f47ff4fafeaee2607efb02846025
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Dec 12 17:14:16 2019 +0800

[ZEPPELIN-4480]. Move the ipython code into a general jupyter kernel bridge

### What is this PR for?
This PR moves the ipython code into the module zeppelin-jupyter-adapter, and ipython now just depends on this module. zeppelin-jupyter-adapter can be used for connecting to any jupyter kernel.

### What type of PR is it?
[ Feature | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4480

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3551 from zjffdu/ZEPPELIN-4480 and squashes the following commits:

78453473e [Jeff Zhang] [ZEPPELIN-4480]. Move the ipython code into a general jupyter kernel bridge
---
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 +-
 pom.xml | 3 +-
 python/pom.xml | 25 +-
 .../apache/zeppelin/python/IPythonInterpreter.java | 376 ++++-----------------
 .../apache/zeppelin/python/PythonInterpreter.java | 2 +-
 .../zeppelin_ipython.py} | 0
 .../zeppelin/python/IPythonInterpreterTest.java | 4 +-
 .../apache/zeppelin/spark/IPySparkInterpreter.java | 6 +-
 .../integration/InterpreterModeActionsIT.java | 2 +-
 {python => zeppelin-jupyter-adapter}/pom.xml | 86 ++---
 .../zeppelin/interpreter/JupyterKernelClient.java | 58 ++--
 .../interpreter/JupyterKernelInterpreter.java | 338 ++++++++++++++++++
 .../src/main/proto/kernel.proto | 108 ++++++
 .../src/main/resources/grpc/generate_rpc.sh | 2 +-
 .../main/resources/grpc/jupyter/kernel_client.py | 10 +-
 .../src/main/resources/grpc/jupyter/kernel_pb2.py | 181 +++++-----
 .../main/resources/grpc/jupyter/kernel_pb2_grpc.py | 71 ++--
 .../main/resources/grpc/jupyter/kernel_server.py | 67 ++--
 18 files changed, 752 insertions(+), 591 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index dee6328..5b9f280 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -58,8 +58,8 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { } @Override - protected Map<String, String> setupIPythonEnv() throws IOException { - Map<String, String> envs = super.setupIPythonEnv(); + protected Map<String, String> setupKernelEnv() throws IOException { + Map<String, String> envs = super.setupKernelEnv(); String pythonPath = envs.getOrDefault("PYTHONPATH", ""); String pyflinkPythonPath = PyFlinkInterpreter.getPyFlinkPythonPath(properties); envs.put("PYTHONPATH", pythonPath + ":" + pyflinkPythonPath); diff --git a/pom.xml b/pom.xml index bac85ab..130b01a 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ <module>zeppelin-zengine</module> <module>zeppelin-display</module> <module>rlang</module> + <module>zeppelin-jupyter-adapter</module> <module>kotlin</module> <module>groovy</module> <module>spark</module> @@ -1086,7 +1087,7 @@ <exclude>**/R/rzeppelin/DESCRIPTION</exclude> <exclude>**/R/rzeppelin/NAMESPACE</exclude> -
<exclude>python/src/main/resources/grpc/**/*</exclude> + <exclude>zeppelin-jupyter-adapter/src/main/resources/grpc/**/*</exclude> </excludes> </configuration> diff --git a/python/pom.xml b/python/pom.xml index 62a2647..a167069 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -42,6 +42,12 @@ <dependencies> <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-jupyter-adapter</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <version>1.3</version> @@ -111,25 +117,6 @@ <plugins> <plugin> - <groupId>org.xolstice.maven.plugins</groupId> - <artifactId>protobuf-maven-plugin</artifactId> - <version>0.5.0</version> - <configuration> - <protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}</protocArtifact> - <pluginId>grpc-java</pluginId> - <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}</pluginArtifact> - </configuration> - <executions> - <execution> - <goals> - <goal>compile</goal> - <goal>compile-custom</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> <artifactId>maven-antrun-plugin</artifactId> <version>1.7</version> <executions> diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java index 674d096..d6dcec0 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java @@ -17,44 +17,23 @@ package org.apache.zeppelin.python; -import com.google.common.annotations.VisibleForTesting; -import io.grpc.ManagedChannelBuilder; -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.zeppelin.interpreter.BaseZeppelinContext; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.JupyterKernelInterpreter; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; -import org.apache.zeppelin.python.proto.CancelRequest; -import org.apache.zeppelin.python.proto.CompletionRequest; -import org.apache.zeppelin.python.proto.CompletionResponse; -import org.apache.zeppelin.python.proto.ExecuteRequest; -import org.apache.zeppelin.python.proto.ExecuteResponse; -import org.apache.zeppelin.python.proto.ExecuteStatus; -import org.apache.zeppelin.python.proto.IPythonStatus; -import org.apache.zeppelin.python.proto.StatusRequest; -import org.apache.zeppelin.python.proto.StatusResponse; -import org.apache.zeppelin.python.proto.StopRequest; -import org.apache.zeppelin.interpreter.util.ProcessLauncher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import py4j.GatewayServer; import 
java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.URL; -import java.nio.file.Files; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -62,43 +41,49 @@ import java.util.Properties; /** * IPython Interpreter for Zeppelin */ -public class IPythonInterpreter extends Interpreter { +public class IPythonInterpreter extends JupyterKernelInterpreter { private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class); - private IPythonProcessLauncher iPythonProcessLauncher; - private IPythonClient ipythonClient; + // GatewayServer in jvm side to communicate with python kernel process. private GatewayServer gatewayServer; - - protected BaseZeppelinContext zeppelinContext; - private String pythonExecutable; - private int ipythonLaunchTimeout; + // allow to set PYTHONPATH private String additionalPythonPath; private String additionalPythonInitFile; private boolean useBuiltinPy4j = true; private boolean usePy4JAuth = true; - private String secret; - - private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER); + private String py4jGatewaySecret; public IPythonInterpreter(Properties properties) { super(properties); } + @Override + public String getKernelName() { + return "python"; + } + + @Override + public List<String> getRequiredPackages() { + List<String> requiredPackages = super.getRequiredPackages(); + requiredPackages.add("ipython"); + requiredPackages.add("ipykernel"); + return requiredPackages; + } + /** * Sub class can customize the interpreter by adding more python packages under PYTHONPATH. - * e.g. PySparkInterpreter + * e.g. IPySparkInterpreter * * @param additionalPythonPath */ public void setAdditionalPythonPath(String additionalPythonPath) { - LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath); this.additionalPythonPath = additionalPythonPath; } /** * Sub class can customize the interpreter by running additional python init code. - * e.g. PySparkInterpreter + * e.g. 
IPySparkInterpreter * * @param additionalPythonInitFile */ @@ -106,10 +91,11 @@ public class IPythonInterpreter extends Interpreter { this.additionalPythonInitFile = additionalPythonInitFile; } - public void setAddBulitinPy4j(boolean add) { - this.useBuiltinPy4j = add; + public void setUseBuiltinPy4j(boolean useBuiltinPy4j) { + this.useBuiltinPy4j = useBuiltinPy4j; } + @Override public BaseZeppelinContext buildZeppelinContext() { return new PythonZeppelinContext( getInterpreterGroup().getInterpreterHookRegistry(), @@ -118,119 +104,48 @@ public class IPythonInterpreter extends Interpreter { @Override public void open() throws InterpreterException { + super.open(); try { - if (ipythonClient != null) { - // IPythonInterpreter might already been opened by PythonInterpreter - return; - } - pythonExecutable = getProperty("zeppelin.python", "python"); - LOGGER.info("Python Exec: " + pythonExecutable); - String checkPrerequisiteResult = checkIPythonPrerequisite(pythonExecutable); - if (!StringUtils.isEmpty(checkPrerequisiteResult)) { - throw new InterpreterException("IPython prerequisite is not meet: " + - checkPrerequisiteResult); - } - ipythonLaunchTimeout = Integer.parseInt( - getProperty("zeppelin.ipython.launch.timeout", "30000")); - this.zeppelinContext = buildZeppelinContext(); - int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - int message_size = Integer.parseInt(getProperty("zeppelin.ipython.grpc.message_size", - 32 * 1024 * 1024 + "")); - ipythonClient = new IPythonClient(ManagedChannelBuilder.forAddress("127.0.0.1", ipythonPort) - .usePlaintext(true).maxInboundMessageSize(message_size)); - this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true")); - this.secret = PythonUtils.createSecret(256); - launchIPythonKernel(ipythonPort); - setupJVMGateway(jvmGatewayPort); - } catch (Exception e) { - throw new InterpreterException("Fail to open IPythonInterpreter\n" + - ExceptionUtils.getStackTrace(e), e); - } - } - - /** - * non-empty return value mean the errors when checking ipython prerequisite. - * empty value mean IPython prerequisite is meet. 
- * - * @param pythonExec - * @return - */ - public String checkIPythonPrerequisite(String pythonExec) { - ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze"); - File stderrFile = null; - File stdoutFile = null; - try { - stderrFile = File.createTempFile("zeppelin", ".txt"); - processBuilder.redirectError(stderrFile); - stdoutFile = File.createTempFile("zeppelin", ".txt"); - processBuilder.redirectOutput(stdoutFile); - - Process proc = processBuilder.start(); - int ret = proc.waitFor(); - if (ret != 0) { - try (FileInputStream in = new FileInputStream(stderrFile)) { - return "Fail to run pip freeze.\n" + IOUtils.toString(in); - } - } - try (FileInputStream in = new FileInputStream(stdoutFile)) { - String freezeOutput = IOUtils.toString(in); - if (!freezeOutput.contains("jupyter-client=")) { - return "jupyter-client is not installed."; - } - if (!freezeOutput.contains("ipykernel=")) { - return "ipykernel is not installed"; - } - if (!freezeOutput.contains("ipython=")) { - return "ipython is not installed"; - } - if (!freezeOutput.contains("grpcio=")) { - return "grpcio is not installed"; - } - if (!freezeOutput.contains("protobuf=")) { - return "protobuf is not installed"; - } - LOGGER.info("IPython prerequisite is met"); - } + String gatewayHost = PythonUtils.getLocalIP(properties); + int gatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + setupJVMGateway(gatewayHost, gatewayPort); + initPythonInterpreter(gatewayHost, gatewayPort); } catch (Exception e) { - LOGGER.warn("Fail to checkIPythonPrerequisite", e); - return "Fail to checkIPythonPrerequisite: " + ExceptionUtils.getStackTrace(e); - } finally { - FileUtils.deleteQuietly(stderrFile); - FileUtils.deleteQuietly(stdoutFile); + LOGGER.error("Fail to open IPythonInterpreter", e); + throw new InterpreterException(e); } - return ""; } - private void setupJVMGateway(int jvmGatewayPort) throws IOException { - String serverAddress = PythonUtils.getLocalIP(properties); - this.gatewayServer = - PythonUtils.createGatewayServer(this, serverAddress, jvmGatewayPort, secret, usePy4JAuth); + private void setupJVMGateway(String gatewayHost, int gatewayPort) throws IOException { + this.gatewayServer = PythonUtils.createGatewayServer(this, gatewayHost, + gatewayPort, py4jGatewaySecret, usePy4JAuth); gatewayServer.start(); + } + private void initPythonInterpreter(String gatewayHost, int gatewayPort) throws IOException { InputStream input = - getClass().getClassLoader().getResourceAsStream("grpc/python/zeppelin_python.py"); + getClass().getClassLoader().getResourceAsStream("python/zeppelin_ipython.py"); List<String> lines = IOUtils.readLines(input); - ExecuteResponse response = ipythonClient.block_execute(ExecuteRequest.newBuilder() - .setCode(StringUtils.join(lines, System.lineSeparator()) - .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "") - .replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build()); + ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder() + .setCode(StringUtils.join(lines, System.lineSeparator()) + .replace("${JVM_GATEWAY_PORT}", gatewayPort + "") + .replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build()); if (response.getStatus() != ExecuteStatus.SUCCESS) { throw new IOException("Fail to setup JVMGateway\n" + response.getOutput()); } input = - getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py"); + getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py"); lines = 
IOUtils.readLines(input); - response = ipythonClient.block_execute(ExecuteRequest.newBuilder() - .setCode(StringUtils.join(lines, System.lineSeparator())).build()); + response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder() + .setCode(StringUtils.join(lines, System.lineSeparator())).build()); if (response.getStatus() != ExecuteStatus.SUCCESS) { throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput()); } - response = ipythonClient.block_execute(ExecuteRequest.newBuilder() - .setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)") - .build()); + response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder() + .setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)") + .build()); if (response.getStatus() != ExecuteStatus.SUCCESS) { throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput()); } @@ -238,65 +153,34 @@ public class IPythonInterpreter extends Interpreter { if (additionalPythonInitFile != null) { input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile); lines = IOUtils.readLines(input); - response = ipythonClient.block_execute(ExecuteRequest.newBuilder() - .setCode(StringUtils.join(lines, System.lineSeparator()) - .replace("${JVM_GATEWAY_PORT}", jvmGatewayPort + "") - .replace("${JVM_GATEWAY_ADDRESS}", serverAddress)).build()); + response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder() + .setCode(StringUtils.join(lines, System.lineSeparator()) + .replace("${JVM_GATEWAY_PORT}", gatewayPort + "") + .replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build()); if (response.getStatus() != ExecuteStatus.SUCCESS) { LOGGER.error("Fail to run additional Python init file\n" + response.getOutput()); throw new IOException("Fail to run additional Python init file: " - + additionalPythonInitFile + "\n" + response.getOutput()); + + additionalPythonInitFile + "\n" + response.getOutput()); } } } - private void launchIPythonKernel(int ipythonPort) - throws IOException { - LOGGER.info("Launching IPython Kernel at port: " + ipythonPort); - // copy the python scripts to a temp directory, then launch ipython kernel in that folder - File pythonWorkDir = Files.createTempDirectory("zeppelin_ipython").toFile(); - String[] ipythonScripts = {"ipython_server.py", "ipython_pb2.py", "ipython_pb2_grpc.py"}; - for (String ipythonScript : ipythonScripts) { - URL url = getClass().getClassLoader().getResource("grpc/python" - + "/" + ipythonScript); - FileUtils.copyURLToFile(url, new File(pythonWorkDir, ipythonScript)); - } - - CommandLine cmd = CommandLine.parse(pythonExecutable); - cmd.addArgument(pythonWorkDir.getAbsolutePath() + "/ipython_server.py"); - cmd.addArgument(ipythonPort + ""); - + @Override + protected Map<String, String> setupKernelEnv() throws IOException { + Map<String, String> envs = super.setupKernelEnv(); if (useBuiltinPy4j) { //TODO(zjffdu) don't do hard code on py4j here - File py4jDestFile = new File(pythonWorkDir, "py4j-src-0.10.7.zip"); + File py4jDestFile = new File(kernelWorkDir, "py4j-src-0.10.7.zip"); FileUtils.copyURLToFile(getClass().getClassLoader().getResource( - "python/py4j-src-0.10.7.zip"), py4jDestFile); + "python/py4j-src-0.10.7.zip"), py4jDestFile); if (additionalPythonPath != null) { // put the py4j at the end, because additionalPythonPath may already contain py4j. - // e.g. PySparkInterpreter + // e.g. 
IPySparkInterpreter additionalPythonPath = additionalPythonPath + ":" + py4jDestFile.getAbsolutePath(); } else { additionalPythonPath = py4jDestFile.getAbsolutePath(); } } - - Map<String, String> envs = setupIPythonEnv(); - iPythonProcessLauncher = new IPythonProcessLauncher(cmd, envs); - iPythonProcessLauncher.launch(); - iPythonProcessLauncher.waitForReady(ipythonLaunchTimeout); - - if (iPythonProcessLauncher.isLaunchTimeout()) { - throw new IOException("Fail to launch IPython Kernel in " + ipythonLaunchTimeout / 1000 - + " seconds.\n" + iPythonProcessLauncher.getErrorMessage()); - } - if (!iPythonProcessLauncher.isRunning()) { - throw new IOException("Fail to launch IPython Kernel as the python process is failed.\n" - + iPythonProcessLauncher.getErrorMessage()); - } - } - - protected Map<String, String> setupIPythonEnv() throws IOException { - Map<String, String> envs = EnvironmentUtils.getProcEnvironment(); if (envs.containsKey("PYTHONPATH")) { if (additionalPythonPath != null) { envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH")); @@ -304,153 +188,23 @@ public class IPythonInterpreter extends Interpreter { } else { envs.put("PYTHONPATH", additionalPythonPath); } + + this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true")); + this.py4jGatewaySecret = PythonUtils.createSecret(256); if (usePy4JAuth) { - envs.put("PY4J_GATEWAY_SECRET", secret); + envs.put("PY4J_GATEWAY_SECRET", this.py4jGatewaySecret); } LOGGER.info("PYTHONPATH:" + envs.get("PYTHONPATH")); return envs; } - @VisibleForTesting - public IPythonProcessLauncher getIPythonProcessLauncher() { - return iPythonProcessLauncher; - } - @Override public void close() throws InterpreterException { - if (iPythonProcessLauncher != null) { - LOGGER.info("Kill IPython Process"); - if (iPythonProcessLauncher.isRunning()) { - ipythonClient.stop(StopRequest.newBuilder().build()); - try { - ipythonClient.shutdown(); - } catch (InterruptedException e) { - LOGGER.warn("Exception happens when shutting down ipythonClient", e); - } - } - iPythonProcessLauncher.stop(); - iPythonProcessLauncher = null; - } + super.close(); if (gatewayServer != null) { LOGGER.info("Shutdown Py4j GatewayServer"); gatewayServer.shutdown(); gatewayServer = null; } } - - @Override - public InterpreterResult interpret(String st, - InterpreterContext context) throws InterpreterException { - zeppelinContext.setGui(context.getGui()); - zeppelinContext.setNoteGui(context.getNoteGui()); - zeppelinContext.setInterpreterContext(context); - interpreterOutput.setInterpreterOutput(context.out); - try { - ExecuteResponse response = - ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(), - interpreterOutput); - interpreterOutput.getInterpreterOutput().flush(); - // It is not known which method is called first (ipythonClient.stream_execute - // or onProcessFailed) when ipython kernel process is exited. Because they are in - // 2 different threads. So here we would check ipythonClient's status and sleep 1 second - // if ipython kernel is maybe terminated. 
- if (iPythonProcessLauncher.isRunning() && !ipythonClient.isMaybeIPythonFailed()) { - return new InterpreterResult( - InterpreterResult.Code.valueOf(response.getStatus().name())); - } else { - if (ipythonClient.isMaybeIPythonFailed()) { - Thread.sleep(1000); - } - if (iPythonProcessLauncher.isRunning()) { - return new InterpreterResult( - InterpreterResult.Code.valueOf(response.getStatus().name())); - } else { - return new InterpreterResult(InterpreterResult.Code.ERROR, - "IPython kernel is abnormally exited, please check your code and log."); - } - } - } catch (Exception e) { - throw new InterpreterException("Fail to interpret python code", e); - } - } - - @Override - public void cancel(InterpreterContext context) throws InterpreterException { - ipythonClient.cancel(CancelRequest.newBuilder().build()); - } - - @Override - public FormType getFormType() { - return FormType.SIMPLE; - } - - @Override - public int getProgress(InterpreterContext context) throws InterpreterException { - return 0; - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor); - List<InterpreterCompletion> completions = new ArrayList<>(); - CompletionResponse response = - ipythonClient.complete( - CompletionRequest.getDefaultInstance().newBuilder().setCode(buf) - .setCursor(cursor).build()); - for (int i = 0; i < response.getMatchesCount(); i++) { - String match = response.getMatches(i); - int lastIndexOfDot = match.lastIndexOf("."); - if (lastIndexOfDot != -1) { - match = match.substring(lastIndexOfDot + 1); - } - LOGGER.debug("Candidate completion: " + match); - completions.add(new InterpreterCompletion(match, match, "")); - } - return completions; - } - - public BaseZeppelinContext getZeppelinContext() { - return zeppelinContext; - } - - class IPythonProcessLauncher extends ProcessLauncher { - - IPythonProcessLauncher(CommandLine commandLine, - Map<String, String> envs) { - super(commandLine, envs); - } - - @Override - public void waitForReady(int timeout) { - // wait until IPython kernel is started or timeout - long startTime = System.currentTimeMillis(); - while (state == State.LAUNCHED) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOGGER.error("Interrupted by something", e); - } - - try { - StatusResponse response = ipythonClient.status(StatusRequest.newBuilder().build()); - if (response.getStatus() == IPythonStatus.RUNNING) { - LOGGER.info("IPython Kernel is Running"); - onProcessRunning(); - break; - } else { - LOGGER.info("Wait for IPython Kernel to be started"); - } - } catch (Exception e) { - // ignore the exception, because is may happen when grpc server has not started yet. 
- LOGGER.info("Wait for IPython Kernel to be started"); - } - - if ((System.currentTimeMillis() - startTime) > timeout) { - onTimeout(); - break; - } - } - } - } } diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index cb958ab..4d8a4b7 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -83,7 +83,7 @@ public class PythonInterpreter extends Interpreter { iPythonInterpreter = getIPythonInterpreter(); if (getProperty("zeppelin.python.useIPython", "true").equals("true") && StringUtils.isEmpty( - iPythonInterpreter.checkIPythonPrerequisite(getPythonExec()))) { + iPythonInterpreter.checkKernelPrerequisite(getPythonExec()))) { try { iPythonInterpreter.open(); LOGGER.info("IPython is available, Use IPythonInterpreter to replace PythonInterpreter"); diff --git a/python/src/main/resources/grpc/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_ipython.py similarity index 100% rename from python/src/main/resources/grpc/python/zeppelin_python.py rename to python/src/main/resources/python/zeppelin_ipython.py diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index 99579b0..b0a8ba6 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -384,7 +384,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest { tearDown(); Properties properties = initIntpProperties(); - properties.setProperty("zeppelin.ipython.grpc.message_size", "4000"); + properties.setProperty("zeppelin.jupyter.kernel.grpc.message_size", "4000"); startInterpreter(properties); @@ -443,7 +443,7 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest { Thread.sleep(3000); IPythonInterpreter iPythonInterpreter = (IPythonInterpreter) ((LazyOpenInterpreter) interpreter).getInnerInterpreter(); - iPythonInterpreter.getIPythonProcessLauncher().stop(); + iPythonInterpreter.getKernelProcessLauncher().stop(); waiter.await(3000); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java index c69e6fd..c961336 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -59,7 +59,7 @@ public class IPySparkInterpreter extends IPythonInterpreter { !conf.get("spark.submit.deployMode").equals("cluster")) { setAdditionalPythonPath(PythonUtils.sparkPythonPath()); } - setAddBulitinPy4j(false); + setUseBuiltinPy4j(false); setAdditionalPythonInitFile("python/zeppelin_ipyspark.py"); setProperty("zeppelin.py4j.useAuth", sparkInterpreter.getSparkVersion().isSecretSocketSupported() + ""); @@ -67,8 +67,8 @@ public class IPySparkInterpreter extends IPythonInterpreter { } @Override - protected Map<String, String> setupIPythonEnv() throws IOException { - Map<String, String> env = super.setupIPythonEnv(); + protected Map<String, String> setupKernelEnv() throws IOException { + Map<String, String> env = super.setupKernelEnv(); // set PYSPARK_PYTHON SparkConf conf = sparkInterpreter.getSparkContext().getConf(); if 
(conf.contains("spark.pyspark.python")) { diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java index 8d9c22f..e8c3e56 100644 --- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java +++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java @@ -73,7 +73,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT { static String interpreterOptionPath = ""; static String originalInterpreterOption = ""; - static String cmdPsPython = "ps aux | grep 'zeppelin_ipython' | grep -v 'grep' | wc -l"; + static String cmdPsPython = "ps aux | grep 'kernel_server.py' | grep -v 'grep' | wc -l"; static String cmdPsInterpreter = "ps aux | grep 'zeppelin/interpreter/python/*' |" + " sed -E '/grep|local-repo/d' | wc -l"; diff --git a/python/pom.xml b/zeppelin-jupyter-adapter/pom.xml similarity index 66% copy from python/pom.xml copy to zeppelin-jupyter-adapter/pom.xml index 62a2647..2b180d7 100644 --- a/python/pom.xml +++ b/zeppelin-jupyter-adapter/pom.xml @@ -21,26 +21,26 @@ <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>zeppelin-interpreter-parent</artifactId> + <artifactId>zeppelin</artifactId> <groupId>org.apache.zeppelin</groupId> <version>0.9.0-SNAPSHOT</version> - <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> + <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-python</artifactId> + <artifactId>zeppelin-jupyter-adapter</artifactId> <packaging>jar</packaging> <version>0.9.0-SNAPSHOT</version> - <name>Zeppelin: Python interpreter</name> + <name>Zeppelin: Jupyter Adapter</name> <properties> <interpreter.name>python</interpreter.name> <python.py4j.version>0.10.7</python.py4j.version> <grpc.version>1.15.0</grpc.version> - <interpreter.jar.name>python-interpreter-with-py4j</interpreter.jar.name> </properties> <dependencies> + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> @@ -58,11 +58,13 @@ <artifactId>grpc-netty</artifactId> <version>${grpc.version}</version> </dependency> + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>${grpc.version}</version> </dependency> + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> @@ -73,14 +75,35 @@ <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> + <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- test libraries --> <dependency> @@ -142,20 +165,6 @@ </executions> </plugin> - <!-- publish test jar as well so that spark module can use it --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>3.0.2</version> - 
<executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> @@ -169,45 +178,6 @@ <artifactId>maven-resources-plugin</artifactId> </plugin> - <!-- - shade for python interpreter is different from other interpreter, it depends on zeppelin-interpreter instead of - zeppelin-interpreter-api. Because spark interpreter depends on python interpreter and spark's py4j conflict with python interpreter's py4j. - python interpreter would generate 2 versions of jars, one is shaded jar which is used for running python interpreter, another is normal jar - which is used by spark interpreter as dependency. - --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>${plugin.shade.version}</version> - <configuration> - <filters> - <filter> - <artifact>*:*</artifact> - </filter> - </filters> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>reference.conf</resource> - </transformer> - </transformers> - <artifactSet> - <excludes> - <exclude>org.apache.zeppelin:zeppelin-interpreter-api</exclude> - </excludes> - </artifactSet> - <outputFile>${project.build.directory}/../../interpreter/python/${interpreter.jar.name}-${project.version}.jar</outputFile> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java b/zeppelin-jupyter-adapter/src/main/java/org/apache/zeppelin/interpreter/JupyterKernelClient.java similarity index 80% rename from python/src/main/java/org/apache/zeppelin/python/IPythonClient.java rename to zeppelin-jupyter-adapter/src/main/java/org/apache/zeppelin/interpreter/JupyterKernelClient.java index 9ad0031..f8ba79e 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonClient.java +++ b/zeppelin-jupyter-adapter/src/main/java/org/apache/zeppelin/interpreter/JupyterKernelClient.java @@ -15,25 +15,25 @@ * limitations under the License. 
*/ -package org.apache.zeppelin.python; +package org.apache.zeppelin.interpreter; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.jupyter.proto.JupyterKernelGrpc; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; -import org.apache.zeppelin.python.proto.CancelRequest; -import org.apache.zeppelin.python.proto.CancelResponse; -import org.apache.zeppelin.python.proto.CompletionRequest; -import org.apache.zeppelin.python.proto.CompletionResponse; -import org.apache.zeppelin.python.proto.ExecuteRequest; -import org.apache.zeppelin.python.proto.ExecuteResponse; -import org.apache.zeppelin.python.proto.ExecuteStatus; -import org.apache.zeppelin.python.proto.IPythonGrpc; -import org.apache.zeppelin.python.proto.OutputType; -import org.apache.zeppelin.python.proto.StatusRequest; -import org.apache.zeppelin.python.proto.StatusResponse; -import org.apache.zeppelin.python.proto.StopRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.CancelResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus; +import org.apache.zeppelin.interpreter.jupyter.proto.OutputType; +import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,33 +44,33 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** - * Grpc client for IPython kernel + * Grpc client for Jupyter kernel */ -public class IPythonClient { +public class JupyterKernelClient { - private static final Logger LOGGER = LoggerFactory.getLogger(IPythonClient.class.getName()); + private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelClient.class.getName()); private final ManagedChannel channel; - private final IPythonGrpc.IPythonBlockingStub blockingStub; - private final IPythonGrpc.IPythonStub asyncStub; - private volatile boolean maybeIPythonFailed = false; + private final JupyterKernelGrpc.JupyterKernelBlockingStub blockingStub; + private final JupyterKernelGrpc.JupyterKernelStub asyncStub; + private volatile boolean maybeKernelFailed = false; private SecureRandom random = new SecureRandom(); /** * Construct client for accessing RouteGuide server at {@code host:port}. */ - public IPythonClient(String host, int port) { + public JupyterKernelClient(String host, int port) { this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(true)); } /** * Construct client for accessing RouteGuide server using the existing channel. 
*/ - public IPythonClient(ManagedChannelBuilder<?> channelBuilder) { + public JupyterKernelClient(ManagedChannelBuilder<?> channelBuilder) { channel = channelBuilder.build(); - blockingStub = IPythonGrpc.newBlockingStub(channel); - asyncStub = IPythonGrpc.newStub(channel); + blockingStub = JupyterKernelGrpc.newBlockingStub(channel); + asyncStub = JupyterKernelGrpc.newStub(channel); } public void shutdown() throws InterruptedException { @@ -84,7 +84,7 @@ public class IPythonClient { final ExecuteResponse.Builder finalResponseBuilder = ExecuteResponse.newBuilder() .setStatus(ExecuteStatus.SUCCESS); final AtomicBoolean completedFlag = new AtomicBoolean(false); - maybeIPythonFailed = false; + maybeKernelFailed = false; LOGGER.debug("stream_execute code:\n" + request.getCode()); asyncStub.execute(request, new StreamObserver<ExecuteResponse>() { OutputType lastOutputType = null; @@ -97,7 +97,7 @@ public class IPythonClient { case TEXT: try { if (executeResponse.getOutput().startsWith("%")) { - // the output from ipython kernel maybe specify format already. + // the output from jupyter kernel maybe specify format already. interpreterOutput.write((executeResponse.getOutput()).getBytes()); } else { // only add %text when the previous output type is not TEXT. @@ -154,7 +154,7 @@ public class IPythonClient { } LOGGER.error("Fail to call IPython grpc", throwable); finalResponseBuilder.setStatus(ExecuteStatus.ERROR); - maybeIPythonFailed = true; + maybeKernelFailed = true; completedFlag.set(true); synchronized (completedFlag) { completedFlag.notify(); @@ -221,12 +221,12 @@ public class IPythonClient { asyncStub.stop(request, null); } - public boolean isMaybeIPythonFailed() { - return maybeIPythonFailed; + public boolean isMaybeKernelFailed() { + return maybeKernelFailed; } public static void main(String[] args) { - IPythonClient client = new IPythonClient("localhost", 50053); + JupyterKernelClient client = new JupyterKernelClient("localhost", 50053); client.status(StatusRequest.newBuilder().build()); ExecuteResponse response = client.block_execute(ExecuteRequest.newBuilder(). diff --git a/zeppelin-jupyter-adapter/src/main/java/org/apache/zeppelin/interpreter/JupyterKernelInterpreter.java b/zeppelin-jupyter-adapter/src/main/java/org/apache/zeppelin/interpreter/JupyterKernelInterpreter.java new file mode 100644 index 0000000..0aedbbf --- /dev/null +++ b/zeppelin-jupyter-adapter/src/main/java/org/apache/zeppelin/interpreter/JupyterKernelInterpreter.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ManagedChannelBuilder; +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.KernelStatus; +import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest; +import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse; +import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; +import org.apache.zeppelin.interpreter.util.ProcessLauncher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Jupyter Kernel adapter for Zeppelin. All the jupyter kernel could be used by Zeppelin + * by extending this class. + */ +public abstract class JupyterKernelInterpreter extends Interpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelInterpreter.class); + + private JupyterKernelProcessLauncher jupyterKernelProcessLauncher; + protected JupyterKernelClient jupyterKernelClient; + + protected BaseZeppelinContext zeppelinContext; + // working directory of jupyter kernel + protected File kernelWorkDir; + // python executable file for launching the jupyter kernel + private String pythonExecutable; + private int kernelLaunchTimeout; + + private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER); + + public JupyterKernelInterpreter(Properties properties) { + super(properties); + } + + public abstract String getKernelName(); + + public List<String> getRequiredPackages() { + List<String> requiredPackages = new ArrayList<>(); + requiredPackages.add("jupyter-client"); + requiredPackages.add("grpcio"); + requiredPackages.add("protobuf"); + return requiredPackages; + } + + public abstract BaseZeppelinContext buildZeppelinContext(); + + @Override + public void open() throws InterpreterException { + try { + if (jupyterKernelClient != null) { + // JupyterKernelInterpreter might already been opened + return; + } + pythonExecutable = getProperty("zeppelin.python", "python"); + LOGGER.info("Python Exec: " + pythonExecutable); + String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable); + if (!StringUtils.isEmpty(checkPrerequisiteResult)) { + throw new InterpreterException("Kernel prerequisite is not meet: " + + checkPrerequisiteResult); + } + kernelLaunchTimeout = Integer.parseInt( + getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000")); + this.zeppelinContext = buildZeppelinContext(); + int 
kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size", + 32 * 1024 * 1024 + "")); + jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1", + kernelPort) + .usePlaintext(true).maxInboundMessageSize(message_size)); + + launchJupyterKernel(kernelPort); + } catch (Exception e) { + throw new InterpreterException("Fail to open JupyterKernelInterpreter:\n" + + ExceptionUtils.getStackTrace(e), e); + } + } + + /** + * non-empty return value mean the errors when checking kernel prerequisite. + * empty value mean kernel prerequisite is met. + * + * @return check result of checking kernel prerequisite. + */ + public String checkKernelPrerequisite(String pythonExec) { + ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze"); + File stderrFile = null; + File stdoutFile = null; + try { + stderrFile = File.createTempFile("zeppelin", ".txt"); + processBuilder.redirectError(stderrFile); + stdoutFile = File.createTempFile("zeppelin", ".txt"); + processBuilder.redirectOutput(stdoutFile); + + Process proc = processBuilder.start(); + int ret = proc.waitFor(); + if (ret != 0) { + try (FileInputStream in = new FileInputStream(stderrFile)) { + return "Fail to run pip freeze.\n" + IOUtils.toString(in); + } + } + try (FileInputStream in = new FileInputStream(stdoutFile)) { + String freezeOutput = IOUtils.toString(in); + for (String packageName : getRequiredPackages()) { + if (!freezeOutput.contains(packageName + "=")) { + return packageName + " is not installed."; + } + } + LOGGER.info("Prerequisite for kernel " + getKernelName() + " is met"); + } + } catch (Exception e) { + LOGGER.warn("Fail to checkKernelPrerequisite", e); + return "Fail to checkKernelPrerequisite: " + ExceptionUtils.getStackTrace(e); + } finally { + FileUtils.deleteQuietly(stderrFile); + FileUtils.deleteQuietly(stdoutFile); + } + return ""; + } + + private void launchJupyterKernel(int kernelPort) + throws IOException { + LOGGER.info("Launching Jupyter Kernel at port: " + kernelPort); + // copy the python scripts to a temp directory, then launch jupyter kernel in that folder + this.kernelWorkDir = Files.createTempDirectory( + "zeppelin_jupyter_kernel_" + getKernelName()).toFile(); + String[] kernelScripts = {"kernel_server.py", "kernel_pb2.py", "kernel_pb2_grpc.py"}; + for (String kernelScript : kernelScripts) { + URL url = getClass().getClassLoader().getResource("grpc/jupyter" + + "/" + kernelScript); + FileUtils.copyURLToFile(url, new File(kernelWorkDir, kernelScript)); + } + + CommandLine cmd = CommandLine.parse(pythonExecutable); + cmd.addArgument(kernelWorkDir.getAbsolutePath() + "/kernel_server.py"); + cmd.addArgument(getKernelName()); + cmd.addArgument(kernelPort + ""); + + Map<String, String> envs = setupKernelEnv(); + jupyterKernelProcessLauncher = new JupyterKernelProcessLauncher(cmd, envs); + jupyterKernelProcessLauncher.launch(); + jupyterKernelProcessLauncher.waitForReady(kernelLaunchTimeout); + + if (jupyterKernelProcessLauncher.isLaunchTimeout()) { + throw new IOException("Fail to launch Jupyter Kernel in " + kernelLaunchTimeout / 1000 + + " seconds.\n" + jupyterKernelProcessLauncher.getErrorMessage()); + } + if (!jupyterKernelProcessLauncher.isRunning()) { + throw new IOException("Fail to launch Jupyter Kernel as the python process is failed.\n" + + jupyterKernelProcessLauncher.getErrorMessage()); + } + } + + protected Map<String, String> 
setupKernelEnv() throws IOException { + return EnvironmentUtils.getProcEnvironment(); + } + + @VisibleForTesting + public JupyterKernelProcessLauncher getKernelProcessLauncher() { + return jupyterKernelProcessLauncher; + } + + @Override + public void close() throws InterpreterException { + if (jupyterKernelProcessLauncher != null) { + LOGGER.info("Killing Jupyter Kernel Process"); + if (jupyterKernelProcessLauncher.isRunning()) { + jupyterKernelClient.stop(StopRequest.newBuilder().build()); + try { + jupyterKernelClient.shutdown(); + } catch (InterruptedException e) { + LOGGER.warn("Exception happens when shutting down jupyter kernel client", e); + } + } + jupyterKernelProcessLauncher.stop(); + jupyterKernelProcessLauncher = null; + LOGGER.info("Jupyter Kernel is killed"); + } + } + + @Override + public InterpreterResult interpret(String st, + InterpreterContext context) throws InterpreterException { + zeppelinContext.setGui(context.getGui()); + zeppelinContext.setNoteGui(context.getNoteGui()); + zeppelinContext.setInterpreterContext(context); + interpreterOutput.setInterpreterOutput(context.out); + try { + ExecuteResponse response = + jupyterKernelClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(), + interpreterOutput); + interpreterOutput.getInterpreterOutput().flush(); + // It is not known which method is called first (JupyterKernelClient.stream_execute + // or onProcessFailed) when jupyter kernel process is exited. Because they are in + // 2 different threads. So here we would check JupyterKernelClient's status and sleep 1 second + // if jupyter kernel is maybe terminated. + if (jupyterKernelProcessLauncher.isRunning() && !jupyterKernelClient.isMaybeKernelFailed()) { + return new InterpreterResult( + InterpreterResult.Code.valueOf(response.getStatus().name())); + } else { + if (jupyterKernelClient.isMaybeKernelFailed()) { + Thread.sleep(1000); + } + if (jupyterKernelProcessLauncher.isRunning()) { + return new InterpreterResult( + InterpreterResult.Code.valueOf(response.getStatus().name())); + } else { + return new InterpreterResult(InterpreterResult.Code.ERROR, + "IPython kernel is abnormally exited, please check your code and log."); + } + } + } catch (Exception e) { + throw new InterpreterException("Fail to interpret python code", e); + } + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + jupyterKernelClient.cancel(CancelRequest.newBuilder().build()); + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor, + InterpreterContext interpreterContext) { + LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor); + List<InterpreterCompletion> completions = new ArrayList<>(); + CompletionResponse response = + jupyterKernelClient.complete( + CompletionRequest.getDefaultInstance().newBuilder().setCode(buf) + .setCursor(cursor).build()); + for (int i = 0; i < response.getMatchesCount(); i++) { + String match = response.getMatches(i); + int lastIndexOfDot = match.lastIndexOf("."); + if (lastIndexOfDot != -1) { + match = match.substring(lastIndexOfDot + 1); + } + LOGGER.debug("Candidate completion: " + match); + completions.add(new InterpreterCompletion(match, match, "")); + } + return completions; + } + + public BaseZeppelinContext getZeppelinContext() { + return zeppelinContext; + } + + public 
class JupyterKernelProcessLauncher extends ProcessLauncher { + + JupyterKernelProcessLauncher(CommandLine commandLine, + Map<String, String> envs) { + super(commandLine, envs); + } + + @Override + public void waitForReady(int timeout) { + // wait until jupyter kernel is started or timeout + long startTime = System.currentTimeMillis(); + while (state == State.LAUNCHED) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.error("Interrupted by something", e); + } + + try { + StatusResponse response = jupyterKernelClient.status(StatusRequest.newBuilder().build()); + if (response.getStatus() == KernelStatus.RUNNING) { + LOGGER.info("Jupyter Kernel is Running"); + onProcessRunning(); + break; + } else { + LOGGER.info("Wait for Jupyter Kernel to be started"); + } + } catch (Exception e) { + // ignore the exception, because is may happen when grpc server has not started yet. + LOGGER.info("Wait for Jupyter Kernel to be started"); + } + + if ((System.currentTimeMillis() - startTime) > timeout) { + onTimeout(); + break; + } + } + } + } +} diff --git a/zeppelin-jupyter-adapter/src/main/proto/kernel.proto b/zeppelin-jupyter-adapter/src/main/proto/kernel.proto new file mode 100644 index 0000000..abc7a35 --- /dev/null +++ b/zeppelin-jupyter-adapter/src/main/proto/kernel.proto @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.apache.zeppelin.interpreter.jupyter.proto"; +option java_outer_classname = "JupyterKernelProto"; +option objc_class_prefix = "JupyterKernel"; + +package jupyter; + +// The JupyterKernel service definition. +service JupyterKernel { + // Sends code + rpc execute (ExecuteRequest) returns (stream ExecuteResponse) {} + + // Get completion + rpc complete (CompletionRequest) returns (CompletionResponse) {} + + // Cancel the running statement + rpc cancel (CancelRequest) returns (CancelResponse) {} + + // Get jupyter kernel status + rpc status (StatusRequest) returns (StatusResponse) {} + + // Stop jupyter kernel + rpc stop(StopRequest) returns (StopResponse) {} +} + +enum ExecuteStatus { + SUCCESS = 0; + ERROR = 1; +} + +enum KernelStatus { + STARTING = 0; + RUNNING = 1; +} + +enum OutputType { + TEXT = 0; + PNG = 1; + JPEG = 2; + HTML = 3; + SVG = 4; + JSON = 5; + LaTeX = 6; +} + +// The request message containing the code +message ExecuteRequest { + string code = 1; +} + +// The response message containing the execution result. 
+message ExecuteResponse { + ExecuteStatus status = 1; + OutputType type = 2; + string output = 3; +} + +message CancelRequest { + +} + +message CancelResponse { + +} + +message CompletionRequest { + string code = 1; + int32 cursor = 2; +} + +message CompletionResponse { + repeated string matches = 1; +} + +message StatusRequest { + +} + +message StatusResponse { + KernelStatus status = 1; +} + +message StopRequest { + +} + +message StopResponse { + +} \ No newline at end of file diff --git a/python/src/main/resources/grpc/generate_rpc.sh b/zeppelin-jupyter-adapter/src/main/resources/grpc/generate_rpc.sh similarity index 87% rename from python/src/main/resources/grpc/generate_rpc.sh rename to zeppelin-jupyter-adapter/src/main/resources/grpc/generate_rpc.sh index efa5fbe..25edc9e 100755 --- a/python/src/main/resources/grpc/generate_rpc.sh +++ b/zeppelin-jupyter-adapter/src/main/resources/grpc/generate_rpc.sh @@ -15,4 +15,4 @@ #!/usr/bin/env bash -python -m grpc_tools.protoc -I../../proto --python_out=python --grpc_python_out=python ../../proto/ipython.proto +python -m grpc_tools.protoc -I../../proto --python_out=jupyter --grpc_python_out=jupyter ../../proto/kernel.proto diff --git a/python/src/main/resources/grpc/python/ipython_client.py b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_client.py similarity index 79% rename from python/src/main/resources/grpc/python/ipython_client.py rename to zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_client.py index b8d1ee0..ac7b1e0 100644 --- a/python/src/main/resources/grpc/python/ipython_client.py +++ b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_client.py @@ -16,19 +16,19 @@ import grpc -import ipython_pb2 -import ipython_pb2_grpc +import kernel_pb2 +import kernel_pb2_grpc def run(): channel = grpc.insecure_channel('localhost:50053') - stub = ipython_pb2_grpc.IPythonStub(channel) - response = stub.execute(ipython_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" + + stub = kernel_pb2_grpc.JupyterKernelStub(channel) + response = stub.execute(kernel_pb2.ExecuteRequest(code="import time\nfor i in range(1,4):\n\ttime.sleep(1)\n\tprint(i)\n" + "%matplotlib inline\nimport matplotlib.pyplot as plt\ndata=[1,1,2,3,4]\nplt.figure()\nplt.plot(data)")) for r in response: print("output:" + r.output) - response = stub.execute(ipython_pb2.ExecuteRequest(code="range?")) + response = stub.execute(kernel_pb2.ExecuteRequest(code="range?")) for r in response: print(r) diff --git a/python/src/main/resources/grpc/python/ipython_pb2.py b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_pb2.py similarity index 72% rename from python/src/main/resources/grpc/python/ipython_pb2.py rename to zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_pb2.py index 0bad160..094cf86 100644 --- a/python/src/main/resources/grpc/python/ipython_pb2.py +++ b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_pb2.py @@ -13,8 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. + # Generated by the protocol buffer compiler. DO NOT EDIT! 
-# source: ipython.proto +# source: kernel.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) @@ -31,16 +32,16 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( - name='ipython.proto', - package='ipython', + name='kernel.proto', + package='jupyter', syntax='proto3', - serialized_options=_b('\n org.apache.zeppelin.python.protoB\014IPythonProtoP\001\242\002\007IPython'), - serialized_pb=_b('\n\ripython.proto\x12\x07ipython\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.ipython.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.ipython.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionRe [...] + serialized_options=_b('\n-org.apache.zeppelin.interpreter.jupyter.protoB\022JupyterKernelProtoP\001\242\002\rJupyterKernel'), + serialized_pb=_b('\n\x0ckernel.proto\x12\x07jupyter\"\x1e\n\x0e\x45xecuteRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\"l\n\x0f\x45xecuteResponse\x12&\n\x06status\x18\x01 \x01(\x0e\x32\x16.jupyter.ExecuteStatus\x12!\n\x04type\x18\x02 \x01(\x0e\x32\x13.jupyter.OutputType\x12\x0e\n\x06output\x18\x03 \x01(\t\"\x0f\n\rCancelRequest\"\x10\n\x0e\x43\x61ncelResponse\"1\n\x11\x43ompletionRequest\x12\x0c\n\x04\x63ode\x18\x01 \x01(\t\x12\x0e\n\x06\x63ursor\x18\x02 \x01(\x05\"%\n\x12\x43ompletionR [...] ) _EXECUTESTATUS = _descriptor.EnumDescriptor( name='ExecuteStatus', - full_name='ipython.ExecuteStatus', + full_name='jupyter.ExecuteStatus', filename=None, file=DESCRIPTOR, values=[ @@ -55,15 +56,15 @@ _EXECUTESTATUS = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=399, - serialized_end=438, + serialized_start=397, + serialized_end=436, ) _sym_db.RegisterEnumDescriptor(_EXECUTESTATUS) ExecuteStatus = enum_type_wrapper.EnumTypeWrapper(_EXECUTESTATUS) -_IPYTHONSTATUS = _descriptor.EnumDescriptor( - name='IPythonStatus', - full_name='ipython.IPythonStatus', +_KERNELSTATUS = _descriptor.EnumDescriptor( + name='KernelStatus', + full_name='jupyter.KernelStatus', filename=None, file=DESCRIPTOR, values=[ @@ -78,15 +79,15 @@ _IPYTHONSTATUS = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=440, - serialized_end=482, + serialized_start=438, + serialized_end=479, ) -_sym_db.RegisterEnumDescriptor(_IPYTHONSTATUS) +_sym_db.RegisterEnumDescriptor(_KERNELSTATUS) -IPythonStatus = enum_type_wrapper.EnumTypeWrapper(_IPYTHONSTATUS) +KernelStatus = enum_type_wrapper.EnumTypeWrapper(_KERNELSTATUS) _OUTPUTTYPE = _descriptor.EnumDescriptor( name='OutputType', - full_name='ipython.OutputType', + full_name='jupyter.OutputType', filename=None, file=DESCRIPTOR, values=[ @@ -121,8 +122,8 @@ _OUTPUTTYPE = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=484, - serialized_end=565, + serialized_start=481, + serialized_end=562, ) _sym_db.RegisterEnumDescriptor(_OUTPUTTYPE) @@ -143,13 +144,13 @@ LaTeX = 6 _EXECUTEREQUEST = _descriptor.Descriptor( name='ExecuteRequest', - full_name='ipython.ExecuteRequest', + full_name='jupyter.ExecuteRequest', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='code', full_name='ipython.ExecuteRequest.code', index=0, + name='code', 
full_name='jupyter.ExecuteRequest.code', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -167,34 +168,34 @@ _EXECUTEREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=26, - serialized_end=56, + serialized_start=25, + serialized_end=55, ) _EXECUTERESPONSE = _descriptor.Descriptor( name='ExecuteResponse', - full_name='ipython.ExecuteResponse', + full_name='jupyter.ExecuteResponse', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='status', full_name='ipython.ExecuteResponse.status', index=0, + name='status', full_name='jupyter.ExecuteResponse.status', index=0, number=1, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='type', full_name='ipython.ExecuteResponse.type', index=1, + name='type', full_name='jupyter.ExecuteResponse.type', index=1, number=2, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='output', full_name='ipython.ExecuteResponse.output', index=2, + name='output', full_name='jupyter.ExecuteResponse.output', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -212,14 +213,14 @@ _EXECUTERESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=58, - serialized_end=166, + serialized_start=57, + serialized_end=165, ) _CANCELREQUEST = _descriptor.Descriptor( name='CancelRequest', - full_name='ipython.CancelRequest', + full_name='jupyter.CancelRequest', filename=None, file=DESCRIPTOR, containing_type=None, @@ -236,14 +237,14 @@ _CANCELREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=168, - serialized_end=183, + serialized_start=167, + serialized_end=182, ) _CANCELRESPONSE = _descriptor.Descriptor( name='CancelResponse', - full_name='ipython.CancelResponse', + full_name='jupyter.CancelResponse', filename=None, file=DESCRIPTOR, containing_type=None, @@ -260,27 +261,27 @@ _CANCELRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=185, - serialized_end=201, + serialized_start=184, + serialized_end=200, ) _COMPLETIONREQUEST = _descriptor.Descriptor( name='CompletionRequest', - full_name='ipython.CompletionRequest', + full_name='jupyter.CompletionRequest', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='code', full_name='ipython.CompletionRequest.code', index=0, + name='code', full_name='jupyter.CompletionRequest.code', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='cursor', full_name='ipython.CompletionRequest.cursor', index=1, + name='cursor', full_name='jupyter.CompletionRequest.cursor', index=1, number=2, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, 
message_type=None, enum_type=None, containing_type=None, @@ -298,20 +299,20 @@ _COMPLETIONREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=203, - serialized_end=252, + serialized_start=202, + serialized_end=251, ) _COMPLETIONRESPONSE = _descriptor.Descriptor( name='CompletionResponse', - full_name='ipython.CompletionResponse', + full_name='jupyter.CompletionResponse', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='matches', full_name='ipython.CompletionResponse.matches', index=0, + name='matches', full_name='jupyter.CompletionResponse.matches', index=0, number=1, type=9, cpp_type=9, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, @@ -329,14 +330,14 @@ _COMPLETIONRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=254, - serialized_end=291, + serialized_start=253, + serialized_end=290, ) _STATUSREQUEST = _descriptor.Descriptor( name='StatusRequest', - full_name='ipython.StatusRequest', + full_name='jupyter.StatusRequest', filename=None, file=DESCRIPTOR, containing_type=None, @@ -353,20 +354,20 @@ _STATUSREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=293, - serialized_end=308, + serialized_start=292, + serialized_end=307, ) _STATUSRESPONSE = _descriptor.Descriptor( name='StatusResponse', - full_name='ipython.StatusResponse', + full_name='jupyter.StatusResponse', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='status', full_name='ipython.StatusResponse.status', index=0, + name='status', full_name='jupyter.StatusResponse.status', index=0, number=1, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, @@ -384,14 +385,14 @@ _STATUSRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=310, - serialized_end=366, + serialized_start=309, + serialized_end=364, ) _STOPREQUEST = _descriptor.Descriptor( name='StopRequest', - full_name='ipython.StopRequest', + full_name='jupyter.StopRequest', filename=None, file=DESCRIPTOR, containing_type=None, @@ -408,14 +409,14 @@ _STOPREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=368, - serialized_end=381, + serialized_start=366, + serialized_end=379, ) _STOPRESPONSE = _descriptor.Descriptor( name='StopResponse', - full_name='ipython.StopResponse', + full_name='jupyter.StopResponse', filename=None, file=DESCRIPTOR, containing_type=None, @@ -432,13 +433,13 @@ _STOPRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=383, - serialized_end=397, + serialized_start=381, + serialized_end=395, ) _EXECUTERESPONSE.fields_by_name['status'].enum_type = _EXECUTESTATUS _EXECUTERESPONSE.fields_by_name['type'].enum_type = _OUTPUTTYPE -_STATUSRESPONSE.fields_by_name['status'].enum_type = _IPYTHONSTATUS +_STATUSRESPONSE.fields_by_name['status'].enum_type = _KERNELSTATUS DESCRIPTOR.message_types_by_name['ExecuteRequest'] = _EXECUTEREQUEST DESCRIPTOR.message_types_by_name['ExecuteResponse'] = _EXECUTERESPONSE DESCRIPTOR.message_types_by_name['CancelRequest'] = _CANCELREQUEST @@ -450,95 +451,95 @@ DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE DESCRIPTOR.message_types_by_name['StopRequest'] = _STOPREQUEST DESCRIPTOR.message_types_by_name['StopResponse'] = _STOPRESPONSE 
DESCRIPTOR.enum_types_by_name['ExecuteStatus'] = _EXECUTESTATUS -DESCRIPTOR.enum_types_by_name['IPythonStatus'] = _IPYTHONSTATUS +DESCRIPTOR.enum_types_by_name['KernelStatus'] = _KERNELSTATUS DESCRIPTOR.enum_types_by_name['OutputType'] = _OUTPUTTYPE _sym_db.RegisterFileDescriptor(DESCRIPTOR) ExecuteRequest = _reflection.GeneratedProtocolMessageType('ExecuteRequest', (_message.Message,), dict( DESCRIPTOR = _EXECUTEREQUEST, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.ExecuteRequest) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.ExecuteRequest) )) _sym_db.RegisterMessage(ExecuteRequest) ExecuteResponse = _reflection.GeneratedProtocolMessageType('ExecuteResponse', (_message.Message,), dict( DESCRIPTOR = _EXECUTERESPONSE, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.ExecuteResponse) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.ExecuteResponse) )) _sym_db.RegisterMessage(ExecuteResponse) CancelRequest = _reflection.GeneratedProtocolMessageType('CancelRequest', (_message.Message,), dict( DESCRIPTOR = _CANCELREQUEST, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.CancelRequest) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.CancelRequest) )) _sym_db.RegisterMessage(CancelRequest) CancelResponse = _reflection.GeneratedProtocolMessageType('CancelResponse', (_message.Message,), dict( DESCRIPTOR = _CANCELRESPONSE, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.CancelResponse) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.CancelResponse) )) _sym_db.RegisterMessage(CancelResponse) CompletionRequest = _reflection.GeneratedProtocolMessageType('CompletionRequest', (_message.Message,), dict( DESCRIPTOR = _COMPLETIONREQUEST, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.CompletionRequest) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.CompletionRequest) )) _sym_db.RegisterMessage(CompletionRequest) CompletionResponse = _reflection.GeneratedProtocolMessageType('CompletionResponse', (_message.Message,), dict( DESCRIPTOR = _COMPLETIONRESPONSE, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.CompletionResponse) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.CompletionResponse) )) _sym_db.RegisterMessage(CompletionResponse) StatusRequest = _reflection.GeneratedProtocolMessageType('StatusRequest', (_message.Message,), dict( DESCRIPTOR = _STATUSREQUEST, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.StatusRequest) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.StatusRequest) )) _sym_db.RegisterMessage(StatusRequest) StatusResponse = _reflection.GeneratedProtocolMessageType('StatusResponse', (_message.Message,), dict( DESCRIPTOR = _STATUSRESPONSE, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.StatusResponse) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.StatusResponse) )) _sym_db.RegisterMessage(StatusResponse) StopRequest = _reflection.GeneratedProtocolMessageType('StopRequest', (_message.Message,), dict( DESCRIPTOR = _STOPREQUEST, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.StopRequest) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.StopRequest) )) 
_sym_db.RegisterMessage(StopRequest) StopResponse = _reflection.GeneratedProtocolMessageType('StopResponse', (_message.Message,), dict( DESCRIPTOR = _STOPRESPONSE, - __module__ = 'ipython_pb2' - # @@protoc_insertion_point(class_scope:ipython.StopResponse) + __module__ = 'kernel_pb2' + # @@protoc_insertion_point(class_scope:jupyter.StopResponse) )) _sym_db.RegisterMessage(StopResponse) DESCRIPTOR._options = None -_IPYTHON = _descriptor.ServiceDescriptor( - name='IPython', - full_name='ipython.IPython', +_JUPYTERKERNEL = _descriptor.ServiceDescriptor( + name='JupyterKernel', + full_name='jupyter.JupyterKernel', file=DESCRIPTOR, index=0, serialized_options=None, - serialized_start=568, - serialized_end=891, + serialized_start=565, + serialized_end=894, methods=[ _descriptor.MethodDescriptor( name='execute', - full_name='ipython.IPython.execute', + full_name='jupyter.JupyterKernel.execute', index=0, containing_service=None, input_type=_EXECUTEREQUEST, @@ -547,7 +548,7 @@ _IPYTHON = _descriptor.ServiceDescriptor( ), _descriptor.MethodDescriptor( name='complete', - full_name='ipython.IPython.complete', + full_name='jupyter.JupyterKernel.complete', index=1, containing_service=None, input_type=_COMPLETIONREQUEST, @@ -556,7 +557,7 @@ _IPYTHON = _descriptor.ServiceDescriptor( ), _descriptor.MethodDescriptor( name='cancel', - full_name='ipython.IPython.cancel', + full_name='jupyter.JupyterKernel.cancel', index=2, containing_service=None, input_type=_CANCELREQUEST, @@ -565,7 +566,7 @@ _IPYTHON = _descriptor.ServiceDescriptor( ), _descriptor.MethodDescriptor( name='status', - full_name='ipython.IPython.status', + full_name='jupyter.JupyterKernel.status', index=3, containing_service=None, input_type=_STATUSREQUEST, @@ -574,7 +575,7 @@ _IPYTHON = _descriptor.ServiceDescriptor( ), _descriptor.MethodDescriptor( name='stop', - full_name='ipython.IPython.stop', + full_name='jupyter.JupyterKernel.stop', index=4, containing_service=None, input_type=_STOPREQUEST, @@ -582,8 +583,8 @@ _IPYTHON = _descriptor.ServiceDescriptor( serialized_options=None, ), ]) -_sym_db.RegisterServiceDescriptor(_IPYTHON) +_sym_db.RegisterServiceDescriptor(_JUPYTERKERNEL) -DESCRIPTOR.services_by_name['IPython'] = _IPYTHON +DESCRIPTOR.services_by_name['JupyterKernel'] = _JUPYTERKERNEL # @@protoc_insertion_point(module_scope) diff --git a/python/src/main/resources/grpc/python/ipython_pb2_grpc.py b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_pb2_grpc.py similarity index 60% rename from python/src/main/resources/grpc/python/ipython_pb2_grpc.py rename to zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_pb2_grpc.py index a590319..fe7ddeb 100644 --- a/python/src/main/resources/grpc/python/ipython_pb2_grpc.py +++ b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_pb2_grpc.py @@ -13,14 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. + # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! import grpc -import ipython_pb2 as ipython__pb2 +import kernel_pb2 as kernel__pb2 -class IPythonStub(object): - """The IPython service definition. +class JupyterKernelStub(object): + """The JupyterKernel service definition. """ def __init__(self, channel): @@ -30,34 +31,34 @@ class IPythonStub(object): channel: A grpc.Channel. 
""" self.execute = channel.unary_stream( - '/ipython.IPython/execute', - request_serializer=ipython__pb2.ExecuteRequest.SerializeToString, - response_deserializer=ipython__pb2.ExecuteResponse.FromString, + '/jupyter.JupyterKernel/execute', + request_serializer=kernel__pb2.ExecuteRequest.SerializeToString, + response_deserializer=kernel__pb2.ExecuteResponse.FromString, ) self.complete = channel.unary_unary( - '/ipython.IPython/complete', - request_serializer=ipython__pb2.CompletionRequest.SerializeToString, - response_deserializer=ipython__pb2.CompletionResponse.FromString, + '/jupyter.JupyterKernel/complete', + request_serializer=kernel__pb2.CompletionRequest.SerializeToString, + response_deserializer=kernel__pb2.CompletionResponse.FromString, ) self.cancel = channel.unary_unary( - '/ipython.IPython/cancel', - request_serializer=ipython__pb2.CancelRequest.SerializeToString, - response_deserializer=ipython__pb2.CancelResponse.FromString, + '/jupyter.JupyterKernel/cancel', + request_serializer=kernel__pb2.CancelRequest.SerializeToString, + response_deserializer=kernel__pb2.CancelResponse.FromString, ) self.status = channel.unary_unary( - '/ipython.IPython/status', - request_serializer=ipython__pb2.StatusRequest.SerializeToString, - response_deserializer=ipython__pb2.StatusResponse.FromString, + '/jupyter.JupyterKernel/status', + request_serializer=kernel__pb2.StatusRequest.SerializeToString, + response_deserializer=kernel__pb2.StatusResponse.FromString, ) self.stop = channel.unary_unary( - '/ipython.IPython/stop', - request_serializer=ipython__pb2.StopRequest.SerializeToString, - response_deserializer=ipython__pb2.StopResponse.FromString, + '/jupyter.JupyterKernel/stop', + request_serializer=kernel__pb2.StopRequest.SerializeToString, + response_deserializer=kernel__pb2.StopResponse.FromString, ) -class IPythonServicer(object): - """The IPython service definition. +class JupyterKernelServicer(object): + """The JupyterKernel service definition. 
""" def execute(self, request, context): @@ -82,48 +83,48 @@ class IPythonServicer(object): raise NotImplementedError('Method not implemented!') def status(self, request, context): - """Get ipython kernel status + """Get jupyter kernel status """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def stop(self, request, context): - # missing associated documentation comment in .proto file - pass + """Stop jupyter kernel + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') -def add_IPythonServicer_to_server(servicer, server): +def add_JupyterKernelServicer_to_server(servicer, server): rpc_method_handlers = { 'execute': grpc.unary_stream_rpc_method_handler( servicer.execute, - request_deserializer=ipython__pb2.ExecuteRequest.FromString, - response_serializer=ipython__pb2.ExecuteResponse.SerializeToString, + request_deserializer=kernel__pb2.ExecuteRequest.FromString, + response_serializer=kernel__pb2.ExecuteResponse.SerializeToString, ), 'complete': grpc.unary_unary_rpc_method_handler( servicer.complete, - request_deserializer=ipython__pb2.CompletionRequest.FromString, - response_serializer=ipython__pb2.CompletionResponse.SerializeToString, + request_deserializer=kernel__pb2.CompletionRequest.FromString, + response_serializer=kernel__pb2.CompletionResponse.SerializeToString, ), 'cancel': grpc.unary_unary_rpc_method_handler( servicer.cancel, - request_deserializer=ipython__pb2.CancelRequest.FromString, - response_serializer=ipython__pb2.CancelResponse.SerializeToString, + request_deserializer=kernel__pb2.CancelRequest.FromString, + response_serializer=kernel__pb2.CancelResponse.SerializeToString, ), 'status': grpc.unary_unary_rpc_method_handler( servicer.status, - request_deserializer=ipython__pb2.StatusRequest.FromString, - response_serializer=ipython__pb2.StatusResponse.SerializeToString, + request_deserializer=kernel__pb2.StatusRequest.FromString, + response_serializer=kernel__pb2.StatusResponse.SerializeToString, ), 'stop': grpc.unary_unary_rpc_method_handler( servicer.stop, - request_deserializer=ipython__pb2.StopRequest.FromString, - response_serializer=ipython__pb2.StopResponse.SerializeToString, + request_deserializer=kernel__pb2.StopRequest.FromString, + response_serializer=kernel__pb2.StopResponse.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( - 'ipython.IPython', rpc_method_handlers) + 'jupyter.JupyterKernel', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) diff --git a/python/src/main/resources/grpc/python/ipython_server.py b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_server.py similarity index 79% rename from python/src/main/resources/grpc/python/ipython_server.py rename to zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_server.py index dbc3afd..5708c71 100644 --- a/python/src/main/resources/grpc/python/ipython_server.py +++ b/zeppelin-jupyter-adapter/src/main/resources/grpc/jupyter/kernel_server.py @@ -23,8 +23,8 @@ import time from concurrent import futures import grpc -import ipython_pb2 -import ipython_pb2_grpc +import kernel_pb2 +import kernel_pb2_grpc is_py2 = sys.version[0] == '2' if is_py2: @@ -33,11 +33,12 @@ else: import queue as queue -class IPython(ipython_pb2_grpc.IPythonServicer): +class KernelServer(kernel_pb2_grpc.JupyterKernelServicer): - def __init__(self, server): - 
self._status = ipython_pb2.STARTING + def __init__(self, server, kernel_name): + self._status = kernel_pb2.STARTING self._server = server + self._kernel_name = kernel_name # issue with execute_interactive and auto completion: https://github.com/jupyter/jupyter_client/issues/429 # in all case because ipython does not support run and auto completion at the same time: https://github.com/jupyter/notebook/issues/3763 # For now we will lock to ensure that there is no concurrent bug that can "hang" the kernel @@ -46,8 +47,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer): def start(self): print("starting...") sys.stdout.flush() - self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name='python') - self._status = ipython_pb2.RUNNING + self._km, self._kc = jupyter_client.manager.start_new_kernel(kernel_name=self._kernel_name) + self._status = kernel_pb2.RUNNING def execute(self, request, context): print("execute code:\n") @@ -59,42 +60,42 @@ class IPython(ipython_pb2_grpc.IPythonServicer): msg_type = msg['header']['msg_type'] content = msg['content'] print("******************* CONTENT ******************") - outStatus, outType, output = ipython_pb2.SUCCESS, None, None + outStatus, outType, output = kernel_pb2.SUCCESS, None, None # prepare the reply if msg_type == 'stream': - outType = ipython_pb2.TEXT + outType = kernel_pb2.TEXT output = content['text'] elif msg_type in ('display_data', 'execute_result'): print(content['data']) # The if-else order matters, can not be changed. Because ipython may provide multiple output. # TEXT is the last resort type. if 'text/html' in content['data']: - outType = ipython_pb2.HTML + outType = kernel_pb2.HTML output = content['data']['text/html'] elif 'image/jpeg' in content['data']: - outType = ipython_pb2.JPEG + outType = kernel_pb2.JPEG output = content['data']['image/jpeg'] elif 'image/png' in content['data']: - outType = ipython_pb2.PNG + outType = kernel_pb2.PNG output = content['data']['image/png'] elif 'application/javascript' in content['data']: - outType = ipython_pb2.HTML + outType = kernel_pb2.HTML output = '<script> ' + content['data']['application/javascript'] + ' </script>\n' elif 'application/vnd.holoviews_load.v0+json' in content['data']: - outType = ipython_pb2.HTML + outType = kernel_pb2.HTML output = '<script> ' + content['data']['application/vnd.holoviews_load.v0+json'] + ' </script>\n' elif 'text/plain' in content['data']: - outType = ipython_pb2.TEXT + outType = kernel_pb2.TEXT output = content['data']['text/plain'] elif msg_type == 'error': - outStatus = ipython_pb2.ERROR - outType = ipython_pb2.TEXT + outStatus = kernel_pb2.ERROR + outType = kernel_pb2.TEXT output = '\n'.join(content['traceback']) # send reply if we supported the output type if outType is not None: stream_reply_queue.put( - ipython_pb2.ExecuteResponse(status=outStatus, + kernel_pb2.ExecuteResponse(status=outStatus, type=outType, output=output)) def execute_worker(): @@ -121,8 +122,8 @@ class IPython(ipython_pb2_grpc.IPythonServicer): # if kernel is not alive or thread is still alive, it means that we face an issue. if not self.isKernelAlive() or t.is_alive(): - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.ERROR, - type=ipython_pb2.TEXT, + yield kernel_pb2.ExecuteResponse(status=kernel_pb2.ERROR, + type=kernel_pb2.TEXT, output="Ipython kernel has been stopped. Please check logs. 
It might be because of an out of memory issue.") if payload_reply: result = [] @@ -130,21 +131,21 @@ class IPython(ipython_pb2_grpc.IPythonServicer): if payload['data']['text/plain']: result.append(payload['data']['text/plain']) if result: - yield ipython_pb2.ExecuteResponse(status=ipython_pb2.SUCCESS, - type=ipython_pb2.TEXT, + yield kernel_pb2.ExecuteResponse(status=kernel_pb2.SUCCESS, + type=kernel_pb2.TEXT, output='\n'.join(result)) def cancel(self, request, context): self._km.interrupt_kernel() - return ipython_pb2.CancelResponse() + return kernel_pb2.CancelResponse() def complete(self, request, context): with self._lock: reply = self._kc.complete(request.code, request.cursor, reply=True, timeout=None) - return ipython_pb2.CompletionResponse(matches=reply['content']['matches']) + return kernel_pb2.CompletionResponse(matches=reply['content']['matches']) def status(self, request, context): - return ipython_pb2.StatusResponse(status = self._status) + return kernel_pb2.StatusResponse(status = self._status) def isKernelAlive(self): return self._km.is_alive() @@ -154,18 +155,18 @@ class IPython(ipython_pb2_grpc.IPythonServicer): def stop(self, request, context): self.terminate() - return ipython_pb2.StopResponse() + return kernel_pb2.StopResponse() -def serve(port): +def serve(kernel_name, port): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - ipython = IPython(server) - ipython_pb2_grpc.add_IPythonServicer_to_server(ipython, server) + kernel = KernelServer(server, kernel_name) + kernel_pb2_grpc.add_JupyterKernelServicer_to_server(kernel, server) server.add_insecure_port('[::]:' + port) server.start() - ipython.start() + kernel.start() try: - while ipython.isKernelAlive(): + while kernel.isKernelAlive(): time.sleep(5) except KeyboardInterrupt: print("interrupted") @@ -173,8 +174,8 @@ def serve(port): print("shutdown") # we let 2 sc for all request to be complete server.stop(2) - ipython.terminate() + kernel.terminate() os._exit(0) if __name__ == '__main__': - serve(sys.argv[1]) + serve(sys.argv[1], sys.argv[2])