Repository: zeppelin Updated Branches: refs/heads/master bc8e190f9 -> d55058b05
ZEPPELIN-2079. Upgrade livy to 0.3 in livy interpreter ### What is this PR for? Upgrade livy to 0.3. * Add new tests for livy 0.3 * Add 2 livy builds in travis (livy 0.2 + spark 1.6.3 and livy 0.3 + spark 2.1.0; unfortunately livy 0.3 has a packaging issue that breaks the integration test for livy 0.3 + spark 1.6.3). I also merged the livy builds into the spark builds in travis, but had to set `sudo` to `required` for more memory. ### What type of PR is it? [Improvement] ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2079 ### How should this be tested? Tests are added ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2010 from zjffdu/ZEPPELIN-2079 and squashes the following commits: e9d1042 [Jeff Zhang] update travis 2695d7c [Jeff Zhang] ZEPPELIN-2079. Upgrade livy to 0.3 in livy interpreter Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d55058b0 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d55058b0 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d55058b0 Branch: refs/heads/master Commit: d55058b05d41ef212ec556b2ce762ccc66e407cb Parents: bc8e190 Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Feb 13 09:06:39 2017 +0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Sun Feb 19 10:19:27 2017 -0800 ---------------------------------------------------------------------- .travis.yml | 18 +- livy/pom.xml | 185 +++++++++++++------ .../zeppelin/livy/BaseLivyInterprereter.java | 2 +- .../zeppelin/livy/LivySparkInterpreter.java | 2 +- .../org/apache/zeppelin/livy/LivyVersion.java | 4 +- .../apache/zeppelin/livy/LivyInterpreterIT.java | 168 +++++++++++++---- 6 files changed, 279 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index c597340..3972a70 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,16 +41,18 @@ matrix: env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Prat" BUILD_FLAG="clean" TEST_FLAG="org.apache.rat:apache-rat-plugin:check" TEST_PROJECTS="" # Test all modules with spark 2.1.0 and scala 2.11 - - jdk: "oraclejdk7" - env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.1 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" + - sudo: required + jdk: "oraclejdk7" + env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" LIVY_VER="0.3.0" PROFILE="-Pspark-2.1 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11 -Plivy-0.3" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test all modules with spark 2.0.2 and scala 2.11 - jdk: "oraclejdk7" env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS="" # Test spark module for 1.6.3 with scala 2.10 - - jdk: 
"oraclejdk7" - env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Ppyspark -Psparkr -Pscala-2.10" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + - sudo: required + jdk: "oraclejdk7" + env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.2.0" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Ppyspark -Psparkr -Pscala-2.10 -Plivy-0.2" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test spark module for 1.6.3 with scala 2.11 - jdk: "oraclejdk7" @@ -68,10 +70,6 @@ matrix: - jdk: "oraclejdk7" env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Pscala-2.11" BUILD_FLAG="package -pl spark,python -am -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false" - # Test livy with spark 1.5.2 and hadoop 2.6 - - jdk: "oraclejdk7" - env: SCALA_VER="2.10" $LIVY_VER="0.2.0" SPARK_VER="1.5.2" HADOOP_VER="2.6" PROFILE="-Pspark-1.5 -Phadoop-2.6" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl zeppelin-interpreter,livy" TEST_PROJECTS="-DfailIfNoTests=false" - before_install: - echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxPermSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc - ./testing/install_external_dependencies.sh @@ -110,3 +108,7 @@ after_failure: - ls -R livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/* - cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stdout - cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stderr + - cat livy/target/tmp/livy-int-test/*/output.log + - ls -R livy/target/tmp/livy-int-test/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/* + - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stdout + - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stderr \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/pom.xml ---------------------------------------------------------------------- diff --git a/livy/pom.xml b/livy/pom.xml index 7e38458..66ababe 100644 --- a/livy/pom.xml +++ b/livy/pom.xml @@ -45,10 +45,6 @@ <achilles.version>3.2.4-Zeppelin</achilles.version> <assertj.version>1.7.0</assertj.version> <mockito.version>1.9.5</mockito.version> - <livy.version>0.2.0</livy.version> - <spark.version>1.5.2</spark.version> - <hadoop.version>2.6.0</hadoop.version> - <!--plugin versions--> <plugin.failsafe.version>2.16</plugin.failsafe.version> <plugin.antrun.version>1.8</plugin.antrun.version> @@ -132,27 +128,27 @@ </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> + 
<artifactId>spark-core_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.10</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.10</artifactId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.hadoop</groupId> @@ -192,63 +188,27 @@ </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.10</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.cloudera.livy</groupId> - <artifactId>livy-core</artifactId> - <version>${livy.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.10</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> </exclusion> <exclusion> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.10</artifactId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> </exclusion> </exclusions> </dependency> @@ -258,6 +218,12 @@ <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -325,6 +291,12 @@ <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> </dependency> 
<dependency> @@ -333,6 +305,12 @@ <classifier>tests</classifier> <version>${hadoop.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -450,6 +428,10 @@ <systemPropertyVariables> <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir> </systemPropertyVariables> + <environmentVariables> + <LIVY_SPARK_SCALA_VERSION>${scala.binary.version}</LIVY_SPARK_SCALA_VERSION> + <LIVY_LOG_DIR>${project.build.directory}/tmp</LIVY_LOG_DIR> + </environmentVariables> <argLine>-Xmx2048m</argLine> </configuration> </plugin> @@ -493,4 +475,103 @@ </plugins> </build> + <profiles> + <profile> + <id>livy-0.3</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <livy.version>0.3.0</livy.version> + <spark.version>2.1.0</spark.version> + <hadoop.version>2.6.0</hadoop.version> + </properties> + <dependencies> + <dependency> + <groupId>com.cloudera.livy</groupId> + <artifactId>livy-core_${scala.binary.version}</artifactId> + <version>0.3.0</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <profile> + <id>livy-0.2</id> + <properties> + <livy.version>0.2.0</livy.version> + <spark.version>1.6.2</spark.version> + <hadoop.version>2.6.0</hadoop.version> + <scala.binary.version>2.10</scala.binary.version> + </properties> + <dependencies> + <dependency> + <groupId>com.cloudera.livy</groupId> + <artifactId>livy-core</artifactId> + <version>0.2.0</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + </profiles> </project> 
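Editor's note: the two Maven profiles above pin livy.version, spark.version and the Scala binary version per Livy line, with livy-0.3 active by default; at runtime the interpreter additionally keys behaviour on the server's reported version through LivyVersion (see the LivyVersion.java hunk below). The diff shows LivyVersion's constants and its int-valued version field but not the parsing itself, so the following stand-alone Java sketch only illustrates one plausible scheme, assuming a "major.minor.patch" string packed into a single comparable int:

    public class VersionSketch {
      // Assumed packing: "0.3.0" -> 0 * 10000 + 3 * 100 + 0 = 300.
      // The real logic lives in LivyVersion.fromVersionString(), not shown in this diff.
      private final int version;

      public VersionSketch(String versionString) {
        String[] p = versionString.split("\\.");
        this.version = Integer.parseInt(p[0]) * 10000
            + Integer.parseInt(p[1]) * 100
            + Integer.parseInt(p[2]);
      }

      public boolean newerThanEquals(VersionSketch other) {
        return this.version >= other.version;
      }

      public static void main(String[] args) {
        System.out.println(
            new VersionSketch("0.3.0").newerThanEquals(new VersionSketch("0.2.0"))); // true
      }
    }

Collapsing the version into one int keeps comparisons against constants such as LIVY_0_3_0 cheap and avoids re-parsing the version string on every check.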
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java index 98f54d0..fd533ab 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java @@ -266,7 +266,7 @@ public abstract class BaseLivyInterprereter extends Interpreter { } } - private LivyVersion getLivyVersion() throws LivyException { + protected LivyVersion getLivyVersion() throws LivyException { return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version)); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 9b0e18f..f3a5eab 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -70,7 +70,7 @@ public class LivySparkInterpreter extends BaseLivyInterprereter { * @param result * @return */ - private String extractStatementResult(String result) { + public String extractStatementResult(String result) { int pos = -1; if ((pos = result.indexOf("=")) >= 0) { return result.substring(pos + 1).trim(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java index 1b7fe30..f56100f 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java @@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory; public class LivyVersion { private static final Logger logger = LoggerFactory.getLogger(LivyVersion.class); - private static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0"); - private static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0"); + protected static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0"); + protected static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0"); private int version; private String versionString; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index fbcdb53..c8f355c 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -93,10 +93,12 @@ public class LivyInterpreterIT { sparkInterpreter.open(); try { + // detect spark version InterpreterResult result = sparkInterpreter.interpret("sc.version", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); 
assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("1.5.2")); + + boolean isSpark2 = isSpark2(sparkInterpreter, context); // test RDD api result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context); @@ -139,7 +141,11 @@ public class LivyInterpreterIT { result = sparkInterpreter.interpret(objectClassCode, context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("defined module Person")); + if (!isSpark2) { + assertTrue(result.message().get(0).getData().contains("defined module Person")); + } else { + assertTrue(result.message().get(0).getData().contains("defined object Person")); + } // error result = sparkInterpreter.interpret("println(a)", context); @@ -157,7 +163,7 @@ public class LivyInterpreterIT { } } -// @Test + @Test public void testSparkInterpreterDataFrame() { if (!checkPreCondition()) { return; @@ -180,18 +186,32 @@ public class LivyInterpreterIT { sqlInterpreter.open(); try { - // test DataFrame api - sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n" - + "import sqlContext.implicits._", context); - InterpreterResult result = sparkInterpreter.interpret( - "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); + // detect spark version + InterpreterResult result = sparkInterpreter.interpret("sc.version", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); - sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + boolean isSpark2 = isSpark2(sparkInterpreter, context); + + // test DataFrame api + if (!isSpark2) { + result = sparkInterpreter.interpret( + "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + } else { + result = sparkInterpreter.interpret( + "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + } + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter result = sqlInterpreter.interpret("select * from df where col_1='hello'", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -202,12 +222,13 @@ public class LivyInterpreterIT { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData()); - // double quotes inside attribute value - // TODO(zjffdu). 
This test case would fail on spark-1.5, would uncomment it when upgrading to - // livy-0.3 and spark-1.6 - // result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context); - // assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - // assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + + // only enable this test in spark2 as spark1 doesn't work for this case + if (isSpark2) { + result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + } // single quotes inside attribute value result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context); @@ -218,7 +239,12 @@ public class LivyInterpreterIT { result = sqlInterpreter.interpret("select * from df2", context); assertEquals(InterpreterResult.Code.ERROR, result.code()); assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); - assertTrue(result.message().get(0).getData().contains("Table Not Found")); + + if (!isSpark2) { + assertTrue(result.message().get(0).getData().contains("Table not found")); + } else { + assertTrue(result.message().get(0).getData().contains("Table or view not found")); + } } finally { sparkInterpreter.close(); sqlInterpreter.close(); @@ -275,7 +301,8 @@ public class LivyInterpreterIT { InterpreterResult result = pysparkInterpreter.interpret("sc.version", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("1.5.2")); + + boolean isSpark2 = isSpark2(pysparkInterpreter, context); // test RDD api result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context); @@ -284,23 +311,31 @@ public class LivyInterpreterIT { assertTrue(result.message().get(0).getData().contains("45")); // test DataFrame api - pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n" - + "sqlContext = SQLContext(sc)", context); - result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]")); - - // test magic api + if (!isSpark2) { + pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n" + + "sqlContext = SQLContext(sc)", context); + result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]")); + } else { + result = pysparkInterpreter.interpret("df=spark.createDataFrame([(\"hello\",20)])\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]")); + } + + // test magic api pysparkInterpreter.interpret("t = [{\"name\":\"userA\", \"role\":\"roleA\"}," + "{\"name\":\"userB\", \"role\":\"roleB\"}]", context); result = pysparkInterpreter.interpret("%table t", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Code.SUCCESS, 
result.code()); assertEquals(1, result.message().size()); assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - assertTrue(result.message().get(0).getData().contains("userA")); - + assertTrue(result.message().get(0).getData().contains("userA")); + // error result = pysparkInterpreter.interpret("print(a)", context); assertEquals(InterpreterResult.Code.ERROR, result.code()); @@ -336,7 +371,7 @@ public class LivyInterpreterIT { InterpreterResult result = sparkInterpreter.interpret("sc.version", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(2, result.message().size()); - assertTrue(result.message().get(0).getData().contains("1.5.2")); + assertTrue(result.message().get(1).getData().contains("Spark Application Id")); } finally { sparkInterpreter.close(); @@ -344,14 +379,55 @@ public class LivyInterpreterIT { } @Test - public void testSparkRInterpreter() { + public void testSparkRInterpreter() throws LivyException { if (!checkPreCondition()) { return; } - // TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release. + + LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties); + try { + sparkRInterpreter.getLivyVersion(); + } catch (APINotFoundException e) { + // don't run sparkR test for livy 0.2 as there's some issues for livy 0.2 + return; + } + AuthenticationInfo authInfo = new AuthenticationInfo("user1"); + MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); + InterpreterOutput output = new InterpreterOutput(outputListener); + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sparkr", + "title", "text", authInfo, null, null, null, null, null, output); + sparkRInterpreter.open(); + + try { + // only test it in livy newer than 0.2.0 + boolean isSpark2 = isSpark2(sparkRInterpreter, context); + InterpreterResult result = null; + // test DataFrame api + if (isSpark2) { + result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("eruptions waiting")); + } else { + result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)" + + "\nhead(df)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("eruptions waiting")); + } + + // error + result = sparkRInterpreter.interpret("cat(a)", context); + //TODO @zjffdu, it should be ERROR, it is due to bug of LIVY-313 + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertTrue(result.message().get(0).getData().contains("object 'a' not found")); + } finally { + sparkRInterpreter.close(); + } } -// @Test + @Test public void testLivyTutorialNote() throws IOException { if (!checkPreCondition()) { return; @@ -389,6 +465,26 @@ public class LivyInterpreterIT { } } + private boolean isSpark2(BaseLivyInterprereter interpreter, InterpreterContext context) { + InterpreterResult result = null; + if (interpreter instanceof LivySparkRInterpreter) { + result = interpreter.interpret("sparkR.session()", context); + // SparkRInterpreter would always return SUCCESS, it is due to bug of LIVY-313 + if (result.message().get(0).getData().contains("Error")) { + return false; + } else { + 
return true; + } + } else { + result = interpreter.interpret("spark", context); + if (result.code() == InterpreterResult.Code.SUCCESS) { + return true; + } else { + return false; + } + } + } + public static class MyInterpreterOutputListener implements InterpreterOutputListener { @Override public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
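Editor's note: testSparkRInterpreter above skips Livy 0.2 by calling the now-protected getLivyVersion() and returning early on APINotFoundException (Livy 0.2 exposes no /version endpoint). A variant of that guard using JUnit's Assume, so the skipped test is reported as skipped rather than passed, could look like the sketch below; LivyVersionGuard is a hypothetical helper, not part of this commit:

    package org.apache.zeppelin.livy;

    import org.junit.Assume;

    // Hypothetical helper for illustration only; like the committed test, it
    // assumes APINotFoundException signals a pre-0.3 Livy server.
    class LivyVersionGuard {
      static void assumeVersionEndpoint(BaseLivyInterprereter interpreter) {
        try {
          interpreter.getLivyVersion();  // protected as of this commit
        } catch (APINotFoundException e) {
          // No /version endpoint -> Livy 0.2: mark the calling test as skipped.
          Assume.assumeNoException(e);
        } catch (LivyException e) {
          throw new RuntimeException("Unexpected failure querying Livy version", e);
        }
      }
    }

The early return in the committed test has the same effect on CI outcomes, but it records the SparkR test as green on Livy 0.2 even though nothing ran; an Assume-based guard would make the skip visible in the failsafe report.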