This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 7a472fe  [ZEPPELIN-4496] Add irkernel support
7a472fe is described below

commit 7a472fe0fd63493e933d0549a808e21db38d68d8
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Fri Dec 20 11:20:54 2019 +0800

    [ZEPPELIN-4496] Add irkernel support

    ### What is this PR for?
    This PR adds IRkernel support to the jupyter interpreter, the r interpreter, and the spark interpreter. IRkernel support in the jupyter interpreter is straightforward; nothing extra needs to be done there. In the r interpreter, we use spark-core to set up the communication channel between the JVM and the R process, so that users can call ZeppelinContext from R:

    * SparkIRInterpreter = IRInterpreter + SparkContext/SparkSession
    * IRInterpreter = JupyterKernelInterpreter + ZeppelinContext

    ### What type of PR is it?
    [Feature]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4496

    ### How should this be tested?
    * CI pass

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3575 from zjffdu/ZEPPELIN-4496 and squashes the following commits:

    9f30e72b3 [Jeff Zhang] [ZEPPELIN-4496] Add irkernel support
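In note terms, the split above means a paragraph like the following sketch (it assumes the new `ir` interpreter is bound under the `r` interpreter group; the variable names are invented) runs plain R through IRkernel, while the `z.*` helpers defined in the `zeppelin_isparkr.R` script below reach ZeppelinContext over the spark-core channel:

```r
%r.ir
# plain R, executed by IRkernel inside the Jupyter kernel process
fit <- lm(mpg ~ wt, data = mtcars)
summary(fit)

# ZeppelinContext is reachable through the spark-core backend
z.put("slope", coef(fit)[["wt"]])  # share a value with other interpreters
z.get("slope")                     # ...and read it back in a later paragraph
```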
---
 .travis.yml                                        | 20 +--
 rlang/pom.xml                                      | 48 ++++++-
 .../java/org/apache/zeppelin/r/IRInterpreter.java  | 137 +++++++++++++++
 .../java/org/apache/zeppelin/r/SparkRUtils.java    | 51 +++++++
 rlang/src/main/resources/R/zeppelin_isparkr.R      | 116 ++++++++++++++++
 rlang/src/main/resources/interpreter-setting.json  | 7 +
 .../org/apache/zeppelin/r/IRInterpreterTest.java   | 46 +++++++
 spark/interpreter/pom.xml                          | 86 +++++++++++-
 .../apache/zeppelin/spark/SparkIRInterpreter.java  | 106 +++++++++++++++
 .../apache/zeppelin/spark/SparkSqlInterpreter.java | 1 -
 .../src/main/resources/interpreter-setting.json    | 12 ++
 .../zeppelin/spark/SparkIRInterpreterTest.java     | 150 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            | 2 +
 spark/spark-dependencies/pom.xml                   | 45 -------
 testing/install_external_dependencies.sh           | 36 ++---
 zeppelin-interpreter-integration/pom.xml           | 4 +
 zeppelin-interpreter-parent/pom.xml                | 3 -
 .../src/test/resources/log4j.properties            | 3 +-
 zeppelin-jupyter-interpreter/pom.xml               | 14 ++
 .../zeppelin/jupyter/JupyterKernelClient.java      | 19 ++-
 .../zeppelin/jupyter/JupyterKernelInterpreter.java | 13 +-
 .../org/apache/zeppelin/jupyter/IRKernelTest.java  | 121 +++++++++++++++++
 .../src/test/resources/log4j.properties            | 16 +--
 23 files changed, 955 insertions(+), 101 deletions(-)

diff --git a/.travis.yml b/.travis.yml index 6956c4a..d2979a4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -86,19 +86,19 @@ matrix: - sudo: required jdk: "openjdk8" dist: xenial - env: PYTHON="3" SPARKR="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pspark-scala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat -DskipTests" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" + env: PYTHON="3" R="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pspark-scala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat -DskipTests" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" 
TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" # Test selenium with spark module for spark 2.3 - jdk: "openjdk8" dist: xenial addons: firefox: "31.0" - env: BUILD_PLUGINS="true" CI="true" PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.3.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.3 -Phadoop2 -Phelium-dev -Pexamples -Pintegration -Pspark-scala-2.11" BUILD_FLAG="clean install -DskipTests -DskipRat -pl ${INTERPRETERS}" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-integration -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" CI="true" PYTHON="2" R="true" SCALA_VER="2.11" SPARK_VER="2.3.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.3 -Phadoop2 -Phelium-dev -Pexamples -Pintegration -Pspark-scala-2.11" BUILD_FLAG="clean install -DskipTests -DskipRat -pl ${INTERPRETERS}" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-integration -DfailIfNoTests=false" # Test interpreter modules - jdk: "openjdk8" dist: xenial - env: PYTHON="3" SPARKR="true" SCALA_VER="2.10" TENSORFLOW="1.13.1" PROFILE="-Pscala-2.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-api,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS="" + env: PYTHON="3" R="true" SCALA_VER="2.10" TENSORFLOW="1.13.1" PROFILE="-Pscala-2.10" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-api,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS="" # Run Spark integration test and unit test separately for each spark version @@ -106,43 +106,43 @@ matrix: - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.kotlin.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pspark-scala-2.11 -Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.kotlin.* -DfailIfNoTests=false" # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 (Scala-2.12) - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.kotlin.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration" R="true" 
BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.*,org.apache.zeppelin.kotlin.* -DfailIfNoTests=false" # ZeppelinSparkClusterTest23, SparkIntegrationTest23, Unit test of Spark 2.3 (Scala-2.11) and Unit test PythonInterpreter under python2 - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pspark-scala-2.11 -Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false" # ZeppelinSparkClusterTest22, SparkIntegrationTest22, Unit test of Spark 2.2 (Scala-2.10) and Unit test PythonInterpreter under python3 - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pspark-scala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pspark-scala-2.10 -Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false" # ZeppelinSparkClusterTest21, SparkIntegrationTest21, Unit test of Spark 2.1 (Scala-2.10) - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pspark-scala-2.10 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false" # ZeppelinSparkClusterTest20, 
SparkIntegrationTest20, Unit test of Spark 2.0 (Scala-2.10), Use python 3.5 because spark 2.0 doesn't support python 3.6 + - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pspark-scala-2.10 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false" # ZeppelinSparkClusterTest16, SparkIntegrationTest16, Unit test of Spark 1.6 (Scala-2.10) - sudo: required jdk: "openjdk8" dist: xenial - env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pspark-scala-2.10 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test python/pyspark with python 2, livy 0.5 - sudo: required diff --git a/rlang/pom.xml b/rlang/pom.xml index 5f68e19..6284fcb 100644 --- a/rlang/pom.xml +++ b/rlang/pom.xml @@ -38,6 +38,7 @@ <interpreter.name>r</interpreter.name> <jsoup.version>1.12.1</jsoup.version> <spark.version>2.4.3</spark.version> + <grpc.version>1.15.0</grpc.version> <spark.archive>spark-${spark.version}</spark.archive> <spark.src.download.url> @@ -52,6 +53,38 @@ <dependencies> <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-jupyter-interpreter</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-jupyter-interpreter</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + <version>${grpc.version}</version> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> @@ -158,7 +191,7 @@ <exclude>org.apache.zeppelin:zeppelin-interpreter-api</exclude> </excludes> </artifactSet> - 
<outputFile>${project.build.directory}/../../interpreter/rlang/${interpreter.jar.name}-${project.version}.jar</outputFile> + <outputFile>${project.build.directory}/../../interpreter/r/${interpreter.jar.name}-${project.version}.jar</outputFile> </configuration> <executions> <execution> @@ -183,6 +216,19 @@ </configuration> </plugin> + <!-- publish test jar as well so that spark module can use it --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>3.0.2</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project>

diff --git a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
new file mode 100644
index 0000000..f04b091
--- /dev/null
+++ b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.r;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
+import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
+import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus;
+import org.apache.zeppelin.jupyter.JupyterKernelInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * R Interpreter which uses the IRKernel (https://github.com/IRkernel/IRkernel).
+ * Besides that, it uses Spark to set up the communication channel between the JVM and the R
+ * process, so that the user can use ZeppelinContext.
+ */
+public class IRInterpreter extends JupyterKernelInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IRInterpreter.class);
+
+  private SparkRBackend sparkRBackend;
+
+  public IRInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  /**
+   * IRInterpreter just uses spark-core for the communication between the R process and the
+   * JVM process. SparkContext is not created in this IRInterpreter.
+   * Subclasses can override this, e.g. SparkIRInterpreter.
+   * @return
+   */
+  protected boolean isSparkSupported() {
+    return false;
+  }
+
+  /**
+   * The spark version specified in pom.xml.
+   * Subclasses can override this, e.g. SparkIRInterpreter.
+   * @return
+   */
+  protected int sparkVersion() {
+    return 20403;
+  }
+
+  /**
+   * Spark 2.4.3 needs a secret for the socket communication between the R process and the
+   * JVM process. Subclasses can override this, e.g. SparkIRInterpreter.
+   * @return
+   */
+  protected boolean isSecretSupported() {
+    return true;
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+    super.open();
+
+    this.sparkRBackend = SparkRBackend.get();
+    // Share the same SparkRBackend across sessions
+    synchronized (sparkRBackend) {
+      if (!sparkRBackend.isStarted()) {
+        try {
+          sparkRBackend.init(isSecretSupported());
+        } catch (Exception e) {
+          throw new InterpreterException("Fail to init SparkRBackend", e);
+        }
+        sparkRBackend.start();
+      }
+    }
+
+    try {
+      initIRKernel();
+    } catch (IOException e) {
+      throw new InterpreterException("Fail to init IR Kernel:\n" +
+          ExceptionUtils.getStackTrace(e), e);
+    }
+  }
+
+  /**
+   * Init IRKernel by executing the R script zeppelin_isparkr.R.
+   * @throws IOException
+   * @throws InterpreterException
+   */
+  protected void initIRKernel() throws IOException, InterpreterException {
+    String timeout = getProperty("spark.r.backendConnectionTimeout", "6000");
+    InputStream input =
+        getClass().getClassLoader().getResourceAsStream("R/zeppelin_isparkr.R");
+    String code = IOUtils.toString(input)
+        .replace("${Port}", sparkRBackend.port() + "")
+        .replace("${version}", sparkVersion() + "")
+        .replace("${libPath}", "\"" + SparkRUtils.getSparkRLib(isSparkSupported()) + "\"")
+        .replace("${timeout}", timeout)
+        .replace("${isSparkSupported}", "\"" + isSparkSupported() + "\"")
+        .replace("${authSecret}", "\"" + sparkRBackend.socketSecret() + "\"");
+    LOGGER.info("Init IRKernel via script:\n" + code);
+    ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
+        .setCode(code).build());
+    if (response.getStatus() != ExecuteStatus.SUCCESS) {
+      throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
+    }
+  }
+
+  @Override
+  public String getKernelName() {
+    return "ir";
+  }
+
+  @Override
+  public BaseZeppelinContext buildZeppelinContext() {
+    return new RZeppelinContext(getInterpreterGroup().getInterpreterHookRegistry(),
+        Integer.parseInt(getProperty("zeppelin.r.maxResult", "1000")));
+  }
+}
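For orientation, this is roughly what the head of the zeppelin_isparkr.R script (added below) looks like once initIRKernel() has replaced the ${...} placeholders for a plain, non-Spark IRInterpreter; the port, path, and secret are invented values:

```r
port <- 54321                                   # ${Port}: SparkRBackend listening port
libPath <- "/opt/zeppelin/interpreter/r/R/lib"  # ${libPath}: SparkRUtils.getSparkRLib(false)
version <- 20403                                # ${version}: sparkVersion()
timeout <- 6000                                 # ${timeout}: spark.r.backendConnectionTimeout
isSparkSupported <- "false"                     # ${isSparkSupported}
authSecret <- "d41d8cd9"                        # ${authSecret}: made-up socket secret
```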
diff --git a/rlang/src/main/java/org/apache/zeppelin/r/SparkRUtils.java b/rlang/src/main/java/org/apache/zeppelin/r/SparkRUtils.java
new file mode 100644
index 0000000..532ff25
--- /dev/null
+++ b/rlang/src/main/java/org/apache/zeppelin/r/SparkRUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.r;
+
+import org.apache.zeppelin.interpreter.InterpreterException;
+
+import java.io.File;
+
+public class SparkRUtils {
+
+  public static String getSparkRLib(boolean isSparkSupported) throws InterpreterException {
+    String sparkRLibPath;
+
+    if (System.getenv("SPARK_HOME") != null) {
+      // local or yarn-client mode when SPARK_HOME is specified
+      sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
+    } else if (System.getenv("ZEPPELIN_HOME") != null) {
+      // embedded mode when SPARK_HOME is not specified or for native R support
+      String interpreter = "r";
+      if (isSparkSupported) {
+        interpreter = "spark";
+      }
+      sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/" + interpreter + "/R/lib";
+      // workaround to make sparkr work without SPARK_HOME
+      System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/" + interpreter);
+    } else {
+      // yarn-cluster mode
+      sparkRLibPath = "sparkr";
+    }
+    if (!new File(sparkRLibPath).exists()) {
+      throw new InterpreterException(String.format("sparkRLib '%s' doesn't exist", sparkRLibPath));
+    }
+
+    return sparkRLibPath;
+  }
+}

diff --git a/rlang/src/main/resources/R/zeppelin_isparkr.R b/rlang/src/main/resources/R/zeppelin_isparkr.R new file mode 100644 index 0000000..f279ee1 --- /dev/null +++ b/rlang/src/main/resources/R/zeppelin_isparkr.R @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +port <- ${Port} +libPath <- ${libPath} +version <- ${version} +timeout <- ${timeout} +isSparkSupported <- ${isSparkSupported} +authSecret <- ${authSecret} + +print(paste("Port ", toString(port))) +print(paste("LibPath ", libPath)) + +.libPaths(c(file.path(libPath), .libPaths())) +library(SparkR) + +if (is.null(authSecret) || authSecret == '') { + SparkR:::connectBackend("localhost", port, timeout) +} else { + SparkR:::connectBackend("localhost", port, timeout, authSecret) +} + +# scStartTime is needed by R/pkg/R/sparkR.R +assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv) + +if (isSparkSupported == "true") { + # setup spark env + assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) + assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) + if (version >= 20000) { + assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) + assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) + assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv) + } + assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) + assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) + assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) +} else { + assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.r.RInterpreter", "getRZeppelinContext"), envir = .GlobalEnv) +} + +z.put <- function(name, object) { + SparkR:::callJMethod(.zeppelinContext, "put", name, object) +} + +z.get <- function(name) { + SparkR:::callJMethod(.zeppelinContext, "get", name) +} + +z.getAsDataFrame <- function(name) { + stringValue <- z.get(name) + read.table(text=stringValue, header=TRUE, sep="\t") +} + +z.angular <- function(name, noteId=NULL, paragraphId=NULL) { + SparkR:::callJMethod(.zeppelinContext, "angular", name, noteId, paragraphId) +} + +z.angularBind <- function(name, value, noteId=NULL, paragraphId=NULL) { + SparkR:::callJMethod(.zeppelinContext, "angularBind", name, value, noteId, paragraphId) +} + +z.textbox <- function(name, value) { + SparkR:::callJMethod(.zeppelinContext, "textbox", name, value) +} + +z.noteTextbox <- function(name, value) { + SparkR:::callJMethod(.zeppelinContext, "noteTextbox", name, value) +} + +z.password <- function(name) { + SparkR:::callJMethod(.zeppelinContext, "password", name) +} + +z.notePassword <- function(name) { + SparkR:::callJMethod(.zeppelinContext, "notePassword", name) +} + +z.run <- function(paragraphId) { + SparkR:::callJMethod(.zeppelinContext, "run", paragraphId) +} + +z.runNote <- function(noteId) { + SparkR:::callJMethod(.zeppelinContext, "runNote", noteId) +} + +z.runAll <- function() { + SparkR:::callJMethod(.zeppelinContext, "runAll") +} + +z.angular <- function(name) { + SparkR:::callJMethod(.zeppelinContext, "angular", name) +} + +z.angularBind <- function(name, value) { + SparkR:::callJMethod(.zeppelinContext, "angularBind", name, value) +} + +z.angularUnbind <- function(name, value) { + SparkR:::callJMethod(.zeppelinContext, "angularUnbind", name) +} \ No newline at end of file diff --git a/rlang/src/main/resources/interpreter-setting.json 
b/rlang/src/main/resources/interpreter-setting.json index aab4306..47697bd 100644 --- a/rlang/src/main/resources/interpreter-setting.json +++ b/rlang/src/main/resources/interpreter-setting.json @@ -33,5 +33,12 @@ "type": "textarea" } } + }, + { + "group": "r", + "name": "ir", + "className": "org.apache.zeppelin.r.IRInterpreter", + "properties": { + } } ] diff --git a/rlang/src/test/java/org/apache/zeppelin/r/IRInterpreterTest.java b/rlang/src/test/java/org/apache/zeppelin/r/IRInterpreterTest.java new file mode 100644 index 0000000..8305281 --- /dev/null +++ b/rlang/src/test/java/org/apache/zeppelin/r/IRInterpreterTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.r; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.jupyter.IRKernelTest; + +import java.util.HashMap; +import java.util.Properties; + + +public class IRInterpreterTest extends IRKernelTest { + + @Override + protected Interpreter createInterpreter(Properties properties) { + return new IRInterpreter(properties); + } + + @Override + protected InterpreterContext getInterpreterContext() { + InterpreterContext context = InterpreterContext.builder() + .setNoteId("note_1") + .setParagraphId("paragraph_1") + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(new HashMap<>()) + .build(); + return context; + } +} diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 703e2a4..86a5f75 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -120,6 +120,18 @@ </dependency> <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-jupyter-interpreter</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>net.sf.py4j</groupId> + <artifactId>py4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>${project.groupId}</groupId> <artifactId>zeppelin-python</artifactId> <version>${project.version}</version> @@ -134,6 +146,34 @@ </dependency> <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>r</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-jupyter-interpreter</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>net.sf.py4j</groupId> + 
<artifactId>py4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>${project.groupId}</groupId> <artifactId>r</artifactId> <version>${project.version}</version> @@ -353,6 +393,51 @@ </executions> </plugin> + <!-- include sparkr by default --> + <plugin> + <groupId>com.googlecode.maven-download-plugin</groupId> + <artifactId>download-maven-plugin</artifactId> + <executions> + <execution> + <id>download-sparkr-files</id> + <phase>validate</phase> + <goals> + <goal>wget</goal> + </goals> + <configuration> + <readTimeOut>60000</readTimeOut> + <retries>5</retries> + <url>${spark.bin.download.url}</url> + <unpack>true</unpack> + <outputDirectory>${project.build.directory}</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.7</version> + <executions> + <execution> + <id>copy-sparkr-files</id> + <phase>generate-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../../interpreter/spark/R/lib</outputDirectory> + <resources> + <resource> + <directory> + ${project.build.directory}/spark-${spark.version}-bin-without-hadoop/R/lib + </directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> @@ -366,7 +451,6 @@ <reuseForks>false</reuseForks> <argLine>-Xmx3072m -XX:MaxPermSize=256m</argLine> <excludes> - <exclude>**/SparkRInterpreterTest.java</exclude> <exclude>${pyspark.test.exclude}</exclude> <exclude>${tests.to.exclude}</exclude> </excludes> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java new file mode 100644 index 0000000..004ce98 --- /dev/null +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkIRInterpreter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.r.IRInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * SparkR Interpreter which uses irkernel underneath. 
+ */
+public class SparkIRInterpreter extends IRInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkIRInterpreter.class);
+
+  private SparkInterpreter sparkInterpreter;
+  private SparkVersion sparkVersion;
+  private boolean isSpark2;
+  private SparkContext sc;
+  private JavaSparkContext jsc;
+
+  public SparkIRInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  protected boolean isSparkSupported() {
+    return true;
+  }
+
+  protected int sparkVersion() {
+    return this.sparkVersion.toNumber();
+  }
+
+  protected boolean isSecretSupported() {
+    return this.sparkVersion.isSecretSocketSupported();
+  }
+
+  public void open() throws InterpreterException {
+    this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+    this.sc = sparkInterpreter.getSparkContext();
+    this.jsc = sparkInterpreter.getJavaSparkContext();
+    this.sparkVersion = new SparkVersion(sc.version());
+    this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
+
+    ZeppelinRContext.setSparkContext(sc);
+    ZeppelinRContext.setJavaSparkContext(jsc);
+    if (isSpark2) {
+      ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
+    }
+    ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
+    ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
+    super.open();
+  }
+
+  @Override
+  public InterpreterResult internalInterpret(String lines, InterpreterContext context) throws InterpreterException {
+    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(),
+        context, properties);
+    String jobGroup = Utils.buildJobGroupId(context);
+    String jobDesc = Utils.buildJobDesc(context);
+    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
+    String setJobGroup = "";
+    // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
+    if (isSpark2) {
+      setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
+          "\", \"" + jobDesc + "\", TRUE)";
+    } else {
+      setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
+          "\", \"" + jobDesc + "\", TRUE)";
+    }
+    lines = setJobGroup + "\n" + lines;
+    if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) {
+      // setLocalProperty is only available from spark 2.3.0
+      String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)";
+      if (context.getLocalProperties().containsKey("pool")) {
+        setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" +
+            context.getLocalProperties().get("pool") + "')";
+      }
+      lines = setPoolStmt + "\n" + lines;
+    }
+    return super.internalInterpret(lines, context);
+  }
+}
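One consequence of the internalInterpret() above worth spelling out: on Spark 2.3.0+ a paragraph can route its jobs to a named scheduler pool through Zeppelin's local-property syntax, since the interpreter turns the `pool` property into a setLocalProperty call. A sketch, assuming a fair-scheduler pool named `report` exists on the cluster:

```r
%spark.ir(pool=report)
# SparkIRInterpreter prepends
#   setLocalProperty('spark.scheduler.pool', 'report')
# to this snippet, so the job below runs in the "report" pool
df <- as.DataFrame(faithful)
head(df)
```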
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index f6372dd..fefcee5 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -22,7 +22,6 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.BaseZeppelinContext;
-import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;

diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index e4554dd..7739221 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -246,6 +246,18 @@
   },
   {
     "group": "spark",
+    "name": "ir",
+    "className": "org.apache.zeppelin.spark.SparkIRInterpreter",
+    "properties": {
+    },
+    "editor": {
+      "language": "r",
+      "editOnDblClick": false,
+      "completionSupport": true
+    }
+  },
+  {
+    "group": "spark",
     "name": "kotlin",
     "className": "org.apache.zeppelin.spark.KotlinSparkInterpreter",
     "properties": {

diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java new file mode 100644 index 0000000..f5e3780 --- /dev/null +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkIRInterpreterTest.java @@ -0,0 +1,150 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; +import org.apache.zeppelin.r.IRInterpreterTest; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class SparkIRInterpreterTest extends IRInterpreterTest { + + private RemoteInterpreterEventClient mockRemoteIntpEventClient = mock(RemoteInterpreterEventClient.class); + + @Override + protected Interpreter createInterpreter(Properties properties) { + return new SparkIRInterpreter(properties); + } + + @Before + public void setUp() throws InterpreterException { + Properties properties = new Properties(); + properties.setProperty("spark.master", "local"); + properties.setProperty("spark.app.name", "test"); + properties.setProperty("zeppelin.spark.maxResult", "100"); + properties.setProperty("spark.r.backendConnectionTimeout", "10"); + 
properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false"); + + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); + interpreter = createInterpreter(properties); + + InterpreterGroup interpreterGroup = new InterpreterGroup(); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), "session_1"); + interpreter.setInterpreterGroup(interpreterGroup); + + SparkInterpreter sparkInterpreter = new SparkInterpreter(properties); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkInterpreter), "session_1"); + sparkInterpreter.setInterpreterGroup(interpreterGroup); + + interpreter.open(); + } + + + @Test + public void testSparkRInterpreter() throws InterpreterException, InterruptedException, IOException { + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = interpreter.interpret("1+1", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertTrue(interpreterResultMessages.get(0).getData().contains("2")); + + context = getInterpreterContext(); + result = interpreter.interpret("sparkR.version()", context); + if (result.code() == InterpreterResult.Code.ERROR) { + // Spark 1.x has no api for Spark.version() + // spark 1.x + context = getInterpreterContext(); + result = interpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(interpreterResultMessages.get(0).getData().contains(">eruptions</th>")); + // spark job url is sent + verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class)); + } else { + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + context = getInterpreterContext(); + result = interpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(interpreterResultMessages.get(0).getData().contains(">eruptions</th>")); + // spark job url is sent + verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class)); + + // cancel + final InterpreterContext context2 = getInterpreterContext(); + Thread thread = new Thread() { + @Override + public void run() { + try { + InterpreterResult result = interpreter.interpret("ldf <- dapplyCollect(\n" + + " df,\n" + + " function(x) {\n" + + " Sys.sleep(3)\n" + + " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" + + " })\n" + + "head(ldf, 3)", context2); + assertTrue(result.message().get(0).getData().contains("cancelled")); + } catch (InterpreterException e) { + fail("Should not throw InterpreterException"); + } + } + }; + thread.setName("Cancel-Thread"); + thread.start(); + Thread.sleep(1000); + interpreter.cancel(context2); + } + } + + @Override + protected InterpreterContext getInterpreterContext() { + InterpreterContext context = InterpreterContext.builder() + .setNoteId("note_1") + .setParagraphId("paragraph_1") + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(new HashMap<>()) + .setIntpEventClient(mockRemoteIntpEventClient) + .build(); + return context; + } +} diff --git 
a/spark/interpreter/src/test/resources/log4j.properties b/spark/interpreter/src/test/resources/log4j.properties index 8c89e1e..408572f 100644 --- a/spark/interpreter/src/test/resources/log4j.properties +++ b/spark/interpreter/src/test/resources/log4j.properties @@ -44,6 +44,8 @@ log4j.logger.DataNucleus.Datastore=ERROR log4j.logger.org.hibernate.type=ALL log4j.logger.org.apache.zeppelin.interpreter=WARN +#log4j.logger.org.apache.zeppelin.interpreter.util=DEBUG +#log4j.logger.org.apache.zeppelin.jupyter=DEBUG #log4j.logger.org.apache.zeppelin.spark=DEBUG #log4j.logger.org.apache.zeppelin.python=DEBUG log4j.logger.org.apache.spark.repl.Main=WARN diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml index 438b474..8c176d8 100644 --- a/spark/spark-dependencies/pom.xml +++ b/spark/spark-dependencies/pom.xml @@ -298,51 +298,6 @@ </execution> </executions> </plugin> - - <!-- include sparkr by default --> - <plugin> - <groupId>com.googlecode.maven-download-plugin</groupId> - <artifactId>download-maven-plugin</artifactId> - <executions> - <execution> - <id>download-sparkr-files</id> - <phase>validate</phase> - <goals> - <goal>wget</goal> - </goals> - <configuration> - <readTimeOut>60000</readTimeOut> - <retries>5</retries> - <url>${spark.bin.download.url}</url> - <unpack>true</unpack> - <outputDirectory>${project.build.directory}</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.7</version> - <executions> - <execution> - <id>copy-sparkr-files</id> - <phase>generate-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/../../../interpreter/spark/R/lib</outputDirectory> - <resources> - <resource> - <directory> - ${project.build.directory}/spark-${spark.version}-bin-without-hadoop/R/lib - </directory> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> diff --git a/testing/install_external_dependencies.sh b/testing/install_external_dependencies.sh index 944efaa..8102edc 100755 --- a/testing/install_external_dependencies.sh +++ b/testing/install_external_dependencies.sh @@ -20,20 +20,6 @@ set -ev touch ~/.environ -# Install R dependencies if SPARKR is true -if [[ "${SPARKR}" = "true" ]] ; then - echo "R_LIBS=~/R" > ~/.Renviron - echo "export R_LIBS=~/R" >> ~/.environ - source ~/.environ - if [[ ! 
-d "$HOME/R/knitr" ]] ; then - mkdir -p ~/R - R -e "install.packages('evaluate', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 - R -e "install.packages('base64enc', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 - R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 - R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 - fi -fi - # Install Python dependencies for Python specific tests if [[ -n "$PYTHON" ]] ; then wget https://repo.continuum.io/miniconda/Miniconda${PYTHON}-4.6.14-Linux-x86_64.sh -O miniconda.sh @@ -48,7 +34,7 @@ if [[ -n "$PYTHON" ]] ; then conda info -a conda config --add channels conda-forge - if [[ $PYTHON == "2" ]] ; then + if [[ "$PYTHON" == "2" ]] ; then pip install -q numpy==1.14.5 pandas==0.21.1 matplotlib==2.1.1 scipy==1.2.1 grpcio==1.19.0 bkzep==0.6.1 hvplot==0.5.2 \ protobuf==3.7.0 pandasql==0.7.3 ipython==5.8.0 ipykernel==4.10.0 bokeh==1.3.4 else @@ -63,3 +49,23 @@ if [[ -n "$PYTHON" ]] ; then pip install tensorflow==${TENSORFLOW} fi fi + +# Install R dependencies if R is true +if [[ "$R" == "true" ]] ; then + echo "R_LIBS=~/R" > ~/.Renviron + echo "export R_LIBS=~/R" >> ~/.environ + source ~/.environ + + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9 + sudo add-apt-repository 'deb [arch=amd64,i386] https://cran.rstudio.com/bin/linux/ubuntu xenial/' + sudo apt-get update + sudo apt-get install r-base + + mkdir -p ~/R + R -e "install.packages('evaluate', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 + R -e "install.packages('base64enc', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 + R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 + R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1 + R -e "install.packages('IRkernel', repos = 'http://cran.us.r-project.org', lib='~/R');IRkernel::installspec()" > /dev/null 2>&1 + +fi diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml index 9d898e6..fc0dc85 100644 --- a/zeppelin-interpreter-integration/pom.xml +++ b/zeppelin-interpreter-integration/pom.xml @@ -507,6 +507,10 @@ <groupId>org.apache.zeppelin</groupId> <artifactId>r</artifactId> </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> </exclusions> </dependency> </dependencies> diff --git a/zeppelin-interpreter-parent/pom.xml b/zeppelin-interpreter-parent/pom.xml index 98678a2..74ba4b9 100644 --- a/zeppelin-interpreter-parent/pom.xml +++ b/zeppelin-interpreter-parent/pom.xml @@ -84,9 +84,6 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-install-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> </plugin> </plugins> diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-interpreter/src/test/resources/log4j.properties index 0e4935c..b724845 100644 --- a/zeppelin-interpreter/src/test/resources/log4j.properties +++ b/zeppelin-interpreter/src/test/resources/log4j.properties @@ -27,5 +27,4 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n # Root logger option log4j.rootLogger=INFO, stdout -log4j.logger.org.apache.zeppelin.interpreter=DEBUG -log4j.logger.org.apache.zeppelin.scheduler=DEBUG 
+#log4j.logger.org.apache.zeppelin.interpreter=DEBUG diff --git a/zeppelin-jupyter-interpreter/pom.xml b/zeppelin-jupyter-interpreter/pom.xml index a924717..e179490 100644 --- a/zeppelin-jupyter-interpreter/pom.xml +++ b/zeppelin-jupyter-interpreter/pom.xml @@ -193,6 +193,20 @@ <skip>false</skip> </configuration> </plugin> + + <!-- publish test jar as well so that spark module can use it --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>3.0.2</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java index 2a65b0c..8b64a9b 100644 --- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java +++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java @@ -194,14 +194,21 @@ public class JupyterKernelClient { responseBuilder.setStatus(ExecuteStatus.SUCCESS); Iterator<ExecuteResponse> iter = blockingStub.execute(request); StringBuilder outputBuilder = new StringBuilder(); - while (iter.hasNext()) { - ExecuteResponse nextResponse = iter.next(); - if (nextResponse.getStatus() == ExecuteStatus.ERROR) { - responseBuilder.setStatus(ExecuteStatus.ERROR); + try { + // iter.hasNext may throw exception, so use try catch outside. + while (iter.hasNext()) { + ExecuteResponse nextResponse = iter.next(); + if (nextResponse.getStatus() == ExecuteStatus.ERROR) { + responseBuilder.setStatus(ExecuteStatus.ERROR); + } + outputBuilder.append(nextResponse.getOutput()); } - outputBuilder.append(nextResponse.getOutput()); + responseBuilder.setOutput(outputBuilder.toString()); + } catch (Exception e) { + responseBuilder.setStatus(ExecuteStatus.ERROR); + responseBuilder.setOutput(outputBuilder.toString()); } - responseBuilder.setOutput(outputBuilder.toString()); + return responseBuilder.build(); } diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java index c38983d..3a15337 100644 --- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java +++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java @@ -67,8 +67,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { private JupyterKernelProcessLauncher jupyterKernelProcessLauncher; protected JupyterKernelClient jupyterKernelClient; - - protected BaseZeppelinContext zeppelinContext; + protected BaseZeppelinContext z; private String kernel; // working directory of jupyter kernel @@ -120,7 +119,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { } kernelLaunchTimeout = Integer.parseInt( getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000")); - this.zeppelinContext = buildZeppelinContext(); + this.z = buildZeppelinContext(); int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size", 32 * 1024 * 1024 + "")); @@ -240,9 +239,9 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { @Override public 
InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException { - zeppelinContext.setGui(context.getGui()); - zeppelinContext.setNoteGui(context.getNoteGui()); - zeppelinContext.setInterpreterContext(context); + z.setGui(context.getGui()); + z.setNoteGui(context.getNoteGui()); + z.setInterpreterContext(context); interpreterOutput.setInterpreterOutput(context.out); try { ExecuteResponse response = @@ -310,7 +309,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter { } public BaseZeppelinContext getZeppelinContext() { - return zeppelinContext; + return z; } public class JupyterKernelProcessLauncher extends ProcessLauncher { diff --git a/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java b/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java new file mode 100644 index 0000000..246400f --- /dev/null +++ b/zeppelin-jupyter-interpreter/src/test/java/org/apache/zeppelin/jupyter/IRKernelTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.jupyter; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class IRKernelTest { + + protected Interpreter interpreter; + + protected Interpreter createInterpreter(Properties properties) { + return new JupyterInterpreter(properties); + } + + @Before + public void setUp() throws InterpreterException { + Properties properties = new Properties(); + + InterpreterContext context = getInterpreterContext(); + InterpreterContext.set(context); + interpreter = createInterpreter(properties); + + InterpreterGroup interpreterGroup = new InterpreterGroup(); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), "session_1"); + interpreter.setInterpreterGroup(interpreterGroup); + + interpreter.open(); + } + + @After + public void tearDown() throws InterpreterException { + interpreter.close(); + } + + @Test + public void testIRInterpreter() throws InterpreterException, IOException { + InterpreterContext context = getInterpreterContext(); + InterpreterResult result = interpreter.interpret("1+1", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(resultMessages.toString(), + InterpreterResult.Type.HTML, resultMessages.get(0).getType()); + assertEquals(resultMessages.toString(), "2", resultMessages.get(0).getData()); + + // error + context = getInterpreterContext(); + result = interpreter.interpret("unknown_var", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(result.toString(), InterpreterResult.Type.TEXT, resultMessages.get(0).getType()); + assertTrue(resultMessages.toString(), + resultMessages.get(0).getData().contains("object 'unknown_var' not found")); + + // plotting + context = getInterpreterContext(); + result = interpreter.interpret("hist(mtcars$mpg)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(resultMessages.toString(), + InterpreterResult.Type.IMG, resultMessages.get(0).getType()); + + result = interpreter.interpret("library(ggplot2)\n" + + "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", + getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(1, resultMessages.size()); + assertEquals(resultMessages.toString(), + InterpreterResult.Type.IMG, resultMessages.get(0).getType()); + } + + protected InterpreterContext 
getInterpreterContext() { + Map<String, String> localProperties = new HashMap<>(); + localProperties.put("kernel", "ir"); + InterpreterContext context = InterpreterContext.builder() + .setNoteId("note_1") + .setParagraphId("paragraph_1") + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(localProperties) + .build(); + return context; + } +} diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties b/zeppelin-jupyter-interpreter/src/test/resources/log4j.properties similarity index 75% copy from zeppelin-interpreter/src/test/resources/log4j.properties copy to zeppelin-jupyter-interpreter/src/test/resources/log4j.properties index 0e4935c..ed2672f 100644 --- a/zeppelin-interpreter/src/test/resources/log4j.properties +++ b/zeppelin-jupyter-interpreter/src/test/resources/log4j.properties @@ -15,17 +15,13 @@ # limitations under the License. # +# Root logger option +log4j.rootLogger=INFO, stdout + # Direct log messages to stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n -#log4j.appender.stdout.layout.ConversionPattern= -#%5p [%t] (%F:%L) - %m%n -#%-4r [%t] %-5p %c %x - %m%n -# +log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n -# Root logger option -log4j.rootLogger=INFO, stdout -log4j.logger.org.apache.zeppelin.interpreter=DEBUG -log4j.logger.org.apache.zeppelin.scheduler=DEBUG +#log4j.logger.org.apache.zeppelin.interpreter.util=DEBUG +#log4j.logger.org.apache.zeppelin.jupyter=DEBUG
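A final practical note on the CI change in testing/install_external_dependencies.sh: installing the IRkernel R package alone is not enough, because the kernel spec must also be registered with Jupyter, which is why the script chains IRkernel::installspec(). A quick way to verify a local setup the same way (assuming R and Jupyter are already on the PATH):

```r
# register the ir kernel for the current user (idempotent)
IRkernel::installspec()
# the list printed by jupyter should now include an "ir" kernel
system("jupyter kernelspec list")
```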