This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 8ed7bff3eb [ZEPPELIN-5946] Drop Spark 2.4,3.0,3.1 Support (#4640) 8ed7bff3eb is described below commit 8ed7bff3ebfd05d571d586eec25d0dbf44f40f40 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Mon Aug 21 08:37:17 2023 +0200 [ZEPPELIN-5946] Drop Spark 2.4,3.0,3.1 Support (#4640) --- .github/workflows/core.yml | 39 +-- .github/workflows/frontend.yml | 10 +- .../zeppelin/img/docs-img/spark_SPARK_HOME16.png | Bin 123514 -> 0 bytes .../zeppelin/img/docs-img/spark_SPARK_HOME24.png | Bin 122833 -> 0 bytes .../zeppelin/img/docs-img/spark_deprecate.png | Bin 176802 -> 0 bytes docs/interpreter/spark.md | 32 +- pom.xml | 3 +- .../spark/submit/SparkSubmitInterpreter.java | 10 +- spark/interpreter/pom.xml | 67 +---- spark/pom.xml | 6 +- spark/scala-2.11/pom.xml | 58 ---- .../scala-2.11/src/main/resources/log4j.properties | 50 ---- .../zeppelin/spark/SparkScala211Interpreter.scala | 325 --------------------- .../zeppelin/spark/SparkZeppelinContext.scala | 243 --------------- spark/spark-scala-parent/pom.xml | 6 +- spark/spark2-shims/pom.xml | 79 ----- .../org/apache/zeppelin/spark/Spark2Shims.java | 140 --------- zeppelin-interpreter-integration/pom.xml | 77 +---- .../zeppelin/integration/SparkIntegrationTest.java | 10 +- .../integration/SparkIntegrationTest24.java | 51 ---- .../integration/SparkIntegrationTest30.java | 57 ---- .../integration/SparkIntegrationTest31.java | 57 ---- .../integration/SparkSubmitIntegrationTest.java | 16 +- .../integration/ZSessionIntegrationTest.java | 12 +- .../integration/ZeppelinSparkClusterTest.java | 129 +++----- .../integration/ZeppelinSparkClusterTest24.java | 39 --- .../integration/ZeppelinSparkClusterTest30.java | 40 --- .../integration/ZeppelinSparkClusterTest31.java | 40 --- .../launcher/SparkInterpreterLauncherTest.java | 11 +- 29 files changed, 98 insertions(+), 1509 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index dc40988c5c..77fa3780e7 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -65,7 +65,7 @@ jobs: restore-keys: | ${{ runner.os }}-zeppelin- - name: install application with some interpreter - run: ./mvnw install -Pbuild-distr -DskipTests -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} ${MAVEN_ARGS} + run: ./mvnw install -Pbuild-distr -DskipTests -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} ${MAVEN_ARGS} - name: install and test plugins run: ./mvnw package -pl zeppelin-plugins -amd ${MAVEN_ARGS} - name: Setup conda environment with python 3.7 and R @@ -85,7 +85,7 @@ jobs: conda list conda info - name: run tests with ${{ matrix.hadoop }} # skip spark test because we would run them in other CI - run: ./mvnw verify -Pusing-packaged-distr -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} -Dtests.to.exclude=**/org/apache/zeppelin/spark/* -DfailIfNoTests=false + run: ./mvnw verify -Pusing-packaged-distr -pl zeppelin-server,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,angular,shell -am -Phelium-dev -Pexamples -P${{ matrix.hadoop }} -Dtests.to.exclude=**/org/apache/zeppelin/spark/* -DfailIfNoTests=false # test interpreter modules except spark, 
flink, python, rlang, jupyter interpreter-test-non-core: @@ -216,7 +216,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am -Pflink-114 ${MAVEN_ARGS} + ./mvnw install -DskipTests -Phadoop2 -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.11,flink/flink-scala-2.12,jdbc,shell -am -Pflink-114 ${MAVEN_ARGS} ./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 @@ -302,7 +302,7 @@ jobs: strategy: fail-fast: false matrix: - hadoop: [ 2, 3 ] + hadoop: [ 3 ] java: [ 8, 11 ] steps: - name: Checkout @@ -327,7 +327,7 @@ jobs: ${{ runner.os }}-zeppelin- - name: install environment run: | - ./mvnw install -DskipTests -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13,markdown -am -Phadoop2 -Pintegration ${MAVEN_ARGS} + ./mvnw install -DskipTests -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown -am -Phadoop2 -Pintegration ${MAVEN_ARGS} ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS} - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 @@ -344,10 +344,7 @@ jobs: run: | R -e "IRkernel::installspec()" - name: run tests on hadoop${{ matrix.hadoop }} - run: ./mvnw test -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32,ZeppelinSparkClusterTest33,SparkIntegrationTest33 -DfailIfNoTests=false ${MAVEN_ARGS} - - name: run Spark 2.4 tests on hadoop${{ matrix.hadoop }} - run: ./mvnw test -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24 -DfailIfNoTests=false ${MAVEN_ARGS} - if: matrix.matrix.java == 8 # Spark 2.4 doesn't support JDK 11 + run: ./mvnw test -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest32,SparkIntegrationTest32,ZeppelinSparkClusterTest33,SparkIntegrationTest33 -DfailIfNoTests=false ${MAVEN_ARGS} # test on spark for each spark version & scala version spark-test: @@ -379,7 +376,7 @@ jobs: restore-keys: | ${{ runner.os }}-zeppelin- - name: install environment - run: ./mvnw install -DskipTests -pl spark-submit,spark/scala-2.11,spark/scala-2.12,spark/scala-2.13 -am -Phadoop2 ${MAVEN_ARGS} + run: ./mvnw install -DskipTests -pl spark-submit,spark/scala-2.12,spark/scala-2.13 -am -Phadoop2 ${MAVEN_ARGS} - name: Setup conda environment with python ${{ matrix.python }} and R uses: conda-incubator/setup-miniconda@v2 with: @@ -394,24 +391,6 @@ jobs: - name: Make IRkernel available to Jupyter run: | R -e "IRkernel::installspec()" - - name: run spark-2.4 tests with scala-2.11 and python-${{ matrix.python }} - if: matrix.python == '3.7 && matrix.java == 8' # Spark 2.4 doesn't support python 3.8 and JDK 8 - run: | - rm -rf spark/interpreter/metastore_db - ./mvnw verify -pl 
spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-2.4 -Pspark-scala-2.11 -DfailIfNoTests=false ${MAVEN_ARGS} - - name: run spark-2.4 tests with scala-2.12 and python-${{ matrix.python }} - if: matrix.python == '3.7 && matrix.java == 8' # Spark 2.4 doesn't support python 3.8 and JDK 8 - run: | - rm -rf spark/interpreter/metastore_db - ./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS} - - name: run spark-3.0 tests with scala-2.12 and python-${{ matrix.python }} - run: | - rm -rf spark/interpreter/metastore_db - ./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.0 -Pspark-scala-2.12 -Phadoop2 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS} - - name: run spark-3.1 tests with scala-2.12 and python-${{ matrix.python }} - run: | - rm -rf spark/interpreter/metastore_db - ./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.1 -Pspark-scala-2.12 -Phadoop2 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS} - name: run spark-3.2 tests with scala-2.12 and python-${{ matrix.python }} run: | rm -rf spark/interpreter/metastore_db @@ -433,7 +412,7 @@ jobs: rm -rf spark/interpreter/metastore_db ./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.4 -Pspark-scala-2.13 -Phadoop3 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS} - livy-0-7-with-spark-2-2-0-under-python3: + livy-0-7-with-spark-3-4-1-under-python3: runs-on: ubuntu-20.04 steps: - name: Checkout @@ -459,7 +438,7 @@ jobs: - name: install environment run: | ./mvnw install -DskipTests -pl livy -am ${MAVEN_ARGS} - ./testing/downloadSpark.sh "2.2.0" "2.6" + ./testing/downloadSpark.sh "3.4.1" "3" ./testing/downloadLivy.sh "0.7.1-incubating" - name: Setup conda environment with python 3.7 and R uses: conda-incubator/setup-miniconda@v2 diff --git a/.github/workflows/frontend.yml b/.github/workflows/frontend.yml index 32855dfef8..56a2378ffb 100644 --- a/.github/workflows/frontend.yml +++ b/.github/workflows/frontend.yml @@ -53,9 +53,9 @@ jobs: restore-keys: | ${{ runner.os }}-zeppelin- - name: Install application - run: ./mvnw clean install -DskipTests -am -pl zeppelin-web -Pscala-2.11 -Pspark-scala-2.11 -Pspark-2.4 -Phadoop2 -Pweb-dist ${MAVEN_ARGS} + run: ./mvnw clean install -DskipTests -am -pl zeppelin-web -Pscala-2.11 -Pspark-scala-2.12 -Pspark-3.4 -Phadoop2 -Pweb-dist ${MAVEN_ARGS} - name: Run headless test - run: xvfb-run --auto-servernum --server-args="-screen 0 1024x768x24" ./mvnw verify -pl zeppelin-web -Pscala-2.11 -Pspark-scala-2.11 -Pspark-2.4 -Phadoop2 -Pweb-dist -Pweb-e2e ${MAVEN_ARGS} + run: xvfb-run --auto-servernum --server-args="-screen 0 1024x768x24" ./mvnw verify -pl zeppelin-web -Pscala-2.12 -Pspark-scala-2.12 -Pspark-3.4 -Phadoop2 -Pweb-dist -Pweb-e2e ${MAVEN_ARGS} - name: Print zeppelin logs if: always() run: if [ -d "logs" ]; then cat logs/*; fi @@ -86,7 +86,7 @@ jobs: - name: Run headless test run: xvfb-run --auto-servernum --server-args="-screen 0 1024x768x24" ./mvnw package -pl zeppelin-web-angular -Pweb-angular ${MAVEN_ARGS} - test-selenium-with-spark-module-for-spark-2-4: + test-selenium-with-spark-module-for-spark-3-4: runs-on: ubuntu-20.04 defaults: run: @@ -128,10 +128,10 @@ jobs: R -e "IRkernel::installspec()" - name: Install Environment run: | - ./mvnw clean install -DskipTests -am -pl zeppelin-integration -Pintegration 
-Pspark-scala-2.11 -Pspark-2.4 -Phadoop2 -Pweb-dist ${MAVEN_ARGS} + ./mvnw clean install -DskipTests -am -pl zeppelin-integration -Pintegration -Pspark-scala-2.12 -Pspark-3.4 -Phadoop2 -Pweb-dist ${MAVEN_ARGS} - name: run tests run: | - source ./testing/downloadSpark.sh "2.4.7" "2.7" && echo "SPARK_HOME: ${SPARK_HOME}" && xvfb-run --auto-servernum --server-args="-screen 0 1600x1024x16" ./mvnw verify -DfailIfNoTests=false -pl zeppelin-integration -Pintegration -Pspark-scala-2.11 -Pspark-2.4 -Phadoop2 -Pweb-dist -Pusing-source-tree ${MAVEN_ARGS} + source ./testing/downloadSpark.sh "3.4.1" "3" && echo "SPARK_HOME: ${SPARK_HOME}" && xvfb-run --auto-servernum --server-args="-screen 0 1600x1024x16" ./mvnw verify -DfailIfNoTests=false -pl zeppelin-integration -Pintegration -Pspark-scala-2.12 -Pspark-3.4 -Phadoop2 -Pweb-dist -Pusing-source-tree ${MAVEN_ARGS} - name: Print zeppelin logs if: always() run: if [ -d "logs" ]; then cat logs/*; fi diff --git a/docs/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME16.png b/docs/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME16.png deleted file mode 100644 index f925d47c17..0000000000 Binary files a/docs/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME16.png and /dev/null differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME24.png b/docs/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME24.png deleted file mode 100644 index 0eaa063d60..0000000000 Binary files a/docs/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME24.png and /dev/null differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/spark_deprecate.png b/docs/assets/themes/zeppelin/img/docs-img/spark_deprecate.png deleted file mode 100644 index 8a867ccecb..0000000000 Binary files a/docs/assets/themes/zeppelin/img/docs-img/spark_deprecate.png and /dev/null differ diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md index a5c78aa09e..561f76d3c7 100644 --- a/docs/interpreter/spark.md +++ b/docs/interpreter/spark.md @@ -89,7 +89,7 @@ Apache Spark is supported in Zeppelin with Spark interpreter group which consist </tr> <tr> <td>Support multiple versions of Scala</td> - <td>You can run different Scala versions (2.10/2.11/2.12) of Spark in on Zeppelin instance</td> + <td>You can run different Scala versions (2.12/2.13) of Spark in on Zeppelin instance</td> </tr> <tr> <td>Support multiple languages</td> @@ -284,16 +284,6 @@ You can also set other Spark properties which are not listed in the table. For a <td>false</td> <td>whether use yarn proxy url as Spark weburl, e.g. http://localhost:8088/proxy/application_1583396598068_0004</td> </tr> - <tr> - <td>spark.repl.target</td> - <td>jvm-1.6</td> - <td> - Manually specifying the Java version of Spark Interpreter Scala REPL,Available options:<br/> - scala-compile v2.10.7 to v2.11.12 supports "jvm-1.5, jvm-1.6, jvm-1.7 and jvm-1.8", and the default value is jvm-1.6.<br/> - scala-compile v2.10.1 to v2.10.6 supports "jvm-1.5, jvm-1.6, jvm-1.7", and the default value is jvm-1.6.<br/> - scala-compile v2.12.x defaults to jvm-1.8, and only supports jvm-1.8. - </td> - </tr> </table> Without any configuration, Spark interpreter works out of box in local mode. But if you want to connect to your Spark cluster, you'll need to follow below two simple steps. @@ -332,15 +322,8 @@ export HADOOP_CONF_DIR=/usr/lib/hadoop #### Set `SPARK_HOME` in interpreter setting page If you want to use multiple versions of Spark, then you need to create multiple Spark interpreters and set `SPARK_HOME` separately. e.g. 
-Create a new Spark interpreter `spark24` for Spark 2.4 and set its `SPARK_HOME` in interpreter setting page as following, -<center> -<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME24.png" width="80%"> -</center> - -Create a new Spark interpreter `spark16` for Spark 1.6 and set its `SPARK_HOME` in interpreter setting page as following, -<center> -<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/spark_SPARK_HOME16.png" width="80%"> -</center> +Create a new Spark interpreter `spark33` for Spark 3.3 and set its `SPARK_HOME` in interpreter setting page, +Create a new Spark interpreter `spark34` for Spark 3.4 and set its `SPARK_HOME` in interpreter setting page. #### Set `SPARK_HOME` via [inline generic configuration](../usage/interpreter/overview.html#inline-generic-confinterpreter) @@ -628,15 +611,6 @@ you need to enable user impersonation for more security control. In order the en **Step 3(Optional)** If you are using kerberos cluster, then you need to set `zeppelin.server.kerberos.keytab` and `zeppelin.server.kerberos.principal` to the user(aka. user in Step 1) you want to impersonate in `zeppelin-site.xml`. - - -## Deprecate Spark 2.2 and earlier versions -Starting from 0.9, Zeppelin deprecate Spark 2.2 and earlier versions. So you will see a warning message when you use Spark 2.2 and earlier. -You can get rid of this message by setting `zeppelin.spark.deprecatedMsg.show` to `false`. - -<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/spark_deprecate.png"> - - ## Community [Join our community](http://zeppelin.apache.org/community.html) to discuss with others. diff --git a/pom.xml b/pom.xml index 621aa9d5fa..302d2d730d 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,8 @@ <hadoop2.7.version>2.7.7</hadoop2.7.version> <hadoop3.0.version>3.0.3</hadoop3.0.version> <hadoop3.1.version>3.1.3</hadoop3.1.version> - <hadoop3.2.version>3.2.0</hadoop3.2.version> + <hadoop3.2.version>3.2.4</hadoop3.2.version> + <hadoop3.3.version>3.3.6</hadoop3.3.version> <hadoop.version>${hadoop2.7.version}</hadoop.version> <hadoop.deps.scope>provided</hadoop.deps.scope> <hadoop-client-api.artifact>hadoop-client</hadoop-client-api.artifact> diff --git a/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java b/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java index 61a1c95c6c..53232ed20b 100644 --- a/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java +++ b/spark-submit/src/main/java/org/apache/zeppelin/spark/submit/SparkSubmitInterpreter.java @@ -48,14 +48,14 @@ public class SparkSubmitInterpreter extends ShellInterpreter { private String sparkHome; // paragraphId --> yarnAppId - private ConcurrentMap<String, String> yarnAppIdMap = new ConcurrentHashMap(); + private ConcurrentMap<String, String> yarnAppIdMap = new ConcurrentHashMap<>(); public SparkSubmitInterpreter(Properties property) { super(property); // Set time to be max integer so that the shell process won't timeout. 
setProperty("shell.command.timeout.millisecs", Integer.MAX_VALUE + ""); this.sparkHome = properties.getProperty("SPARK_HOME"); - LOGGER.info("SPARK_HOME: " + sparkHome); + LOGGER.info("SPARK_HOME: {}", sparkHome); } @Override @@ -64,7 +64,7 @@ public class SparkSubmitInterpreter extends ShellInterpreter { return new InterpreterResult(InterpreterResult.Code.SUCCESS); } String sparkSubmitCommand = sparkHome + "/bin/spark-submit " + cmd.trim(); - LOGGER.info("Run spark command: " + sparkSubmitCommand); + LOGGER.info("Run spark command: {}", sparkSubmitCommand); context.out.addInterpreterOutListener(new SparkSubmitOutputListener(context)); InterpreterResult result = super.internalInterpret(sparkSubmitCommand, context); yarnAppIdMap.remove(context.getParagraphId()); @@ -142,10 +142,10 @@ public class SparkSubmitInterpreter extends ShellInterpreter { LOGGER.info("Detected yarn app: {}", yarnAppId); SparkSubmitInterpreter.this.yarnAppIdMap.put(context.getParagraphId(), yarnAppId); } else { - LOGGER.warn("Might be an invalid spark URL: " + sparkUI); + LOGGER.warn("Might be an invalid spark URL: {}", sparkUI); } } else { - LOGGER.error("Unable to extract spark url from this log: " + log); + LOGGER.error("Unable to extract spark url from this log: {}", log); } } diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 4858b884ae..322b4bac8f 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -40,14 +40,14 @@ <maven.aeither.provider.version>3.0.3</maven.aeither.provider.version> <wagon.version>2.7</wagon.version> - <datanucleus.rdbms.version>3.2.9</datanucleus.rdbms.version> - <datanucleus.apijdo.version>3.2.6</datanucleus.apijdo.version> - <datanucleus.core.version>3.2.10</datanucleus.core.version> + <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> + <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> + <datanucleus.core.version>4.1.17</datanucleus.core.version> <!-- spark versions --> - <spark.version>3.1.2</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9</py4j.version> + <spark.version>3.4.1</spark.version> + <protobuf.version>3.21.12</protobuf.version> + <py4j.version>0.10.9.7</py4j.version> <spark.scala.version>2.12.7</spark.scala.version> <spark.scala.binary.version>2.12</spark.scala.binary.version> @@ -79,12 +79,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>spark2-shims</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>spark3-shims</artifactId> @@ -516,14 +510,6 @@ </properties> </profile> - <profile> - <id>spark-scala-2.11</id> - <properties> - <spark.scala.version>2.11.12</spark.scala.version> - <spark.scala.binary.version>2.11</spark.scala.binary.version> - </properties> - </profile> - <!-- profile spark-x only affect spark version used in test --> <profile> <id>spark-3.4</id> @@ -534,7 +520,7 @@ <datanucleus.core.version>4.1.17</datanucleus.core.version> <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.4.0</spark.version> + <spark.version>3.4.1</spark.version> <protobuf.version>3.21.12</protobuf.version> <py4j.version>0.10.9.7</py4j.version> </properties> @@ -546,7 +532,7 @@ <datanucleus.core.version>4.1.17</datanucleus.core.version> <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - 
<spark.version>3.3.0</spark.version> + <spark.version>3.3.2</spark.version> <protobuf.version>2.5.0</protobuf.version> <py4j.version>0.10.9.5</py4j.version> </properties> @@ -558,42 +544,9 @@ <datanucleus.core.version>4.1.17</datanucleus.core.version> <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.2.0</spark.version> + <spark.version>3.2.4</spark.version> <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9.2</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-3.1</id> - <properties> - <datanucleus.core.version>4.1.17</datanucleus.core.version> - <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> - <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.1.2</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-3.0</id> - <properties> - <datanucleus.core.version>4.1.17</datanucleus.core.version> - <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version> - <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version> - <spark.version>3.0.3</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.9</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-2.4</id> - <properties> - <spark.version>2.4.5</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.7</py4j.version> + <py4j.version>0.10.9.5</py4j.version> </properties> </profile> diff --git a/spark/pom.xml b/spark/pom.xml index c2f8b06197..7985b41261 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -38,10 +38,10 @@ <datanucleus.core.version>3.2.10</datanucleus.core.version> <!-- spark versions --> - <spark.version>3.1.2</spark.version> + <spark.version>3.4.1</spark.version> <protobuf.version>2.5.0</protobuf.version> <py4j.version>0.10.9</py4j.version> - <spark.scala.version>2.12.7</spark.scala.version> + <spark.scala.version>2.12.18</spark.scala.version> <spark.scala.binary.version>2.12</spark.scala.binary.version> <scala.compile.version>${spark.scala.version}</scala.compile.version> @@ -58,11 +58,9 @@ <modules> <module>interpreter</module> <module>spark-scala-parent</module> - <module>scala-2.11</module> <module>scala-2.12</module> <module>scala-2.13</module> <module>spark-shims</module> - <module>spark2-shims</module> <module>spark3-shims</module> </modules> diff --git a/spark/scala-2.11/pom.xml b/spark/scala-2.11/pom.xml deleted file mode 100644 index 5251ecc8d9..0000000000 --- a/spark/scala-2.11/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.zeppelin</groupId> - <artifactId>spark-scala-parent</artifactId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../spark-scala-parent/pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>spark-scala-2.11</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Spark Interpreter Scala_2.11</name> - - <properties> - <spark.version>2.4.5</spark.version> - <spark.scala.version>2.11.12</spark.scala.version> - <spark.scala.binary.version>2.11</spark.scala.binary.version> - <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version> - </properties> - - <build> - <plugins> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - </plugin> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/spark/scala-2.11/src/main/resources/log4j.properties b/spark/scala-2.11/src/main/resources/log4j.properties deleted file mode 100644 index 0c90b21ae0..0000000000 --- a/spark/scala-2.11/src/main/resources/log4j.properties +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n -#log4j.appender.stdout.layout.ConversionPattern= -#%5p [%t] (%F:%L) - %m%n -#%-4r [%t] %-5p %c %x - %m%n -# - -# Root logger option -log4j.rootLogger=INFO, stdout - -#mute some noisy guys -log4j.logger.org.apache.hadoop.mapred=WARN -log4j.logger.org.apache.hadoop.hive.ql=WARN -log4j.logger.org.apache.hadoop.hive.metastore=WARN -log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN -log4j.logger.org.apache.zeppelin.scheduler=WARN - -log4j.logger.org.quartz=WARN -log4j.logger.DataNucleus=WARN -log4j.logger.DataNucleus.MetaData=ERROR -log4j.logger.DataNucleus.Datastore=ERROR - -# Log all JDBC parameters -log4j.logger.org.hibernate.type=ALL - -log4j.logger.org.apache.zeppelin.interpreter=DEBUG -log4j.logger.org.apache.zeppelin.spark=DEBUG - - -log4j.logger.org.apache.spark.repl.Main=INFO diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala deleted file mode 100644 index bb38c71a87..0000000000 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import org.apache.spark.SparkConf -import org.apache.spark.repl.SparkILoop -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult} -import org.apache.zeppelin.kotlin.KotlinInterpreter -import org.slf4j.{Logger, LoggerFactory} - -import java.io.{BufferedReader, File, PrintStream} -import java.net.URLClassLoader -import java.nio.file.Paths -import java.util.Properties -import scala.collection.JavaConverters._ -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter._ - - -/** - * SparkInterpreter for scala-2.11. 
- * It only works for Spark 2.x, as Spark 3.x doesn't support scala-2.11 - */ -class SparkScala211Interpreter(conf: SparkConf, - depFiles: java.util.List[String], - properties: Properties, - interpreterGroup: InterpreterGroup, - sparkInterpreterClassLoader: URLClassLoader, - outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) { - - private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - private var sparkILoop: SparkILoop = _ - private var scalaCompletion: Completion = _ - private val interpreterOutput = new InterpreterOutputStream(LOGGER) - private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, - SparkStringConstants.DEFAULT_MASTER_VALUE) - - override def interpret(code: String, context: InterpreterContext): InterpreterResult = { - - val originalOut = System.out - val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean - - def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { - Console.withOut(interpreterOutput) { - System.setOut(Console.out) - if (printREPLOutput) { - interpreterOutput.setInterpreterOutput(context.out) - } else { - interpreterOutput.setInterpreterOutput(null) - } - interpreterOutput.ignoreLeadingNewLinesFromScalaReporter() - - val status = scalaInterpret(code) match { - case succ...@scala.tools.nsc.interpreter.ir.Success => - success - case scala.tools.nsc.interpreter.IR.Error => - val errorMsg = new String(interpreterOutput.getInterpreterOutput.toByteArray) - if (errorMsg.contains("value toDF is not a member of org.apache.spark.rdd.RDD") || - errorMsg.contains("value toDS is not a member of org.apache.spark.rdd.RDD")) { - // prepend "import sqlContext.implicits._" due to - // https://issues.scala-lang.org/browse/SI-6649 - context.out.clear() - scalaInterpret("import sqlContext.implicits._\n" + code) - } else { - scala.tools.nsc.interpreter.IR.Error - } - case scala.tools.nsc.interpreter.IR.Incomplete => - // add print("") at the end in case the last line is comment which lead to INCOMPLETE - scalaInterpret(code + "\nprint(\"\")") - } - context.out.flush() - status - } - } - // reset the java stdout - System.setOut(originalOut) - - context.out.write("") - val lastStatus = _interpret(code) match { - case scala.tools.nsc.interpreter.IR.Success => - InterpreterResult.Code.SUCCESS - case scala.tools.nsc.interpreter.IR.Error => - InterpreterResult.Code.ERROR - case scala.tools.nsc.interpreter.IR.Incomplete => - InterpreterResult.Code.INCOMPLETE - } - - lastStatus match { - case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression") - case _ => new InterpreterResult(lastStatus) - } - } - - override def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - scalaCompletion.completer().complete(buf.substring(0, cursor), cursor) - .candidates - .map(e => new InterpreterCompletion(e, e, null)) - .asJava - } - - private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { - sparkILoop.beQuietDuring { - val result = sparkILoop.bind(name, tpe, value, modifier) - if (result != IR.Success) { - throw new RuntimeException("Fail to bind variable: " + name) - } - } - } - - override def bind(name: String, - tpe: String, - value: Object, - modifier: java.util.List[String]): Unit = - bind(name, tpe, value, modifier.asScala.toList) - - private def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = - 
sparkILoop.interpret(code) - - @throws[InterpreterException] - def scalaInterpretQuietly(code: String): Unit = { - scalaInterpret(code) match { - case scala.tools.nsc.interpreter.Results.Success => - // do nothing - case scala.tools.nsc.interpreter.Results.Error => - throw new InterpreterException("Fail to run code: " + code) - case scala.tools.nsc.interpreter.Results.Incomplete => - throw new InterpreterException("Incomplete code: " + code) - } - } - - override def getScalaShellClassLoader: ClassLoader = { - sparkILoop.classLoader - } - - // Used by KotlinSparkInterpreter - override def delegateInterpret(interpreter: KotlinInterpreter, - code: String, - context: InterpreterContext): InterpreterResult = { - val out = context.out - val newOut = if (out != null) new PrintStream(out) else null - Console.withOut(newOut) { - interpreter.interpret(code, context) - } - } - - override def close(): Unit = { - super.close() - if (sparkILoop != null) { - sparkILoop.closeInterpreter() - } - } - - override def createSparkILoop(): Unit = { - if (sparkMaster == "yarn-client") { - System.setProperty("SPARK_YARN_MODE", "true") - } - - LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - val target = conf.get("spark.repl.target", "jvm-1.6") - val settings = new Settings() - settings.processArguments(List("-Yrepl-class-based", - "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) - settings.embeddedDefaults(sparkInterpreterClassLoader) - settings.usejavacp.value = true - settings.target.value = target - val userJars = getUserJars() - LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) - settings.classpath.value = userJars.mkString(File.pathSeparator) - - val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean - val replOut = if (printReplOutput) { - new JPrintWriter(interpreterOutput, true) - } else { - new JPrintWriter(Console.out, true) - } - sparkILoop = new SparkILoop(None, replOut) - sparkILoop.settings = settings - sparkILoop.createInterpreter() - - val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]] - val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true)) - - sparkILoop.in = reader - sparkILoop.initializeSynchronous() - SparkScala211Interpreter.loopPostInit(this) - this.scalaCompletion = reader.completion - } - - override def createZeppelinContext(): Unit = { - val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) - sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) - z = new SparkZeppelinContext(sc, sparkShims, - interpreterGroup.getInterpreterHookRegistry, - properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) - bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) - } - - private def getField(obj: Object, name: String): Object = { - val field = obj.getClass.getField(name) - field.setAccessible(true) - field.get(obj) - } - - private def callMethod(obj: Object, name: String, - parameterTypes: Array[Class[_]], - parameters: Array[Object]): Object = { - val method = obj.getClass.getMethod(name, parameterTypes: _ *) - method.setAccessible(true) - method.invoke(obj, parameters: _ *) - } - - private def getUserJars(): Seq[String] = { - var classLoader = Thread.currentThread().getContextClassLoader - var extraJars = Seq.empty[String] - while (classLoader != null) { - if 
(classLoader.getClass.getCanonicalName == - "org.apache.spark.util.MutableURLClassLoader") { - extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() - // Check if the file exists. - .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } - // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. - .filterNot { - u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") - } - .map(url => url.toString).toSeq - classLoader = null - } else { - classLoader = classLoader.getParent - } - } - - extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath()) - LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) - extraJars - } -} - -private object SparkScala211Interpreter { - - /** - * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such - * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here, - * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at - * Scala 2.11.12 since here we do not have to load files. - * - * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and - * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12. - * - * Please see the codes below: - * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala - * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala - * - * See also ZEPPELIN-3810. - */ - private def loopPostInit(interpreter: SparkScala211Interpreter): Unit = { - import StdReplTags._ - import scala.reflect.{classTag, io} - - val sparkILoop = interpreter.sparkILoop - val intp = sparkILoop.intp - val power = sparkILoop.power - val in = sparkILoop.in - - def loopPostInit() { - // Bind intp somewhere out of the regular namespace where - // we can get at it in generated code. - intp.quietBind(NamedParam[IMain]("$intp", intp)(tagOfIMain, classTag[IMain])) - // Auto-run code via some setting. - (replProps.replAutorunCode.option - flatMap (f => io.File(f).safeSlurp()) - foreach (intp quietRun _) - ) - // classloader and power mode setup - intp.setContextClassLoader() - if (isReplPower) { - replProps.power setValue true - unleashAndSetPhase() - asyncMessage(power.banner) - } - // SI-7418 Now, and only now, can we enable TAB completion. - in.postInit() - } - - def unleashAndSetPhase() = if (isReplPower) { - power.unleash() - intp beSilentDuring phaseCommand("typer") // Set the phase to "typer" - } - - def phaseCommand(name: String): Results.Result = { - interpreter.callMethod( - sparkILoop, - "scala$tools$nsc$interpreter$ILoop$$phaseCommand", - Array(classOf[String]), - Array(name)).asInstanceOf[Results.Result] - } - - def asyncMessage(msg: String): Unit = { - interpreter.callMethod( - sparkILoop, "asyncMessage", Array(classOf[String]), Array(msg)) - } - - loopPostInit() - } -} diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala deleted file mode 100644 index 410ed4cf54..0000000000 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import java.util - -import org.apache.spark.SparkContext -import org.apache.zeppelin.annotation.ZeppelinApi -import org.apache.zeppelin.display.AngularObjectWatcher -import org.apache.zeppelin.display.ui.OptionInput.ParamOption -import org.apache.zeppelin.interpreter.{ZeppelinContext, InterpreterContext, InterpreterHookRegistry} - -import scala.collection.Seq -import scala.collection.JavaConverters._ - - -/** - * ZeppelinContext for Spark - */ -class SparkZeppelinContext(val sc: SparkContext, - val sparkShims: SparkShims, - val hooks2: InterpreterHookRegistry, - val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) { - - private val interpreterClassMap = Map( - "spark" -> "org.apache.zeppelin.spark.SparkInterpreter", - "sql" -> "org.apache.zeppelin.spark.SparkSqlInterpreter", - "pyspark" -> "org.apache.zeppelin.spark.PySparkInterpreter", - "ipyspark" -> "org.apache.zeppelin.spark.IPySparkInterpreter", - "r" -> "org.apache.zeppelin.spark.SparkRInterpreter", - "kotlin" -> "org.apache.zeppelin.spark.KotlinSparkInterpreter" - ) - - private val supportedClasses = scala.collection.mutable.ArrayBuffer[Class[_]]() - - try { - supportedClasses += Class.forName("org.apache.spark.sql.Dataset") - } catch { - case e: ClassNotFoundException => - } - - try { - supportedClasses += Class.forName("org.apache.spark.sql.DataFrame") - } catch { - case e: ClassNotFoundException => - - } - if (supportedClasses.isEmpty) throw new RuntimeException("Can not load Dataset/DataFrame class") - - override def getSupportedClasses: util.List[Class[_]] = supportedClasses.asJava - - override def getInterpreterClassMap: util.Map[String, String] = interpreterClassMap.asJava - - override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult, interpreterContext) - - /** - * create paragraph level of dynamic form of Select with no item selected. - * - * @param name - * @param options - * @return text value of selected item - */ - @ZeppelinApi - def select(name: String, options: Seq[(Any, String)]): Any = select(name, options, null: Any) - - /** - * create paragraph level of dynamic form of Select with default selected item. - * - * @param name - * @param defaultValue - * @param options - * @return text value of selected item - */ - @Deprecated - @ZeppelinApi - def select(name: String, defaultValue: Any, options: Seq[(Any, String)]): Any = - select(name, options.map(e => new ParamOption(e._1, e._2)).toArray, defaultValue) - - /** - * create paragraph level of dynamic form of Select with default selected item. 
- * - * @param name - * @param options - * @param defaultValue - * @return text value of selected item - */ - @ZeppelinApi - def select(name: String, options: Seq[(Any, String)], defaultValue: Any): Any = - select(name, options.map(e => new ParamOption(e._1, e._2)).toArray, defaultValue) - - - /** - * create note level of dynamic form of Select with no item selected. - * - * @param name - * @param options - * @return text value of selected item - */ - @ZeppelinApi - def noteSelect(name: String, options: Seq[(Any, String)]): Any = - noteSelect(name, null, options.map(e => new ParamOption(e._1, e._2)).toArray) - - /** - * create note level of dynamic form of Select with default selected item. - * - * @param name - * @param options - * @param defaultValue - * @return text value of selected item - */ - @ZeppelinApi - def noteSelect(name: String, options: Seq[(Any, String)], defaultValue: Any): Any = - noteSelect(name, options.map(e => new ParamOption(e._1, e._2)).toArray, defaultValue) - - /** - * create note level of dynamic form of Select with default selected item. - * - * @param name - * @param defaultValue - * @param options - * @return text value of selected item - */ - @Deprecated - @ZeppelinApi - def noteSelect(name: String, defaultValue: Any, options: Seq[(Any, String)]): Any = - noteSelect(name, options.map(e => new ParamOption(e._1, e._2)).toArray, defaultValue) - - /** - * create paragraph level of dynamic form of checkbox with no item checked. - * - * @param name - * @param options - * @return list of checked values of this checkbox - */ - @ZeppelinApi - def checkbox(name: String, options: Seq[(Any, String)]): Seq[Any] = { - val javaResult = checkbox(name, options.map(e => new ParamOption(e._1, e._2)).toArray) - javaResult.asScala - } - - /** - * create paragraph level of dynamic form of checkbox with default checked items. - * - * @param name - * @param options - * @param defaultChecked - * @return list of checked values of this checkbox - */ - @ZeppelinApi - def checkbox(name: String, options: Seq[(Any, String)], defaultChecked: Seq[Any]): Seq[Any] = { - val defaultCheckedList = defaultChecked.asJava - val optionsArray = options.map(e => new ParamOption(e._1, e._2)).toArray - val javaResult = checkbox(name, optionsArray, defaultCheckedList) - javaResult.asScala - } - - /** - * create note level of dynamic form of checkbox with no item checked. - * - * @param name - * @param options - * @return list of checked values of this checkbox - */ - @ZeppelinApi - def noteCheckbox(name: String, options: Seq[(Any, String)]): Seq[Any] = { - val javaResult = noteCheckbox(name, options.map(e => new ParamOption(e._1, e._2)).toArray) - javaResult.asScala - } - - /** - * create note level of dynamic form of checkbox with default checked items. 
- * - * @param name - * @param options - * @param defaultChecked - * @return list of checked values of this checkbox - */ - @ZeppelinApi - def noteCheckbox(name: String, options: Seq[(Any, String)], defaultChecked: Seq[Any]): Seq[Any] = { - val javaResult = noteCheckbox(name, - options.map(e => new ParamOption(e._1, e._2)).toArray, defaultChecked.asJava) - javaResult.asScala - } - - @ZeppelinApi - def angularWatch(name: String, func: (AnyRef, AnyRef) => Unit): Unit = { - angularWatch(name, interpreterContext.getNoteId, func) - } - - @deprecated - def angularWatchGlobal(name: String, func: (AnyRef, AnyRef) => Unit): Unit = { - angularWatch(name, null, func) - } - - @ZeppelinApi - def angularWatch(name: String, func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { - angularWatch(name, interpreterContext.getNoteId, func) - } - - @deprecated - def angularWatchGlobal(name: String, - func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { - angularWatch(name, null, func) - } - - private def angularWatch(name: String, noteId: String, func: (AnyRef, AnyRef) => Unit): Unit = { - val w = new AngularObjectWatcher(getInterpreterContext) { - override def watch(oldObject: Any, newObject: AnyRef, context: InterpreterContext): Unit = { - func(newObject, newObject) - } - } - angularWatch(name, noteId, w) - } - - private def angularWatch(name: String, noteId: String, - func: (AnyRef, AnyRef, InterpreterContext) => Unit): Unit = { - val w = new AngularObjectWatcher(getInterpreterContext) { - override def watch(oldObject: AnyRef, newObject: AnyRef, context: InterpreterContext): Unit = { - func(oldObject, newObject, context) - } - } - angularWatch(name, noteId, w) - } - - def getAsDataFrame(name: String): Object = { - sparkShims.getAsDataFrame(get(name).toString) - } -} diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml index efdb890920..8675d9db01 100644 --- a/spark/spark-scala-parent/pom.xml +++ b/spark/spark-scala-parent/pom.xml @@ -32,9 +32,9 @@ <name>Zeppelin: Spark Scala Parent</name> <properties> - <spark.version>2.4.5</spark.version> - <spark.scala.binary.version>2.11</spark.scala.binary.version> - <spark.scala.version>2.11.12</spark.scala.version> + <spark.version>3.4.1</spark.version> + <spark.scala.binary.version>2.12</spark.scala.binary.version> + <spark.scala.version>2.12.18</spark.scala.version> <spark.scala.compile.version>${spark.scala.binary.version}</spark.scala.compile.version> </properties> diff --git a/spark/spark2-shims/pom.xml b/spark/spark2-shims/pom.xml deleted file mode 100644 index 03071d2458..0000000000 --- a/spark/spark2-shims/pom.xml +++ /dev/null @@ -1,79 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>spark-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>spark2-shims</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Spark2 Shims</name> - - <properties> - <scala.binary.version>2.11</scala.binary.version> - <spark.version>2.3.2</spark.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>spark-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-interpreter-setting</id> - <phase>none</phase> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/spark/spark2-shims/src/main/java/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/java/org/apache/zeppelin/spark/Spark2Shims.java deleted file mode 100644 index 1b5cffb7ad..0000000000 --- a/spark/spark2-shims/src/main/java/org/apache/zeppelin/spark/Spark2Shims.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.zeppelin.spark; - -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkContext; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.expressions.GenericRow; -import org.apache.spark.sql.types.StructType; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.ResultMessages; -import org.apache.zeppelin.interpreter.SingleRowInterpreterResult; -import org.apache.zeppelin.tabledata.TableDataUtils; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -public class Spark2Shims extends SparkShims { - - private SparkSession sparkSession; - - public Spark2Shims(Properties properties, Object entryPoint) { - super(properties); - this.sparkSession = (SparkSession) entryPoint; - } - - public void setupSparkListener(final String master, - final String sparkWebUrl, - final InterpreterContext context) { - SparkContext sc = SparkContext.getOrCreate(); - sc.addSparkListener(new SparkListener() { - @Override - public void onJobStart(SparkListenerJobStart jobStart) { - - if (sc.getConf().getBoolean("spark.ui.enabled", true) && - !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) { - buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context); - } - } - }); - } - - @Override - public String showDataFrame(Object obj, int maxResult, InterpreterContext context) { - if (obj instanceof Dataset) { - Dataset<Row> df = ((Dataset) obj).toDF(); - String[] columns = df.columns(); - // DDL will empty DataFrame - if (columns.length == 0) { - return ""; - } - // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult - List<Row> rows = df.takeAsList(maxResult + 1); - String template = context.getLocalProperties().get("template"); - if (!StringUtils.isBlank(template)) { - if (rows.size() >= 1) { - return new SingleRowInterpreterResult(sparkRowToList(rows.get(0)), template, context).toHtml(); - } else { - return ""; - } - } - - StringBuilder msg = new StringBuilder(); - msg.append("\n%table "); - msg.append(StringUtils.join(TableDataUtils.normalizeColumns(columns), "\t")); - msg.append("\n"); - boolean isLargerThanMaxResult = rows.size() > maxResult; - if (isLargerThanMaxResult) { - rows = rows.subList(0, maxResult); - } - for (Row row : rows) { - for (int i = 0; i < row.size(); ++i) { - msg.append(TableDataUtils.normalizeColumn(row.get(i))); - if (i != row.size() -1) { - msg.append("\t"); - } - } - msg.append("\n"); - } - - if (isLargerThanMaxResult) { - msg.append("\n"); - msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult")); - } - // append %text at the end, otherwise the following output will be put in table as well. 
- msg.append("\n%text "); - return msg.toString(); - } else { - return obj.toString(); - } - } - - private List sparkRowToList(Row row) { - List list = new ArrayList(); - for (int i = 0; i< row.size(); i++) { - list.add(row.get(i)); - } - return list; - } - - @Override - public Dataset<Row> getAsDataFrame(String value) { - String[] lines = value.split("\\n"); - String head = lines[0]; - String[] columns = head.split("\t"); - StructType schema = new StructType(); - for (String column : columns) { - schema = schema.add(column, "String"); - } - - List<Row> rows = new ArrayList<>(); - for (int i = 1; i < lines.length; ++i) { - String[] tokens = lines[i].split("\t"); - Row row = new GenericRow(tokens); - rows.add(row); - } - return sparkSession.createDataFrame(rows, schema); - } -} diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml index 5c3c7ebbae..eae0784df2 100644 --- a/zeppelin-interpreter-integration/pom.xml +++ b/zeppelin-interpreter-integration/pom.xml @@ -176,87 +176,14 @@ <profiles> <profile> - <id>hadoop2</id> + <id>hadoop3</id> <activation> <activeByDefault>true</activeByDefault> </activation> <properties> - <hadoop.version>${hadoop2.7.version}</hadoop.version> - <hadoop-client-api.artifact>hadoop-client</hadoop-client-api.artifact> - <hadoop-client-runtime.artifact>hadoop-yarn-api</hadoop-client-runtime.artifact> - <hadoop-client-minicluster.artifact>hadoop-client</hadoop-client-minicluster.artifact> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-tests</artifactId> - <version>${hadoop.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - </dependencies> - - </profile> - <profile> - <id>hadoop3</id> - - <properties> - <hadoop.version>${hadoop3.2.version}</hadoop.version> + <hadoop.version>${hadoop3.3.version}</hadoop.version> <hadoop-client-runtime.artifact>hadoop-client-runtime</hadoop-client-runtime.artifact> <hadoop-client-minicluster.artifact>hadoop-client-minicluster</hadoop-client-minicluster.artifact> </properties> diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java 
b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index 94c9188c8a..5cbd8f4766 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -172,11 +172,7 @@ public abstract class SparkIntegrationTest { // test SparkRInterpreter Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test")); - if (isSpark2() || isSpark3()) { - interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context); - } else { - interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context); - } + interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context); assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertEquals(interpreterResult.toString(), InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType()); assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("eruptions waiting")); @@ -370,10 +366,6 @@ public abstract class SparkIntegrationTest { } } - private boolean isSpark2() { - return this.sparkVersion.startsWith("2."); - } - private boolean isSpark3() { return this.sparkVersion.startsWith("3."); } diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java deleted file mode 100644 index df5eb2fe42..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
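For context on the Spark2Shims removal above: showDataFrame is what turns a Dataset into Zeppelin's %table output, and the surviving Spark-3-only path keeps the same contract: print the column names, tab-separate each row, fetch maxResult + 1 rows so truncation can be detected, and finish with %text so later output is not swallowed into the table. The sketch below reproduces that idea directly against the Spark 3 Java API; the local SparkSession, the demo column, and the inline truncation message are illustrative assumptions (the real code goes through TableDataUtils and ResultMessages), not code from this commit.

    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class TableRenderSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]").appName("table-render-sketch").getOrCreate();
        Dataset<Row> df = spark.range(0, 5).toDF("id");  // tiny demo frame
        int maxResult = 3;  // stands in for zeppelin.spark.maxResult

        // Fetch one row more than the limit so truncation can be detected.
        List<Row> rows = df.takeAsList(maxResult + 1);
        boolean truncated = rows.size() > maxResult;
        if (truncated) {
          rows = rows.subList(0, maxResult);
        }

        StringBuilder msg = new StringBuilder("\n%table ");
        msg.append(String.join("\t", df.columns())).append("\n");
        for (Row row : rows) {
          for (int i = 0; i < row.size(); i++) {
            msg.append(row.get(i));
            if (i != row.size() - 1) {
              msg.append("\t");
            }
          }
          msg.append("\n");
        }
        if (truncated) {
          // The real implementation appends ResultMessages.getExceedsLimitRowsMessage here.
          msg.append("\nResults are limited to ").append(maxResult).append(" rows.\n");
        }
        // Trailing %text keeps any output that follows out of the table.
        msg.append("\n%text ");
        System.out.println(msg);
        spark.stop();
      }
    }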
- */ - -package org.apache.zeppelin.integration; - -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.codehaus.plexus.util.xml.pull.XmlPullParserException; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class SparkIntegrationTest24 extends SparkIntegrationTest { - - public SparkIntegrationTest24(String sparkVersion, String hadoopVersion) { - super(sparkVersion, hadoopVersion); - } - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"2.4.8", "2.7"} - }); - } - - @Override - public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException { - InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); - sparkInterpreterSetting.setProperty("spark.sql.execution.arrow.sparkr.enabled", "false"); - super.testYarnClusterMode(); - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java deleted file mode 100644 index 9dedf93873..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
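SparkIntegrationTest24, deleted above, is representative of how these version-specific integration tests are wired: a thin JUnit Parameterized subclass that only feeds Spark/Hadoop version pairs to the shared SparkIntegrationTest base (plus, for 2.4, an override that disables Arrow for SparkR). A hypothetical subclass for a still-supported Spark line would look roughly as follows; the class name and the 3.4.1/3 version pair are illustrative and not part of this commit.

    package org.apache.zeppelin.integration;

    import java.util.Arrays;
    import java.util.List;

    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    // Hypothetical example only: shows the shape of the remaining
    // version-specific subclasses of SparkIntegrationTest.
    @RunWith(value = Parameterized.class)
    public class SparkIntegrationTest34 extends SparkIntegrationTest {

      public SparkIntegrationTest34(String sparkVersion, String hadoopVersion) {
        super(sparkVersion, hadoopVersion);
      }

      @Parameterized.Parameters
      public static List<Object[]> data() {
        return Arrays.asList(new Object[][]{
            {"3.4.1", "3"}
        });
      }
    }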
- */ - -package org.apache.zeppelin.integration; - -import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class SparkIntegrationTest30 extends SparkIntegrationTest { - - public SparkIntegrationTest30(String sparkVersion, String hadoopVersion) { - super(sparkVersion, hadoopVersion); - } - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"3.0.3", "2.7"}, - {"3.0.3", "3.2"} - }); - } - - @Override - protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) { - // spark3 doesn't support yarn-client and yarn-cluster any more, use - // spark.master and spark.submit.deployMode instead - String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master"); - if (sparkMaster.equals("yarn-client")) { - interpreterSetting.setProperty("spark.master", "yarn"); - interpreterSetting.setProperty("spark.submit.deployMode", "client"); - } else if (sparkMaster.equals("yarn-cluster")){ - interpreterSetting.setProperty("spark.master", "yarn"); - interpreterSetting.setProperty("spark.submit.deployMode", "cluster"); - } else if (sparkMaster.startsWith("local")) { - interpreterSetting.setProperty("spark.submit.deployMode", "client"); - } - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest31.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest31.java deleted file mode 100644 index bfaa1ea309..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest31.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
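The setUpSparkInterpreterSetting override deleted here (and again in SparkIntegrationTest31 below) captures the only migration the dropped masters need: Spark 3 no longer accepts yarn-client or yarn-cluster as spark.master, so the old value is split into spark.master=yarn plus spark.submit.deployMode=client or cluster. The same translation appears later in this diff where the spark-submit tests move from "--master yarn-cluster" to "--master yarn --deploy-mode cluster". A standalone sketch of that mapping, kept independent of the InterpreterSetting API and not taken from the commit:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map;

    public class YarnMasterMigration {

      // Translates a legacy master value into (spark.master, spark.submit.deployMode).
      static Map.Entry<String, String> translate(String legacyMaster) {
        if ("yarn-client".equals(legacyMaster)) {
          return new SimpleEntry<>("yarn", "client");
        } else if ("yarn-cluster".equals(legacyMaster)) {
          return new SimpleEntry<>("yarn", "cluster");
        } else if (legacyMaster.startsWith("local")) {
          // local[*] masters keep their value; deploy mode is always client.
          return new SimpleEntry<>(legacyMaster, "client");
        }
        return new SimpleEntry<>(legacyMaster, null);
      }

      public static void main(String[] args) {
        System.out.println(translate("yarn-cluster"));  // yarn=cluster
        System.out.println(translate("yarn-client"));   // yarn=client
        System.out.println(translate("local[*]"));      // local[*]=client
      }
    }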
- */ - -package org.apache.zeppelin.integration; - -import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class SparkIntegrationTest31 extends SparkIntegrationTest { - - public SparkIntegrationTest31(String sparkVersion, String hadoopVersion) { - super(sparkVersion, hadoopVersion); - } - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"3.1.2", "2.7"}, - {"3.1.2", "3.2"} - }); - } - - @Override - protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) { - // spark3 doesn't support yarn-client and yarn-cluster any more, use - // spark.master and spark.submit.deployMode instead - String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master"); - if (sparkMaster.equals("yarn-client")) { - interpreterSetting.setProperty("spark.master", "yarn"); - interpreterSetting.setProperty("spark.submit.deployMode", "client"); - } else if (sparkMaster.equals("yarn-cluster")){ - interpreterSetting.setProperty("spark.master", "yarn"); - interpreterSetting.setProperty("spark.submit.deployMode", "cluster"); - } else if (sparkMaster.startsWith("local")) { - interpreterSetting.setProperty("spark.submit.deployMode", "client"); - } - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java index 7575aa02c3..44ed30251a 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java @@ -62,8 +62,8 @@ public class SparkSubmitIntegrationTest { @BeforeClass public static void setUp() throws IOException { - String sparkVersion = "2.4.7"; - String hadoopVersion = "2.7"; + String sparkVersion = "3.4.1"; + String hadoopVersion = "3"; LOGGER.info("Testing Spark Version: " + sparkVersion); LOGGER.info("Testing Hadoop Version: " + hadoopVersion); sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion); @@ -101,8 +101,8 @@ public class SparkSubmitIntegrationTest { InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build(); InterpreterResult interpreterResult = - sparkSubmitInterpreter.interpret("--master yarn-cluster --class org.apache.spark.examples.SparkPi " + - sparkHome + "/examples/jars/spark-examples_2.11-2.4.7.jar", context); + sparkSubmitInterpreter.interpret("--master local --class org.apache.spark.examples.SparkPi --deploy-mode client " + + sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context); assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); // no yarn application launched @@ -123,10 +123,10 @@ public class SparkSubmitIntegrationTest { InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build(); String yarnAppName = "yarn_example"; InterpreterResult interpreterResult = - sparkSubmitInterpreter.interpret("--master yarn-cluster --class org.apache.spark.examples.SparkPi " + + sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class 
org.apache.spark.examples.SparkPi " + "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " + "--conf spark.executor.memory=512m " + - sparkHome + "/examples/jars/spark-examples_2.11-2.4.7.jar", context); + sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context); assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code()); GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED)); @@ -156,10 +156,10 @@ public class SparkSubmitIntegrationTest { try { String yarnAppName = "yarn_cancel_example"; InterpreterResult interpreterResult = - sparkSubmitInterpreter.interpret("--master yarn-cluster --class org.apache.spark.examples.SparkPi " + + sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " + "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " + "--conf spark.executor.memory=512m " + - sparkHome + "/examples/jars/spark-examples_2.11-2.4.7.jar", context); + sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context); assertEquals(interpreterResult.toString(), InterpreterResult.Code.INCOMPLETE, interpreterResult.code()); assertTrue(interpreterResult.toString(), interpreterResult.toString().contains("Paragraph received a SIGTERM")); } catch (InterpreterException e) { diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index a30234bcf5..5a1a1aaaa3 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -67,8 +67,8 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000"); notebook = TestUtils.getInstance(Notebook.class); - sparkHome = DownloadUtils.downloadSpark("3.2.4", "3.2"); - flinkHome = DownloadUtils.downloadFlink("1.13.2", "2.11"); + sparkHome = DownloadUtils.downloadSpark("3.4.1", "3"); + flinkHome = DownloadUtils.downloadFlink("1.13.2", "2.12"); } @AfterClass @@ -188,7 +188,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { assertEquals(result.toString(), Status.FINISHED, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("3.2.4")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("3.4.1")); assertEquals(0, result.getJobUrls().size()); // pyspark @@ -226,7 +226,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { assertEquals(Status.ERROR, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found: unknown_table")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("The table or view `unknown_table` cannot be found")); assertEquals(0, result.getJobUrls().size()); } finally { @@ -257,7 
+257,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { assertEquals(result.toString(), Status.FINISHED, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("3.2.4")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("3.4.1")); assertEquals(0, result.getJobUrls().size()); // pyspark @@ -299,7 +299,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { assertEquals(Status.ERROR, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found: unknown_table")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("The table or view `unknown_table` cannot be found")); assertEquals(0, result.getJobUrls().size()); // cancel diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index 9ba356be81..89177bc093 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -261,20 +261,11 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { IOUtils.copy(new StringReader("{\"metadata\": { \"key\": 84896, \"value\": 54 }}\n"), jsonFileWriter); jsonFileWriter.close(); - if (isSpark2() || isSpark3()) { - p.setText("%spark spark.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")"); - } else { - p.setText("%spark sqlContext.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")"); - } + p.setText("%spark spark.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")"); note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); - if (isSpark2() || isSpark3()) { - assertTrue(p.getReturn().message().get(0).getData().contains( - "org.apache.spark.sql.DataFrame = [metadata: struct<key: bigint, value: bigint>]")); - } else { - assertTrue(p.getReturn().message().get(0).getData().contains( - "org.apache.spark.sql.DataFrame = [metadata: struct<key:bigint,value:bigint>]")); - } + assertTrue(p.getReturn().message().get(0).getData().contains( + "org.apache.spark.sql.DataFrame = [metadata: struct<key: bigint, value: bigint>]")); return null; }); } finally { @@ -323,15 +314,9 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { note -> { // test basic dataframe api Paragraph p = note.addNewParagraph(anonymous); - if (isSpark2() || isSpark3()) { - p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" + + p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" + ".toDF(\"name\", \"age\")\n" + "df.collect()"); - } else { - p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))" + - ".toDF(\"name\", \"age\")\n" + - "df.collect()"); - } note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertTrue(p.getReturn().message().get(0).getData().contains( @@ -339,17 +324,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { // 
test display DataFrame p = note.addNewParagraph(anonymous); - if (isSpark2() || isSpark3()) { - p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" + + p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" + ".toDF(\"name\", \"age\")\n" + "df.createOrReplaceTempView(\"test_table\")\n" + "z.show(df)"); - } else { - p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))" + - ".toDF(\"name\", \"age\")\n" + - "df.registerTempTable(\"test_table\")\n" + - "z.show(df)"); - } note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType()); @@ -397,15 +375,13 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { p.getReturn().message().get(0).getData().contains("name age\n1 hello 20")); // test display DataSet - if (isSpark2() || isSpark3()) { - p = note.addNewParagraph(anonymous); - p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" + - "z.show(ds)"); - note.run(p.getId(), true); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType()); - assertEquals("_1\t_2\nhello\t20\n", p.getReturn().message().get(0).getData()); - } + p = note.addNewParagraph(anonymous); + p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" + + "z.show(ds)"); + note.run(p.getId(), true); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals(InterpreterResult.Type.TABLE, p.getReturn().message().get(0).getType()); + assertEquals("_1\t_2\nhello\t20\n", p.getReturn().message().get(0).getData()); return null; }); } finally { @@ -426,21 +402,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { note -> { Paragraph p = note.addNewParagraph(anonymous); - if (isSpark3()) { - p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + - "df <- createDataFrame(localDF)\n" + - "count(df)" - ); - } else { - String sqlContextName = "sqlContext"; - if (isSpark2() || isSpark3()) { - sqlContextName = "spark"; - } - p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + - "df <- createDataFrame(" + sqlContextName + ", localDF)\n" + - "count(df)" - ); - } + p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + + "df <- createDataFrame(localDF)\n" + + "count(df)" + ); note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); @@ -484,45 +449,25 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { List<InterpreterCompletion> completions = note.completion(p.getId(), code, code.length(), AuthenticationInfo.ANONYMOUS); assertTrue(completions.size() > 0); - if (isSpark2()){ - // run SparkSession test - p = note.addNewParagraph(anonymous); - p.setText("%pyspark from pyspark.sql import Row\n" + - "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + - "df.collect()"); - note.run(p.getId(), true); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("[Row(age=20, id=1)]\n", p.getReturn().message().get(0).getData()); - - // test udf - p = note.addNewParagraph(anonymous); - // use SQLContext to register UDF but use this UDF through SparkSession - p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + - "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); - note.run(p.getId(), true); - assertEquals(Status.FINISHED, p.getStatus()); - 
assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) || - "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData())); - } else { - // run SparkSession test - p = note.addNewParagraph(anonymous); - p.setText("%pyspark from pyspark.sql import Row\n" + - "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + - "df.collect()"); - note.run(p.getId(), true); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("[Row(id=1, age=20)]\n", p.getReturn().message().get(0).getData()); - - // test udf - p = note.addNewParagraph(anonymous); - // use SQLContext to register UDF but use this UDF through SparkSession - p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + - "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); - note.run(p.getId(), true); - assertEquals(Status.FINISHED, p.getStatus()); - assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) || - "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData())); - } + + // run SparkSession test + p = note.addNewParagraph(anonymous); + p.setText("%pyspark from pyspark.sql import Row\n" + + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + + "df.collect()"); + note.run(p.getId(), true); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(id=1, age=20)]\n", p.getReturn().message().get(0).getData()); + + // test udf + p = note.addNewParagraph(anonymous); + // use SQLContext to register UDF but use this UDF through SparkSession + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + note.run(p.getId(), true); + assertEquals(Status.FINISHED, p.getStatus()); + assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) || + "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData())); return null; }); } finally { @@ -756,10 +701,6 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi { } } - private boolean isSpark2() { - return sparkVersion.startsWith("2."); - } - private boolean isSpark3() { return sparkVersion.startsWith("3."); } diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java deleted file mode 100644 index d4ae3c7ee2..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class ZeppelinSparkClusterTest24 extends ZeppelinSparkClusterTest { - - public ZeppelinSparkClusterTest24(String sparkVersion, String hadoopVersion) throws Exception { - super(sparkVersion, hadoopVersion); - } - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"2.4.7", "2.7"} - }); - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java deleted file mode 100644 index a1131d4724..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class ZeppelinSparkClusterTest30 extends ZeppelinSparkClusterTest { - - public ZeppelinSparkClusterTest30(String sparkVersion, String hadoopVersion) throws Exception { - super(sparkVersion, hadoopVersion); - } - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"3.0.2", "2.7"}, - {"3.0.2", "3.2"} - }); - } -} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest31.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest31.java deleted file mode 100644 index 10ca4d9e6b..0000000000 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest31.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.integration; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.List; - -@RunWith(value = Parameterized.class) -public class ZeppelinSparkClusterTest31 extends ZeppelinSparkClusterTest { - - public ZeppelinSparkClusterTest31(String sparkVersion, String hadoopVersion) throws Exception { - super(sparkVersion, hadoopVersion); - } - - @Parameterized.Parameters - public static List<Object[]> data() { - return Arrays.asList(new Object[][]{ - {"3.1.1", "2.7"}, - {"3.1.1", "3.2"} - }); - } -} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index a075cf6777..15b07a245b 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -54,7 +54,7 @@ public class SparkInterpreterLauncherTest { System.clearProperty(confVar.getVarName()); } - sparkHome = DownloadUtils.downloadSpark("2.4.7", "2.7"); + sparkHome = DownloadUtils.downloadSpark("3.4.1", "3"); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath()); @@ -212,7 +212,8 @@ public class SparkInterpreterLauncherTest { assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); String sparkJars = "jar_1," + - zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + + zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + + ".jar," + zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; @@ -261,7 +262,8 @@ public class SparkInterpreterLauncherTest { assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); String sparkJars = "jar_1," + Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString() + "," + - zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + + zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + + ".jar," + zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties"; @@ -311,7 +313,8 @@ public class SparkInterpreterLauncherTest { assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); String sparkJars = "jar_1," + - zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + + zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + + ".jar," + zeppelinHome + 
"/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar"; String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr"; // escape special characters