This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
    new c4c580a  [ZEPPELIN-5530] Support scala-2.13 for spark interpreter

c4c580a is described below

commit c4c580a37fde649553d336984a94bcb1b2821201
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Mar 30 09:00:25 2022 +0800

    [ZEPPELIN-5530] Support scala-2.13 for spark interpreter

    ### What is this PR for?
    Spark 3.2 starts to support Scala 2.13, so this PR adds scala-2.13 support on the Zeppelin side. A new module `scala-2.13` is introduced in this PR. `SparkILoop` is copied from the Spark project with minor modifications.

    ### What type of PR is it?
    [Feature]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5530

    ### How should this be tested?
    * CI pass

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #4322 from zjffdu/ZEPPELIN-5530 and squashes the following commits:

    8836ae87b4 [Jeff Zhang] update workflow
    6470d3cda3 [Jeff Zhang] [ZEPPELIN-5530] Support scala 2.13 in spark interpreter
---
 .github/workflows/core.yml | 16 +-
 spark/interpreter/pom.xml | 110 ++++++++++++-
 .../spark/AbstractSparkScalaInterpreter.java | 5 +
 .../zeppelin/spark/KotlinSparkInterpreter.java | 37 ++---
 .../apache/zeppelin/spark/SparkInterpreter.java | 20 +++
 .../apache/zeppelin/spark/SparkSqlInterpreter.java | 6 +-
 .../zeppelin/spark/SparkInterpreterTest.java | 132 ++++++++-------
 spark/pom.xml | 77 +--------
 .../zeppelin/spark/SparkScala211Interpreter.scala | 73 ++++++++-
 .../zeppelin/spark/SparkScala212Interpreter.scala | 70 +++++++-
 spark/scala-2.13/pom.xml | 59 +++++++
 spark/scala-2.13/spark-scala-parent | 1 +
 .../scala-2.13/src/main/resources/log4j.properties | 50 ++++++
 .../org/apache/zeppelin/spark/SparkILoop.scala | 87 ++++++++++
 .../zeppelin/spark/SparkScala213Interpreter.scala | 179 +++++++++++++++++++++
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 141 +++-------------
 zeppelin-interpreter-parent/pom.xml | 12 +-
 zeppelin-jupyter-interpreter-shaded/pom.xml | 8 +
 .../launcher/SparkInterpreterLauncher.java | 7 +-
 19 files changed, 775 insertions(+), 315 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 578414e..d4d9b95 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -54,7 +54,7 @@ jobs: restore-keys: | ${{ runner.os }}-zeppelin- - name: install application with some interpreter - run: ./mvnw install -Pbuild-distr -DskipRat -DskipTests -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} -B + run: ./mvnw install -Pbuild-distr -DskipRat -DskipTests -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} -B - name: install and test plugins run: ./mvnw package -DskipRat -pl zeppelin-plugins -amd -B - name: Setup conda environment with python 3.7 and R @@ -74,7 +74,7 @@ jobs: conda list conda info - name: run tests with ${{ matrix.hadoop }} # skip spark test because we would run them in other CI - run: ./mvnw verify -Pusing-packaged-distr -DskipRat -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} 
-Dtests.to.exclude=**/org/apache/zeppelin/spark/* -DfailIfNoTests=false + run: ./mvnw verify -Pusing-packaged-distr -DskipRat -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} -Dtests.to.exclude=**/org/apache/zeppelin/spark/* -DfailIfNoTests=false # test interpreter modules except spark, flink, python, rlang, jupyter interpreter-test-non-core: @@ -190,7 +190,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -DskipRat -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am + ./mvnw install -DskipTests -DskipRat -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am ./mvnw package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 @@ -279,7 +279,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -DskipRat -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,markdown -am -Phadoop2 -Pintegration -B + ./mvnw install -DskipTests -DskipRat -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown -am -Phadoop2 -Pintegration -B ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 @@ -325,7 +325,7 @@ jobs: restore-keys: | ${{ runner.os }}-zeppelin- - name: install environment - run: ./mvnw install -DskipTests -DskipRat -pl spark-submit,spark/scala-2.11,spark/scala-2.12 -am -Phadoop2 -B + run: ./mvnw install -DskipTests -DskipRat -pl spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13 -am -Phadoop2 -B - name: Setup conda environment with python ${{ matrix.python }} and R uses: conda-incubator/setup-miniconda@v2 with: @@ -359,7 +359,11 @@ jobs: - name: run spark-3.2 tests with scala-2.12 and python-${{ matrix.python }} run: | rm -rf spark/interpreter/metastore_db - ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.1 -Pspark-scala-2.12 -Phadoop2 -Pintegration -B -DfailIfNoTests=false + ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.2 -Pspark-scala-2.12 -Phadoop2 -Pintegration -B -DfailIfNoTests=false + - name: run spark-3.2 tests with scala-2.13 and python-${{ matrix.python }} + run: | + rm -rf spark/interpreter/metastore_db + ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.2 -Pspark-scala-2.13 -Phadoop2 -Pintegration -B -DfailIfNoTests=false livy-0-5-with-spark-2-2-0-under-python3: runs-on: ubuntu-20.04 diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 3a22650..e539f99 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -40,6 +40,25 @@ <maven.aeither.provider.version>3.0.3</maven.aeither.provider.version> <wagon.version>2.7</wagon.version> + <datanucleus.rdbms.version>3.2.9</datanucleus.rdbms.version> + <datanucleus.apijdo.version>3.2.6</datanucleus.apijdo.version> + <datanucleus.core.version>3.2.10</datanucleus.core.version> + + <!-- spark versions --> + 
<spark.version>3.1.2</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <py4j.version>0.10.9</py4j.version> + <spark.scala.version>2.12.7</spark.scala.version> + <spark.scala.binary.version>2.12</spark.scala.binary.version> + + <spark.archive>spark-${spark.version}</spark.archive> + <spark.src.download.url> + https://archive.apache.org/dist/spark/${spark.archive}/${spark.archive}.tgz + </spark.src.download.url> + <spark.bin.download.url> + https://archive.apache.org/dist/spark/${spark.archive}/${spark.archive}-bin-without-hadoop.tgz + </spark.bin.download.url> + <scala.compile.version>${spark.scala.version}</scala.compile.version> <!-- settings --> <pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude> @@ -270,13 +289,6 @@ <!--test libraries--> <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${spark.scala.binary.version}</artifactId> - <version>${scalatest.version}</version> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.datanucleus</groupId> <artifactId>datanucleus-core</artifactId> <version>${datanucleus.core.version}</version> @@ -525,4 +537,88 @@ </plugin> </plugins> </build> + + <profiles> + + <!-- profile spark-scala-x only affect the unit test in spark/interpreter module --> + + <profile> + <id>spark-scala-2.13</id> + <properties> + <spark.scala.version>2.13.4</spark.scala.version> + <spark.scala.binary.version>2.13</spark.scala.binary.version> + </properties> + </profile> + + <profile> + <id>spark-scala-2.12</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <spark.scala.version>2.12.7</spark.scala.version> + <spark.scala.binary.version>2.12</spark.scala.binary.version> + </properties> + </profile> + + <profile> + <id>spark-scala-2.11</id> + <properties> + <spark.scala.version>2.11.12</spark.scala.version> + <spark.scala.binary.version>2.11</spark.scala.binary.version> + </properties> + </profile> + + <!-- profile spark-x only affect spark version used in test --> + + <profile> + <id>spark-3.2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <datanucleus.core.version>4.1.17</datanucleus.core.version> + <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> + <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> + <spark.version>3.2.0</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <py4j.version>0.10.9.2</py4j.version> + </properties> + </profile> + + <profile> + <id>spark-3.1</id> + <properties> + <datanucleus.core.version>4.1.17</datanucleus.core.version> + <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> + <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> + <spark.version>3.1.2</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <py4j.version>0.10.9</py4j.version> + </properties> + </profile> + + <profile> + <id>spark-3.0</id> + <properties> + <datanucleus.core.version>4.1.17</datanucleus.core.version> + <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> + <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> + <spark.version>3.0.3</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <py4j.version>0.10.9</py4j.version> + </properties> + </profile> + + <profile> + <id>spark-2.4</id> + <properties> + <spark.version>2.4.5</spark.version> + <protobuf.version>2.5.0</protobuf.version> + <py4j.version>0.10.7</py4j.version> + </properties> + </profile> + + </profiles> + </project> diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java index bf3abd8..71acd5e 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java @@ -25,6 +25,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.kotlin.KotlinInterpreter; import java.util.List; @@ -65,6 +66,10 @@ public abstract class AbstractSparkScalaInterpreter { public abstract InterpreterResult interpret(String st, InterpreterContext context); + public abstract InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter, + String st, + InterpreterContext context); + public abstract List<InterpreterCompletion> completion(String buf, int cursor, InterpreterContext interpreterContext); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java index 32de4b4..299f70b 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java @@ -17,17 +17,24 @@ package org.apache.zeppelin.spark; -import static org.apache.zeppelin.spark.Utils.buildJobDesc; -import static org.apache.zeppelin.spark.Utils.buildJobGroupId; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.util.Utils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.kotlin.KotlinInterpreter; +import org.apache.zeppelin.spark.kotlin.KotlinZeppelinBindings; +import org.apache.zeppelin.spark.kotlin.SparkKotlinReceiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; -import java.io.PrintStream; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -35,17 +42,9 @@ import java.util.Properties; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.Console; -import org.apache.zeppelin.interpreter.ZeppelinContext; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.kotlin.KotlinInterpreter; -import org.apache.zeppelin.spark.kotlin.KotlinZeppelinBindings; -import org.apache.zeppelin.spark.kotlin.SparkKotlinReceiver; + +import static org.apache.zeppelin.spark.Utils.buildJobDesc; +import static 
org.apache.zeppelin.spark.Utils.buildJobGroupId; public class KotlinSparkInterpreter extends Interpreter { private static Logger logger = LoggerFactory.getLogger(KotlinSparkInterpreter.class); @@ -125,15 +124,7 @@ public class KotlinSparkInterpreter extends Interpreter { jsc.setJobGroup(buildJobGroupId(context), buildJobDesc(context), false); jsc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool")); - InterpreterOutput out = context.out; - PrintStream scalaOut = Console.out(); - PrintStream newOut = (out != null) ? new PrintStream(out) : null; - - Console.setOut(newOut); - InterpreterResult result = interpreter.interpret(st, context); - Console.setOut(scalaOut); - - return result; + return sparkInterpreter.delegateInterpret(interpreter, st, context); } @Override diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index b13d67f..7701ebf 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.kotlin.KotlinInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,10 +85,12 @@ public class SparkInterpreter extends AbstractInterpreter { if (Boolean.parseBoolean(properties.getProperty("zeppelin.spark.scala.color", "true"))) { System.setProperty("scala.color", "true"); } + this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean( properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter"); innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.spark.SparkScala212Interpreter"); + innerInterpreterClassMap.put("2.13", "org.apache.zeppelin.spark.SparkScala213Interpreter"); } @Override @@ -145,6 +148,7 @@ public class SparkInterpreter extends AbstractInterpreter { * * SparkScala211Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.11 * SparkScala212Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.12 + * SparkScala213Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.13 * * @param conf * @return AbstractSparkScalaInterpreter @@ -234,6 +238,12 @@ public class SparkInterpreter extends AbstractInterpreter { return this.innerInterpreter.getZeppelinContext(); } + public InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter, + String code, + InterpreterContext context) { + return innerInterpreter.delegateInterpret(kotlinInterpreter, code, context); + } + public SparkContext getSparkContext() { return this.sc; } @@ -280,15 +290,25 @@ public class SparkInterpreter extends AbstractInterpreter { return "2.11"; } else if (scalaVersionString.contains("2.12")) { return "2.12"; + } else if (scalaVersionString.contains("2.13")) { + return "2.13"; } else { throw new InterpreterException("Unsupported scala version: " + scalaVersionString); } } + public boolean isScala211() throws InterpreterException { + return scalaVersion.equals("2.11"); + } + public boolean isScala212() throws InterpreterException { return scalaVersion.equals("2.12"); } + public boolean isScala213() { + return 
scalaVersion.equals("2.13"); + } + private List<String> getDependencyFiles() throws InterpreterException { List<String> depFiles = new ArrayList<>(); // add jar from local repo diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 6c06399..95e8bd7 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -95,9 +95,9 @@ public class SparkSqlInterpreter extends AbstractInterpreter { String curSql = null; ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { - if (!sparkInterpreter.isScala212()) { - // TODO(zjffdu) scala 2.12 still doesn't work for codegen (ZEPPELIN-4627) - Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader()); + if (sparkInterpreter.isScala211()) { + // TODO(zjffdu) scala 2.12,2.13 still doesn't work for codegen (ZEPPELIN-4627) + Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader()); } Method method = sqlContext.getClass().getMethod("sql", String.class); for (String sql : sqls) { diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 297b55d..d3a9622 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -90,7 +90,8 @@ public class SparkInterpreterTest { InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals("a: String = hello world\n", output); + // Use contains instead of equals, because there's behavior difference between different scala versions + assertTrue(output, output.contains("a: String = hello world\n")); result = interpreter.interpret("print(a)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -122,9 +123,12 @@ public class SparkInterpreterTest { result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - // multiple line comment - result = interpreter.interpret("/*line 1 \n line 2*/", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + if (!interpreter.isScala213()) { + // multiple line comment, not supported by scala-2.13 + context = getInterpreterContext(); + result = interpreter.interpret("/*line 1 \n line 2*/", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + } // test function result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext()); @@ -136,8 +140,11 @@ public class SparkInterpreterTest { result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = interpreter.interpret("$intp", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + if (!interpreter.isScala213()) { + // $intp not available for scala-2.13 + result = interpreter.interpret("$intp", getInterpreterContext()); + 
assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } // Companion object with case class result = interpreter.interpret("import scala.math._\n" + @@ -152,6 +159,11 @@ public class SparkInterpreterTest { "val circle1 = new Circle(5.0)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // use case class in spark + // context = getInterpreterContext(); + // result = interpreter.interpret("sc\n.range(1, 10)\n.map(e=>Circle(e))\n.collect()", context); + // assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + // class extend result = interpreter.interpret("import java.util.ArrayList", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -162,7 +174,7 @@ public class SparkInterpreterTest { // spark rdd operation context = getInterpreterContext(); context.setParagraphId("pid_1"); - result = interpreter.interpret("sc\n.range(1, 10)\n.sum", context); + result = interpreter.interpret("sc\n.range(1, 10)\n.map(e=>e)\n.sum", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(output.contains("45")); ArgumentCaptor<Map> captorEvent = ArgumentCaptor.forClass(Map.class); @@ -207,46 +219,29 @@ public class SparkInterpreterTest { // spark sql test String version = output.trim(); - if (version.contains("String = 1.")) { - result = interpreter.interpret("sqlContext", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - - result = interpreter.interpret( - "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + - "df.show()", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertTrue(output.contains( - "+---+----+\n" + - "| _1| _2|\n" + - "+---+----+\n" + - "| 1| a|\n" + - "| 2|null|\n" + - "+---+----+")); - } else { - // create dataset from case class - context = getInterpreterContext(); - result = interpreter.interpret("case class Person(id:Int, name:String, age:Int, country:String)\n" + - "val df2 = spark.createDataFrame(Seq(Person(1, \"andy\", 20, \"USA\"), " + - "Person(2, \"jeff\", 23, \"China\"), Person(3, \"james\", 18, \"USA\")))\n" + - "df2.printSchema\n" + - "df2.show() ", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // create dataset from case class + context = getInterpreterContext(); + result = interpreter.interpret("case class Person(id:Int, name:String, age:Int, country:String)\n" + + "val df2 = spark.createDataFrame(Seq(Person(1, \"andy\", 20, \"USA\"), " + + "Person(2, \"jeff\", 23, \"China\"), Person(3, \"james\", 18, \"USA\")))\n" + + "df2.printSchema\n" + + "df2.show() ", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = interpreter.interpret("spark", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + result = interpreter.interpret("spark", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = interpreter.interpret( - "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + - "df.show()", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertTrue(output.contains( - "+---+----+\n" + - "| _1| _2|\n" + - "+---+----+\n" + - "| 1| a|\n" + - "| 2|null|\n" + - "+---+----+")); - } + result = interpreter.interpret( + "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + + "df.show()", getInterpreterContext()); + 
assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(output.contains( + "+---+----+\n" + + "| _1| _2|\n" + + "+---+----+\n" + + "| 1| a|\n" + + "| 2|null|\n" + + "+---+----+")); // ZeppelinContext context = getInterpreterContext(); @@ -303,7 +298,6 @@ public class SparkInterpreterTest { assertEquals("value_2", select.getOptions()[1].getValue()); assertEquals("name_2", select.getOptions()[1].getDisplayName()); - // completions List<InterpreterCompletion> completions = interpreter.completion("a.", 2, getInterpreterContext()); assertTrue(completions.size() > 0); @@ -321,26 +315,29 @@ public class SparkInterpreterTest { assertEquals(1, completions.size()); assertEquals("range", completions.get(0).name); - // Zeppelin-Display - result = interpreter.interpret("import org.apache.zeppelin.display.angular.notebookscope._\n" + - "import AngularElem._", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - - result = interpreter.interpret("<div style=\"color:blue\">\n" + - "<h4>Hello Angular Display System</h4>\n" + - "</div>.display", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType()); - assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Hello Angular Display System")); + if (!interpreter.isScala213()) { + // Zeppelin-Display + result = interpreter.interpret("import org.apache.zeppelin.display.angular.notebookscope._\n" + + "import AngularElem._", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = interpreter.interpret("<div class=\"btn btn-success\">\n" + - " Click me\n" + - "</div>.onClick{() =>\n" + - " println(\"hello world\")\n" + - "}.display", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType()); - assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Click me")); + context = getInterpreterContext(); + result = interpreter.interpret("<div style=\"color:blue\">\n" + + "<h4>Hello Angular Display System</h4>\n" + + "</div>.display", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType()); + assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Hello Angular Display System")); + + result = interpreter.interpret("<div class=\"btn btn-success\">\n" + + " Click me\n" + + "</div>.onClick{() =>\n" + + " println(\"hello world\")\n" + + "}.display", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType()); + assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Click me")); + } // getProgress final InterpreterContext context2 = getInterpreterContext(); @@ -439,7 +436,8 @@ public class SparkInterpreterTest { InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals("a: String = hello world\n", output); + // Use contains instead of equals, because there's behavior different between different scala versions + assertTrue(output, output.contains("a: String = hello world\n")); result = interpreter.interpret("print(a)", getInterpreterContext()); 
assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -457,7 +455,7 @@ public class SparkInterpreterTest { // REPL output get back if we don't set printREPLOutput in paragraph local properties result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals("a: String = hello world\n", output); + assertTrue(output, output.contains("a: String = hello world\n")); result = interpreter.interpret("print(a)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); diff --git a/spark/pom.xml b/spark/pom.xml index f73c87b..43241f3 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -33,7 +33,6 @@ <description>Zeppelin Spark Support</description> <properties> - <!--library versions--> <datanucleus.rdbms.version>3.2.9</datanucleus.rdbms.version> <datanucleus.apijdo.version>3.2.6</datanucleus.apijdo.version> <datanucleus.core.version>3.2.10</datanucleus.core.version> @@ -45,6 +44,8 @@ <spark.scala.version>2.12.7</spark.scala.version> <spark.scala.binary.version>2.12</spark.scala.binary.version> + <scala.compile.version>${spark.scala.version}</scala.compile.version> + <spark.archive>spark-${spark.version}</spark.archive> <spark.src.download.url> https://archive.apache.org/dist/spark/${spark.archive}/${spark.archive}.tgz @@ -59,6 +60,7 @@ <module>spark-scala-parent</module> <module>scala-2.11</module> <module>scala-2.12</module> + <module>scala-2.13</module> <module>spark-shims</module> <module>spark2-shims</module> <module>spark3-shims</module> @@ -131,78 +133,5 @@ </plugins> </build> - <profiles> - - <!-- profile spark-scala-x only affect the unit test in spark/interpreter module --> - - <profile> - <id>spark-scala-2.12</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <properties> - <spark.scala.version>2.12.7</spark.scala.version> - <spark.scala.binary.version>2.12</spark.scala.binary.version> - </properties> - </profile> - - <profile> - <id>spark-scala-2.11</id> - <properties> - <spark.scala.version>2.11.12</spark.scala.version> - <spark.scala.binary.version>2.11</spark.scala.binary.version> - </properties> - </profile> - - <!-- profile spark-x only affect the embedded spark version in zeppelin distribution --> - - <profile> - <id>spark-3.2</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <properties> - <datanucleus.core.version>4.1.17</datanucleus.core.version> - <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> - <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.2.0</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9.2</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-3.1</id> - <properties> - <datanucleus.core.version>4.1.17</datanucleus.core.version> - <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> - <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.1.2</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-3.0</id> - <properties> - <datanucleus.core.version>4.1.17</datanucleus.core.version> - <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> - <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.0.3</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9</py4j.version> - </properties> - </profile> 
- - <profile> - <id>spark-2.4</id> - <properties> - <spark.version>2.4.5</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.7</py4j.version> - </properties> - </profile> - </profiles> </project> diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index d1a3b08..013cab4 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -21,12 +21,11 @@ import java.io.{BufferedReader, File} import java.net.URLClassLoader import java.nio.file.{Files, Paths} import java.util.Properties - import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup} +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult} import org.slf4j.LoggerFactory import org.slf4j.Logger @@ -50,6 +49,8 @@ class SparkScala211Interpreter(override val conf: SparkConf, private var sparkILoop: SparkILoop = _ + private var scalaCompletion: Completion = _ + override val interpreterOutput = new InterpreterOutputStream(LOGGER) override def open(): Unit = { @@ -60,12 +61,7 @@ class SparkScala211Interpreter(override val conf: SparkConf, LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - startHttpServer(outputDir).foreach { case (server, uri) => - sparkHttpServer = server - conf.set("spark.repl.class.uri", uri) - } val target = conf.get("spark.repl.target", "jvm-1.6") - val settings = new Settings() settings.processArguments(List("-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) @@ -96,9 +92,72 @@ class SparkScala211Interpreter(override val conf: SparkConf, this.scalaCompletion = reader.completion createSparkContext() + scalaInterpret("import org.apache.spark.SparkContext._") + scalaInterpret("import spark.implicits._") + scalaInterpret("import spark.sql") + scalaInterpret("import org.apache.spark.sql.functions._") + // print empty string otherwise the last statement's output of this method + // (aka. 
import org.apache.spark.sql.functions._) will mix with the output of user code + scalaInterpret("print(\"\")") createZeppelinContext() } + def interpret(code: String, context: InterpreterContext): InterpreterResult = { + + val originalOut = System.out + val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean + + def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { + Console.withOut(interpreterOutput) { + System.setOut(Console.out) + if (printREPLOutput) { + interpreterOutput.setInterpreterOutput(context.out) + } else { + interpreterOutput.setInterpreterOutput(null) + } + interpreterOutput.ignoreLeadingNewLinesFromScalaReporter() + + val status = scalaInterpret(code) match { + case succ...@scala.tools.nsc.interpreter.ir.Success => + success + case scala.tools.nsc.interpreter.IR.Error => + val errorMsg = new String(interpreterOutput.getInterpreterOutput.toByteArray) + if (errorMsg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") || + errorMsg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) { + // prepend "import sqlContext.implicits._" due to + // https://issues.scala-lang.org/browse/SI-6649 + context.out.clear() + scalaInterpret("import sqlContext.implicits._\n" + code) + } else { + scala.tools.nsc.interpreter.IR.Error + } + case scala.tools.nsc.interpreter.IR.Incomplete => + // add print("") at the end in case the last line is comment which lead to INCOMPLETE + scalaInterpret(code + "\nprint(\"\")") + } + context.out.flush() + status + } + } + // reset the java stdout + System.setOut(originalOut) + + context.out.write("") + val lastStatus = _interpret(code) match { + case scala.tools.nsc.interpreter.IR.Success => + InterpreterResult.Code.SUCCESS + case scala.tools.nsc.interpreter.IR.Error => + InterpreterResult.Code.ERROR + case scala.tools.nsc.interpreter.IR.Incomplete => + InterpreterResult.Code.INCOMPLETE + } + + lastStatus match { + case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) + case _ => new InterpreterResult(lastStatus) + } + } + protected override def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion] = { diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index 4918e4b..80c66c2 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -21,12 +21,11 @@ import java.io.{BufferedReader, File} import java.net.URLClassLoader import java.nio.file.{Files, Paths} import java.util.Properties - import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup} +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult} import org.slf4j.LoggerFactory import org.slf4j.Logger @@ -48,6 +47,8 @@ class SparkScala212Interpreter(override val conf: SparkConf, private var sparkILoop: SparkILoop = _ + private var scalaCompletion: Completion = _ + override val interpreterOutput = new InterpreterOutputStream(LOGGER) override def open(): Unit = { @@ -86,9 +87,74 @@ 
class SparkScala212Interpreter(override val conf: SparkConf, this.scalaCompletion = reader.completion createSparkContext() + + scalaInterpret("import org.apache.spark.SparkContext._") + scalaInterpret("import spark.implicits._") + scalaInterpret("import spark.sql") + scalaInterpret("import org.apache.spark.sql.functions._") + // print empty string otherwise the last statement's output of this method + // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code + scalaInterpret("print(\"\")") + createZeppelinContext() } + def interpret(code: String, context: InterpreterContext): InterpreterResult = { + + val originalOut = System.out + val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean + + def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { + Console.withOut(interpreterOutput) { + System.setOut(Console.out) + if (printREPLOutput) { + interpreterOutput.setInterpreterOutput(context.out) + } else { + interpreterOutput.setInterpreterOutput(null) + } + interpreterOutput.ignoreLeadingNewLinesFromScalaReporter() + + val status = scalaInterpret(code) match { + case succ...@scala.tools.nsc.interpreter.ir.Success => + success + case scala.tools.nsc.interpreter.IR.Error => + val errorMsg = new String(interpreterOutput.getInterpreterOutput.toByteArray) + if (errorMsg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") || + errorMsg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) { + // prepend "import sqlContext.implicits._" due to + // https://issues.scala-lang.org/browse/SI-6649 + context.out.clear() + scalaInterpret("import sqlContext.implicits._\n" + code) + } else { + scala.tools.nsc.interpreter.IR.Error + } + case scala.tools.nsc.interpreter.IR.Incomplete => + // add print("") at the end in case the last line is comment which lead to INCOMPLETE + scalaInterpret(code + "\nprint(\"\")") + } + context.out.flush() + status + } + } + // reset the java stdout + System.setOut(originalOut) + + context.out.write("") + val lastStatus = _interpret(code) match { + case scala.tools.nsc.interpreter.IR.Success => + InterpreterResult.Code.SUCCESS + case scala.tools.nsc.interpreter.IR.Error => + InterpreterResult.Code.ERROR + case scala.tools.nsc.interpreter.IR.Incomplete => + InterpreterResult.Code.INCOMPLETE + } + + lastStatus match { + case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) + case _ => new InterpreterResult(lastStatus) + } + } + protected override def completion(buf: String, cursor: Int, context: InterpreterContext): java.util.List[InterpreterCompletion] = { diff --git a/spark/scala-2.13/pom.xml b/spark/scala-2.13/pom.xml new file mode 100644 index 0000000..c8f3bbc --- /dev/null +++ b/spark/scala-2.13/pom.xml @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.zeppelin</groupId> + <artifactId>spark-scala-parent</artifactId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../spark-scala-parent/pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>spark-scala-2.13</artifactId> + <packaging>jar</packaging> + <name>Zeppelin: Spark Interpreter Scala_2.13</name> + + <properties> + <spark.version>3.2.0</spark.version> + <spark.scala.version>2.13.4</spark.scala.version> + <spark.scala.binary.version>2.13</spark.scala.binary.version> + <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/spark/scala-2.13/spark-scala-parent b/spark/scala-2.13/spark-scala-parent new file mode 120000 index 0000000..e5e899e --- /dev/null +++ b/spark/scala-2.13/spark-scala-parent @@ -0,0 +1 @@ +../spark-scala-parent \ No newline at end of file diff --git a/spark/scala-2.13/src/main/resources/log4j.properties b/spark/scala-2.13/src/main/resources/log4j.properties new file mode 100644 index 0000000..0c90b21 --- /dev/null +++ b/spark/scala-2.13/src/main/resources/log4j.properties @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n +#log4j.appender.stdout.layout.ConversionPattern= +#%5p [%t] (%F:%L) - %m%n +#%-4r [%t] %-5p %c %x - %m%n +# + +# Root logger option +log4j.rootLogger=INFO, stdout + +#mute some noisy guys +log4j.logger.org.apache.hadoop.mapred=WARN +log4j.logger.org.apache.hadoop.hive.ql=WARN +log4j.logger.org.apache.hadoop.hive.metastore=WARN +log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN +log4j.logger.org.apache.zeppelin.scheduler=WARN + +log4j.logger.org.quartz=WARN +log4j.logger.DataNucleus=WARN +log4j.logger.DataNucleus.MetaData=ERROR +log4j.logger.DataNucleus.Datastore=ERROR + +# Log all JDBC parameters +log4j.logger.org.hibernate.type=ALL + +log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.spark=DEBUG + + +log4j.logger.org.apache.spark.repl.Main=INFO diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkILoop.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkILoop.scala new file mode 100644 index 0000000..71f93a9 --- /dev/null +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkILoop.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.spark + +import java.io.{BufferedReader, PrintWriter} + +import scala.Predef.{println => _, _} +import scala.tools.nsc.GenericRunnerSettings +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.shell.{ILoop, ShellConfig} +import scala.tools.nsc.util.stringFromStream +import scala.util.Properties.{javaVersion, javaVmName, versionString} + +/** + * Copy from spark project with minor modification (Remove the SparkContext initialization) + */ +class SparkILoop(in0: BufferedReader, out: PrintWriter) + extends ILoop(ShellConfig(new GenericRunnerSettings(_ => ())), in0, out) { + def this() = this(null, new PrintWriter(Console.out, true)) + + override protected def internalReplAutorunCode(): Seq[String] = Seq.empty + + /** Print a welcome message */ + override def printWelcome(): Unit = { + import org.apache.spark.SPARK_VERSION + echo("""Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version %s + /_/ + """.format(SPARK_VERSION)) + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, + javaVmName, + javaVersion + ) + echo(welcomeMsg) + echo("Type in expressions to have them evaluated.") + echo("Type :help for more information.") + } + + /** Available commands */ + override def commands: List[LoopCommand] = standardCommands + + override def resetCommand(line: String): Unit = { + super.resetCommand(line) + echo( + "Note that after :reset, state of SparkSession and SparkContext is unchanged." + ) + } + + override def replay(): Unit = { + super.replay() + } + + /** Start an interpreter with the given settings. + * @return true if successful + */ + override def run(interpreterSettings: Settings): Boolean = { + createInterpreter(interpreterSettings) + intp.reporter.withoutPrintingResults(intp.withSuppressedSettings { + intp.initializeCompiler() + if (intp.reporter.hasErrors) { + echo("Interpreter encountered errors during initialization!") + throw new InterruptedException + } + }) + true + } +} + diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala new file mode 100644 index 0000000..1753193 --- /dev/null +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.spark + + +import org.apache.spark.SparkConf +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult} +import org.slf4j.{Logger, LoggerFactory} + +import java.io.{File, PrintWriter} +import java.net.URLClassLoader +import java.util.Properties +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ +import scala.tools.nsc.interpreter.shell.{Accumulator, Completion, ReplCompletion} + +/** + * SparkInterpreter for scala-2.13 + */ +class SparkScala213Interpreter(override val conf: SparkConf, + override val depFiles: java.util.List[String], + override val properties: Properties, + override val interpreterGroup: InterpreterGroup, + override val sparkInterpreterClassLoader: URLClassLoader, + val outputDir: File) + extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { + + lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) + + private var sparkILoop: SparkILoop = _ + + private var scalaCompletion: Completion = _ + + override val interpreterOutput = new InterpreterOutputStream(LOGGER) + + override def open(): Unit = { + super.open() + if (sparkMaster == "yarn-client") { + System.setProperty("SPARK_YARN_MODE", "true") + } + LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) + conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) + + val settings = new Settings() + settings.processArguments(List("-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) + settings.embeddedDefaults(sparkInterpreterClassLoader) + settings.usejavacp.value = true + this.userJars = getUserJars() + LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) + settings.classpath.value = userJars.mkString(File.pathSeparator) + + val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean + val replOut = if (printReplOutput) { + new PrintWriter(interpreterOutput, true) + } else { + new PrintWriter(Console.out, true) + } + sparkILoop = new SparkILoop(null, replOut) + sparkILoop.run(settings) + this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator) + + createSparkContext() + + scalaInterpret("import org.apache.spark.SparkContext._") + scalaInterpret("import spark.implicits._") + scalaInterpret("import spark.sql") + scalaInterpret("import org.apache.spark.sql.functions._") + // print empty string otherwise the last statement's output of this method + // (aka. 
import org.apache.spark.sql.functions._) will mix with the output of user code + scalaInterpret("print(\"\")") + + createZeppelinContext() + } + + def interpret(code: String, context: InterpreterContext): InterpreterResult = { + + val originalOut = System.out + val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean + + def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { + Console.withOut(interpreterOutput) { + System.setOut(Console.out) + if (printREPLOutput) { + interpreterOutput.setInterpreterOutput(context.out) + } else { + interpreterOutput.setInterpreterOutput(null) + } + interpreterOutput.ignoreLeadingNewLinesFromScalaReporter() + + val status = scalaInterpret(code) match { + case succ...@scala.tools.nsc.interpreter.Results.Success => + success + case scala.tools.nsc.interpreter.Results.Error => + val errorMsg = new String(interpreterOutput.getInterpreterOutput.toByteArray) + if (errorMsg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") || + errorMsg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) { + // prepend "import sqlContext.implicits._" due to + // https://issues.scala-lang.org/browse/SI-6649 + context.out.clear() + scalaInterpret("import sqlContext.implicits._\n" + code) + } else { + scala.tools.nsc.interpreter.Results.Error + } + case scala.tools.nsc.interpreter.Results.Incomplete => + // add print("") at the end in case the last line is comment which lead to INCOMPLETE + scalaInterpret(code + "\nprint(\"\")") + } + context.out.flush() + status + } + } + // reset the java stdout + System.setOut(originalOut) + + context.out.write("") + val lastStatus = _interpret(code) match { + case scala.tools.nsc.interpreter.Results.Success => + InterpreterResult.Code.SUCCESS + case scala.tools.nsc.interpreter.Results.Error => + InterpreterResult.Code.ERROR + case scala.tools.nsc.interpreter.Results.Incomplete => + InterpreterResult.Code.INCOMPLETE + } + + lastStatus match { + case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) + case _ => new InterpreterResult(lastStatus) + } + } + + def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result = + sparkILoop.interpret(code) + + protected override def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates + .map(e => new InterpreterCompletion(e.defString, e.defString, null)) + scala.collection.JavaConverters.asJava(completions) + } + + protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { + sparkILoop.beQuietDuring { + val result = sparkILoop.bind(name, tpe, value, modifier) + if (result != Results.Success) { + throw new RuntimeException("Fail to bind variable: " + name) + } + } + } + + override def close(): Unit = { + super.close() + if (sparkILoop != null) { + sparkILoop.closeInterpreter() + } + } + + override def getScalaShellClassLoader: ClassLoader = { + sparkILoop.classLoader + } +} diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 46100da..b07de98 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ 
b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -18,26 +18,25 @@ package org.apache.zeppelin.spark -import java.io.{File, IOException} -import java.net.{URL, URLClassLoader} +import java.io.{File, IOException, PrintStream} +import java.net.URLClassLoader import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger - import org.apache.commons.lang3.StringUtils import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import org.apache.zeppelin.interpreter.util.InterpreterOutputStream import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext} +import org.apache.zeppelin.kotlin.KotlinInterpreter import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ -import scala.tools.nsc.interpreter.Completion -import scala.util.control.NonFatal + /** * Base class for different scala versions of SparkInterpreter. It should be @@ -61,16 +60,12 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, protected var sqlContext: SQLContext = _ - protected var sparkSession: Object = _ + protected var sparkSession: SparkSession = _ protected var userJars: Seq[String] = _ - protected var sparkHttpServer: Object = _ - protected var sparkUrl: String = _ - protected var scalaCompletion: Completion = _ - protected var z: SparkZeppelinContext = _ protected val interpreterOutput: InterpreterOutputStream @@ -100,67 +95,20 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, BaseSparkScalaInterpreter.sessionNum.incrementAndGet() } - def interpret(code: String, context: InterpreterContext): InterpreterResult = { - - val originalOut = System.out - val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean - - def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { - Console.withOut(interpreterOutput) { - System.setOut(Console.out) - if (printREPLOutput) { - interpreterOutput.setInterpreterOutput(context.out) - } else { - interpreterOutput.setInterpreterOutput(null) - } - interpreterOutput.ignoreLeadingNewLinesFromScalaReporter() - - val status = scalaInterpret(code) match { - case succ...@scala.tools.nsc.interpreter.ir.Success => - success - case scala.tools.nsc.interpreter.IR.Error => - val errorMsg = new String(interpreterOutput.getInterpreterOutput.toByteArray) - if (errorMsg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") || - errorMsg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) { - // prepend "import sqlContext.implicits._" due to - // https://issues.scala-lang.org/browse/SI-6649 - context.out.clear() - scalaInterpret("import sqlContext.implicits._\n" + code) - } else { - scala.tools.nsc.interpreter.IR.Error - } - case scala.tools.nsc.interpreter.IR.Incomplete => - // add print("") at the end in case the last line is comment which lead to INCOMPLETE - scalaInterpret(code + "\nprint(\"\")") - } - context.out.flush() - status - } - } - // reset the java stdout - System.setOut(originalOut) - - context.out.write("") - val lastStatus = _interpret(code) match { - case scala.tools.nsc.interpreter.IR.Success => - 
InterpreterResult.Code.SUCCESS - case scala.tools.nsc.interpreter.IR.Error => - InterpreterResult.Code.ERROR - case scala.tools.nsc.interpreter.IR.Incomplete => - InterpreterResult.Code.INCOMPLETE - } - - lastStatus match { - case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) - case _ => new InterpreterResult(lastStatus) + // Used by KotlinSparkInterpreter + def delegateInterpret(interpreter: KotlinInterpreter, + code: String, + context: InterpreterContext): InterpreterResult = { + val out = context.out + val newOut = if (out != null) new PrintStream(out) else null + Console.withOut(newOut) { + interpreter.interpret(code, context) } } protected def interpret(code: String): InterpreterResult = interpret(code, InterpreterContext.get()) - protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result - protected def getProgress(jobGroup: String, context: InterpreterContext): Int = { JobProgressUtil.progress(sc, jobGroup) } @@ -197,9 +145,6 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, cleanupStagingDirInternal(stagingDirPath, hadoopConf) } - if (sparkHttpServer != null) { - sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer) - } if (sc != null) { sc.stop() sc = null @@ -244,7 +189,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean] if (hiveSiteExisted && hiveClassesPresent) { builder.getClass.getMethod("enableHiveSupport").invoke(builder) - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder) + sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] LOGGER.info("Created Spark session (with Hive support)"); } else { if (!hiveClassesPresent) { @@ -253,11 +198,11 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, if (!hiveSiteExisted) { LOGGER.warn("Hive support can not be enabled because no hive-site.xml found") } - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder) + sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] LOGGER.info("Created Spark session (without Hive support)"); } } else { - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder) + sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] LOGGER.info("Created Spark session (without Hive support)"); } @@ -276,17 +221,9 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient""")) bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient""")) - - scalaInterpret("import org.apache.spark.SparkContext._") - scalaInterpret("import spark.implicits._") - scalaInterpret("import spark.sql") - scalaInterpret("import org.apache.spark.sql.functions._") - // print empty string otherwise the last statement's output of this method - // (aka. 
import org.apache.spark.sql.functions._) will mix with the output of user code - scalaInterpret("print(\"\")") } - private def initAndSendSparkWebUrl(): Unit = { + protected def initAndSendSparkWebUrl(): Unit = { val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); if (!StringUtils.isBlank(webUiUrl)) { this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId); @@ -306,7 +243,6 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) - z = new SparkZeppelinContext(sc, sparkShims, interpreterGroup.getInterpreterHookRegistry, properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) @@ -370,45 +306,6 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, method.invoke(obj, parameters: _ *) } - protected def startHttpServer(outputDir: File): Option[(Object, String)] = { - try { - val httpServerClass = Class.forName("org.apache.spark.HttpServer") - val securityManager = { - val constructor = Class.forName("org.apache.spark.SecurityManager") - .getConstructor(classOf[SparkConf]) - constructor.setAccessible(true) - constructor.newInstance(conf).asInstanceOf[Object] - } - val httpServerConstructor = httpServerClass - .getConstructor(classOf[SparkConf], - classOf[File], - Class.forName("org.apache.spark.SecurityManager"), - classOf[Int], - classOf[String]) - httpServerConstructor.setAccessible(true) - // Create Http Server - val port = conf.getInt("spark.replClassServer.port", 0) - val server = httpServerConstructor - .newInstance(conf, outputDir, securityManager, new Integer(port), "HTTP server") - .asInstanceOf[Object] - - // Start Http Server - val startMethod = server.getClass.getMethod("start") - startMethod.setAccessible(true) - startMethod.invoke(server) - - // Get uri of this Http Server - val uriMethod = server.getClass.getMethod("uri") - uriMethod.setAccessible(true) - val uri = uriMethod.invoke(server).asInstanceOf[String] - Some((server, uri)) - } catch { - // Spark 2.0+ removed HttpServer, so return null instead. 
- case NonFatal(e) => - None - } - } - protected def getUserJars(): Seq[String] = { var classLoader = Thread.currentThread().getContextClassLoader var extraJars = Seq.empty[String] @@ -435,7 +332,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } protected def getUserFiles(): Seq[String] = { - depFiles.asScala.filter(!_.endsWith(".jar")) + depFiles.asScala.toSeq.filter(!_.endsWith(".jar")) } } diff --git a/zeppelin-interpreter-parent/pom.xml b/zeppelin-interpreter-parent/pom.xml index 572a70a..da1e438 100644 --- a/zeppelin-interpreter-parent/pom.xml +++ b/zeppelin-interpreter-parent/pom.xml @@ -39,10 +39,20 @@ </dependency> <dependency> - <groupId>${project.groupId}</groupId> + <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-interpreter</artifactId> <version>${project.version}</version> <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>io.atomix</groupId> + <artifactId>atomix</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> diff --git a/zeppelin-jupyter-interpreter-shaded/pom.xml b/zeppelin-jupyter-interpreter-shaded/pom.xml index da68cca..44ee932 100644 --- a/zeppelin-jupyter-interpreter-shaded/pom.xml +++ b/zeppelin-jupyter-interpreter-shaded/pom.xml @@ -79,6 +79,14 @@ <pattern>com.google</pattern> <shadedPattern>org.apache.zeppelin.jupyter.com.google</shadedPattern> </relocation> + <relocation> + <pattern>io</pattern> + <shadedPattern>org.apache.zeppelin.jupyter.io</shadedPattern> + </relocation> + <relocation> + <pattern>com.esotericsoftware</pattern> + <shadedPattern>org.apache.zeppelin.jupyter.com.esotericsoftware</shadedPattern> + </relocation> </relocations> </configuration> diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 4321052..85b5fc6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -132,6 +132,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { String sparkHome = getEnv("SPARK_HOME"); LOGGER.info("SPARK_HOME: {}", sparkHome); scalaVersion = detectSparkScalaVersion(sparkHome, env); + LOGGER.info("Scala version for Spark: {}", scalaVersion); context.getProperties().put("zeppelin.spark.scala.version", scalaVersion); } catch (Exception e) { throw new IOException("Fail to detect scala version, the reason is:"+ e.getMessage()); @@ -277,12 +278,12 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { Matcher matcher = pattern.matcher(processOutput); if (matcher.find()) { String scalaVersion = matcher.group(1); - if (scalaVersion.startsWith("2.10")) { - return "2.10"; - } else if (scalaVersion.startsWith("2.11")) { + if (scalaVersion.startsWith("2.11")) { return "2.11"; } else if (scalaVersion.startsWith("2.12")) { return "2.12"; + } else if (scalaVersion.startsWith("2.13")) { + return "2.13"; } else { throw new Exception("Unsupported scala version: " + scalaVersion); }
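
Note on the new SparkScala213Interpreter above: Scala 2.13 reports REPL outcomes through scala.tools.nsc.interpreter.Results (the 2.11/2.12 interpreters still use the older IR object), and the interpret method maps those outcomes to Zeppelin result codes, retrying once with an extra import or a trailing print(""). The following is a minimal, self-contained sketch of that control flow under assumed names; `runLine` and `lastErrorMessage` are illustrative stand-ins, not actual Zeppelin members, and the snippet needs the scala-compiler artifact on the classpath.

    import scala.tools.nsc.interpreter.Results

    // Sketch only: retry with "import sqlContext.implicits._" when toDF/toDS cannot be
    // resolved (SI-6649), and append print("") so a trailing comment does not leave the
    // REPL reporting Incomplete.
    def evaluate(code: String,
                 runLine: String => Results.Result,
                 lastErrorMessage: () => String): Results.Result =
      runLine(code) match {
        case Results.Success => Results.Success
        case Results.Error =>
          val msg = lastErrorMessage()
          if (msg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") ||
              msg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) {
            runLine("import sqlContext.implicits._\n" + code)
          } else {
            Results.Error
          }
        case Results.Incomplete =>
          runLine(code + "\nprint(\"\")")
      }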
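
The delegateInterpret helper added to BaseSparkScalaInterpreter wraps the Kotlin interpreter call in Console.withOut so its output lands in the paragraph's output stream rather than on the JVM's stdout, while the interpret method also swaps System.out for the duration of the call. A rough, self-contained sketch of that redirection pattern, with a ByteArrayOutputStream standing in for context.out:

    import java.io.{ByteArrayOutputStream, OutputStream, PrintStream}

    // Run a block while Console.out and System.out both point at `target`,
    // restoring the original stdout afterwards.
    def withRedirectedOutput[T](target: OutputStream)(body: => T): T = {
      val originalOut = System.out
      val printStream = new PrintStream(target, true)
      try {
        Console.withOut(printStream) {
          System.setOut(printStream)
          body
        }
      } finally {
        System.setOut(originalOut)
      }
    }

    // Usage: anything printed inside the block ends up in `captured`, not on real stdout.
    val captured = new ByteArrayOutputStream()
    withRedirectedOutput(captured) { println("routed to the paragraph output") }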
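
On the launcher side, SparkInterpreterLauncher.java now accepts 2.13 when parsing the Scala version reported by the Spark distribution and drops the old 2.10 branch. The matching step amounts to extracting the binary version from the `spark-submit --version` output and allowing only the supported values. The sketch below is written in Scala for consistency with the rest of this mail; the regex and the sample output line are assumptions, and the actual Java implementation may differ.

    // Assumed sample line from `spark-submit --version`:
    //   Using Scala version 2.13.5, OpenJDK 64-Bit Server VM, 11.0.13
    val scalaVersionPattern = """Using Scala version (\d+\.\d+)""".r

    def scalaBinaryVersion(versionOutput: String): String =
      scalaVersionPattern.findFirstMatchIn(versionOutput).map(_.group(1)) match {
        case Some(v @ ("2.11" | "2.12" | "2.13")) => v
        case Some(other) => throw new Exception("Unsupported scala version: " + other)
        case None => throw new Exception("Fail to detect scala version")
      }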