This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new ca216a512e [ZEPPELIN-5755] Support Spark 3.3 (#4388)
ca216a512e is described below

commit ca216a512e5c6ad474ddece5ecc17cfe594de6bc
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Tue Jul 12 08:49:35 2022 +0800

    [ZEPPELIN-5755] Support Spark 3.3 (#4388)

    * [ZEPPELIN-5755] Support Spark 3.3

    * use hadoop3 profile for spark 3.3
---
 .github/workflows/core.yml                         | 10 +++-
 pom.xml                                            |  2 +-
 spark/interpreter/pom.xml                          | 14 +++++-
 .../apache/zeppelin/spark/IPySparkInterpreter.java |  5 ++
 .../apache/zeppelin/spark/PySparkInterpreter.java  |  6 +++
 .../src/main/resources/python/zeppelin_ipyspark.py |  6 ++-
 .../src/main/resources/python/zeppelin_pyspark.py  |  8 ++--
 .../zeppelin/spark/SparkSqlInterpreterTest.java    |  8 ++--
 spark/scala-2.13/pom.xml                           |  2 +-
 .../org/apache/zeppelin/spark/SparkVersion.java    |  6 ++-
 testing/env_python_3.7_with_R.yml                  |  2 +-
 testing/env_python_3.8_with_R.yml                  |  2 +-
 testing/env_python_3_with_R.yml                    |  2 +-
 testing/env_python_3_with_R_and_tensorflow.yml     |  2 +-
 .../integration/SparkIntegrationTest33.java        | 56 ++++++++++++++++++++++
 .../integration/ZeppelinSparkClusterTest33.java    | 40 ++++++++++++++++
 16 files changed, 155 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index b9b3953fc8..aceaf9fa11 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -325,7 +325,7 @@ jobs:
         run: |
           R -e "IRkernel::installspec()"
       - name: run tests on hadoop${{ matrix.hadoop }}
-        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32 -DfailIfNoTests=false
+        run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration -Phadoop${{ matrix.hadoop }} -Pintegration -B -Dtest=SparkSubmitIntegrationTest,ZeppelinSparkClusterTest24,SparkIntegrationTest24,ZeppelinSparkClusterTest30,SparkIntegrationTest30,ZeppelinSparkClusterTest31,SparkIntegrationTest31,ZeppelinSparkClusterTest32,SparkIntegrationTest32,ZeppelinSparkClusterTest33,SparkIntegrationTest33 -DfailIfNoTests=false

   # test on spark for each spark version & scala version
   spark-test:
@@ -395,6 +395,14 @@ jobs:
         run: |
           rm -rf spark/interpreter/metastore_db
           ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.2 -Pspark-scala-2.13 -Phadoop2 -Pintegration -B -DfailIfNoTests=false
+      - name: run spark-3.3 tests with scala-2.12 and python-${{ matrix.python }}
+        run: |
+          rm -rf spark/interpreter/metastore_db
+          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.12 -Phadoop3 -Pintegration -B -DfailIfNoTests=false
+      - name: run spark-3.3 tests with scala-2.13 and python-${{ matrix.python }}
+        run: |
+          rm -rf spark/interpreter/metastore_db
+          ./mvnw test -DskipRat -pl spark-submit,spark/interpreter -Pspark-3.3 -Pspark-scala-2.13 -Phadoop3 -Pintegration -B -DfailIfNoTests=false

   livy-0-5-with-spark-2-2-0-under-python3:
     runs-on: ubuntu-20.04
diff --git a/pom.xml b/pom.xml
index 668bb99d70..78b011339c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
     <httpcomponents.client.version>4.5.13</httpcomponents.client.version>
     <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
     <commons.compress.version>1.21</commons.compress.version>
-    <commons.lang3.version>3.10</commons.lang3.version>
+    <commons.lang3.version>3.12.0</commons.lang3.version>
     <commons.text.version>1.8</commons.text.version>
     <commons.configuration2.version>2.7</commons.configuration2.version>
     <commons.exec.version>1.3</commons.exec.version>
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index e539f99732..8f154ba609 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -572,10 +572,22 @@

    <!-- profile spark-x only affect spark version used in test -->
    <profile>
-      <id>spark-3.2</id>
+      <id>spark-3.3</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
+      <properties>
+        <datanucleus.core.version>4.1.17</datanucleus.core.version>
+        <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
+        <datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version>
+        <spark.version>3.3.0</spark.version>
+        <protobuf.version>2.5.0</protobuf.version>
+        <py4j.version>0.10.9.5</py4j.version>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>spark-3.2</id>
      <properties>
        <datanucleus.core.version>4.1.17</datanucleus.core.version>
        <datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index ab6b3dbf29..2e945ed2bb 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -161,6 +161,11 @@ public class IPySparkInterpreter extends IPythonInterpreter {
     return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
   }

+  // Used by PySpark
+  public boolean isAfterSpark33() {
+    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
+  }
+
   public JavaSparkContext getJavaSparkContext() {
     return sparkInterpreter.getJavaSparkContext();
   }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 514ffa99fa..737bef8f4b 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -233,7 +233,13 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
   }

+  // Used by PySpark
   public boolean isSpark3() {
     return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
   }
+
+  // Used by PySpark
+  public boolean isAfterSpark33() {
+    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_3_3_0);
+  }
 }
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index fdf7b97918..958802ccdb 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -57,8 +57,12 @@ conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)

 from pyspark.sql import SparkSession
+from pyspark.sql import SQLContext

 spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
-sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
+if intp.isAfterSpark33():
+    sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
+else:
+    sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped

 class IPySparkZeppelinContext(PyZeppelinContext):
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index 8dd3224baf..a77c383886 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -49,10 +49,12 @@ conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)

 from pyspark.sql import SparkSession
+from pyspark.sql import SQLContext

 spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
-sqlc = __zSqlc__ = __zSpark__._wrapped
-
-sqlContext = __zSqlc__
+if intp.isAfterSpark33():
+    sqlContext = sqlc = __zSqlc__ = SQLContext._get_or_create(sc)
+else:
+    sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped

 from zeppelin_context import PyZeppelinContext
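Context for the two python script hunks above: Spark 3.3 removes the internal SparkSession._wrapped attribute, so the bootstrap scripts can no longer derive the sqlContext variable from the session and instead call SQLContext._get_or_create(sc), which returns the shared SQLContext bound to the SparkContext. A minimal standalone sketch of the same gate, assuming a plain PySpark session rather than Zeppelin's Py4J gateway setup:

    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    sc = spark.sparkContext

    # Spark >= 3.3 dropped SparkSession._wrapped; SQLContext._get_or_create
    # hands back the singleton SQLContext for this SparkContext instead.
    if tuple(int(p) for p in sc.version.split(".")[:2]) >= (3, 3):
        sqlc = SQLContext._get_or_create(sc)
    else:
        sqlc = spark._wrapped
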
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index f469fc5ff2..c4a1d8e7d0 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -225,15 +225,17 @@ public class SparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
     assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
-    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
+    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
+            context.out.toString().contains("Syntax error"));

     // One correct sql + One invalid sql + One valid sql (skipped)
     ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
     assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
-    assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
-
+    assertTrue(context.out.toString(), context.out.toString().contains("mismatched input") ||
+            context.out.toString().contains("Syntax error"));
+
     // Two 2 comments
     ret = sqlInterpreter.interpret(
         "--comment_1\n--comment_2", context);
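Why the test hunk above now accepts two messages: Spark 3.3 reworks parser errors under its new error-class framework, so SQL that older versions reject with ANTLR's "mismatched input ..." can instead be reported as "Syntax error at or near ..." on 3.3. A rough PySpark illustration of the version-dependent wording (the exact message text is not guaranteed verbatim):

    from pyspark.sql import SparkSession
    from pyspark.sql.utils import ParseException

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    try:
        spark.sql("invalid_sql")  # a bare identifier is not a valid statement
    except ParseException as e:
        # Pre-3.3 parsers say "mismatched input"; Spark 3.3 tends to say
        # "Syntax error at or near ..." -- accept either, as the test does.
        assert "mismatched input" in str(e) or "Syntax error" in str(e)
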
diff --git a/spark/scala-2.13/pom.xml b/spark/scala-2.13/pom.xml
index c8f3bbc1f7..d2f337ed1b 100644
--- a/spark/scala-2.13/pom.xml
+++ b/spark/scala-2.13/pom.xml
@@ -31,7 +31,7 @@
   <name>Zeppelin: Spark Interpreter Scala_2.13</name>

   <properties>
-    <spark.version>3.2.0</spark.version>
+    <spark.version>3.3.0</spark.version>
    <spark.scala.version>2.13.4</spark.scala.version>
    <spark.scala.binary.version>2.13</spark.scala.binary.version>
    <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index eb41f43b12..8f88c1b6e1 100644
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -30,11 +30,15 @@ public class SparkVersion {
   public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
+  public static final SparkVersion SPARK_3_1_0 = SparkVersion.fromVersionString("3.1.0");
+  public static final SparkVersion SPARK_3_3_0 = SparkVersion.fromVersionString("3.3.0");
+  public static final SparkVersion SPARK_3_4_0 = SparkVersion.fromVersionString("3.4.0");
+
   public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_2_0_0;
-  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_3_0;
+  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_4_0;

   private int version;
   private int majorVersion;
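The SparkVersion hunk above widens the supported window rather than special-casing 3.3: UNSUPPORTED_FUTURE_VERSION acts as an exclusive upper bound, so bumping it from 3.3.0 to 3.4.0 admits every 3.3.x release while still rejecting untested 3.4+. A sketch of the bound semantics, with hypothetical Python names standing in for the Java SparkVersion class:

    # Illustrative stand-in for org.apache.zeppelin.spark.SparkVersion.
    def as_tuple(v: str) -> tuple:
        return tuple(int(p) for p in v.split(".")[:3])

    MIN_SUPPORTED_VERSION = as_tuple("2.0.0")
    UNSUPPORTED_FUTURE_VERSION = as_tuple("3.4.0")  # was 3.3.0 before this patch

    def is_supported(v: str) -> bool:
        # Supported range is [MIN_SUPPORTED, UNSUPPORTED_FUTURE).
        return MIN_SUPPORTED_VERSION <= as_tuple(v) < UNSUPPORTED_FUTURE_VERSION

    assert is_supported("3.3.0") and not is_supported("3.4.0")
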
diff --git a/testing/env_python_3.7_with_R.yml b/testing/env_python_3.7_with_R.yml
index 10d46aa9d1..34e228e9d0 100644
--- a/testing/env_python_3.7_with_R.yml
+++ b/testing/env_python_3.7_with_R.yml
@@ -24,7 +24,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/testing/env_python_3.8_with_R.yml b/testing/env_python_3.8_with_R.yml
index 10d46aa9d1..34e228e9d0 100644
--- a/testing/env_python_3.8_with_R.yml
+++ b/testing/env_python_3.8_with_R.yml
@@ -24,7 +24,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/testing/env_python_3_with_R.yml b/testing/env_python_3_with_R.yml
index 016e7082b2..bd6a5781c0 100644
--- a/testing/env_python_3_with_R.yml
+++ b/testing/env_python_3_with_R.yml
@@ -25,7 +25,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/testing/env_python_3_with_R_and_tensorflow.yml b/testing/env_python_3_with_R_and_tensorflow.yml
index 498a00dd01..bbe0d80db3 100644
--- a/testing/env_python_3_with_R_and_tensorflow.yml
+++ b/testing/env_python_3_with_R_and_tensorflow.yml
@@ -25,7 +25,7 @@ dependencies:
   - plotly
   - jinja2=3.0.3
   - pip
-  - r-base=3
+  - r-base=3.6
   - r-data.table
   - r-evaluate
   - r-base64enc
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
new file mode 100644
index 0000000000..44fa8ebb99
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest33.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class SparkIntegrationTest33 extends SparkIntegrationTest {
+
+  public SparkIntegrationTest33(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"3.3.0", "2"},
+    });
+  }
+
+  @Override
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // spark3 doesn't support yarn-client and yarn-cluster anymore, use
+    // spark.master and spark.submit.deployMode instead
+    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
+    if (sparkMaster.equals("yarn-client")) {
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    } else if (sparkMaster.equals("yarn-cluster")){
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "cluster");
+    } else if (sparkMaster.startsWith("local")) {
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    }
+  }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
new file mode 100644
index 0000000000..11f62217e0
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest33.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class ZeppelinSparkClusterTest33 extends ZeppelinSparkClusterTest {
+
+  public ZeppelinSparkClusterTest33(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"3.3.0", "2"},
+        {"3.3.0", "3"}
+    });
+  }
+}