This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

The following commit(s) were added to refs/heads/master by this push:
     new 82e4057  [ZEPPELIN-4893]. Upgrade to spark 3.0.0
82e4057 is described below

commit 82e4057d3744974b7b16b6f1f68c018355f3c765
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Sat Jun 20 17:58:52 2020 +0800

    [ZEPPELIN-4893]. Upgrade to spark 3.0.0

    ### What is this PR for?

    Simple PR to upgrade to the officially released Spark 3.0.0.

    ### What type of PR is it?
    [ Improvement ]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4893

    ### How should this be tested?
    * CI pass

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3809 from zjffdu/ZEPPELIN-4893 and squashes the following commits:

    e34cd7984 [Jeff Zhang] save
    0918aebfb [Jeff Zhang] [ZEPPELIN-4893]. Upgrade to spark 3.0.0
---
 .travis.yml                                        |  6 ++
 docs/interpreter/spark.md                          | 32 ++++++++-
 .../src/main/resources/interpreter-setting.json    |  7 ++
 spark/pom.xml                                      |  4 +-
 spark/spark3-shims/pom.xml                         |  2 +-
 testing/install_R.sh                               |  7 ++
 .../zeppelin/integration/SparkIntegrationTest.java | 73 +++++++++++++--------
 .../integration/SparkIntegrationTest16.java        |  6 +-
 .../integration/SparkIntegrationTest20.java        |  6 +-
 .../integration/SparkIntegrationTest21.java        |  6 +-
 .../integration/SparkIntegrationTest22.java        |  6 +-
 .../integration/SparkIntegrationTest23.java        |  6 +-
 .../integration/SparkIntegrationTest24.java        |  6 +-
 .../integration/SparkIntegrationTest30.java        | 23 ++++++-
 .../integration/ZeppelinSparkClusterTest.java      | 75 +++++++++++++-------
 .../integration/ZeppelinSparkClusterTest16.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest20.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest21.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest22.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest23.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest24.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest30.java    |  7 +-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |  2 +-
 .../interpreter/integration/DownloadUtils.java     | 11 +---
 .../launcher/SparkInterpreterLauncherTest.java     |  2 +-
 25 files changed, 215 insertions(+), 108 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 05779db..c732915 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -97,6 +97,11 @@ jobs:

     # Run Spark integration test and unit test

+    # Run spark integration test in one zeppelin instance: Spark 3.0
+    - jdk: "openjdk8"
+      dist: xenial
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown -am" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest30,SparkIntegrationTest30 -DfailIfNoTests=false"
+
     # Run spark integration test in one zeppelin instance (2.4, 2.3, 2.2)
     - jdk: "openjdk8"
       dist: xenial
@@ -162,6 +167,7 @@ before_install:
   - clearcache=$(echo $gitlog | grep -c -E "clear bower|bower clear" || true)
   - if [ "$hasbowerchanged" -gt 0 ] || [ "$clearcache" -gt 0 ]; then echo "Clearing bower_components cache"; rm -r zeppelin-web/bower_components; npm cache verify; else echo "Using cached bower_components."; fi
   - echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
+  - if [[ -n $R ]]; then ./testing/install_R.sh; fi
  - bash -x ./testing/install_external_dependencies.sh
  - ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
  - ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 990f466..5fc9305 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -85,6 +85,11 @@ You can also set other Spark properties which are not listed in the table. For a
     <td>local[*]</td>
     <td>Spark master uri. <br/> e.g. spark://master_host:7077</td>
   <tr>
+  <tr>
+    <td>spark.submit.deployMode</td>
+    <td></td>
+    <td>The deploy mode of the Spark driver program, either "client" or "cluster": whether to launch the driver program locally ("client") or remotely ("cluster") on one of the nodes inside the cluster.</td>
+  <tr>
     <td>spark.app.name</td>
     <td>Zeppelin</td>
     <td>The name of spark application.</td>
@@ -254,8 +259,8 @@ For example,

  * **local[*]** in local mode
  * **spark://master:7077** in standalone cluster
- * **yarn-client** in Yarn client mode
- * **yarn-cluster** in Yarn cluster mode
+ * **yarn-client** in Yarn client mode (Not supported in Spark 3.x; refer below for how to configure yarn-client in Spark 3.x)
+ * **yarn-cluster** in Yarn cluster mode (Not supported in Spark 3.x; refer below for how to configure yarn-cluster in Spark 3.x)
  * **mesos://host:5050** in Mesos cluster

 That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way.
@@ -265,6 +270,29 @@ For the further information about Spark & Zeppelin version compatibility, please
 > Yarn client mode and local mode will run the driver on the same machine as the
 > zeppelin server; this would be dangerous for production, because the server may
 > run out of memory when many spark interpreters are running at the same time.
 > So we suggest you only allow yarn-cluster mode by setting
 > `zeppelin.spark.only_yarn_cluster` in `zeppelin-site.xml`.

+#### Configure yarn mode for Spark 3.x
+
+Specifying `yarn-client` & `yarn-cluster` in `spark.master` is no longer supported in Spark 3.x; instead, you need to set `spark.master` and `spark.submit.deployMode` together.
+
+<table class="table-configuration">
+  <tr>
+    <th>Mode</th>
+    <th>spark.master</th>
+    <th>spark.submit.deployMode</th>
+  </tr>
+  <tr>
+    <td>Yarn Client</td>
+    <td>yarn</td>
+    <td>client</td>
+  </tr>
+  <tr>
+    <td>Yarn Cluster</td>
+    <td>yarn</td>
+    <td>cluster</td>
+  </tr>
+</table>
+
+
 ## SparkContext, SQLContext, SparkSession, ZeppelinContext

 SparkContext, SQLContext, SparkSession (for spark 2.x) and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext`, `spark` and `z`, respectively, in Scala, Kotlin, Python and R environments.
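As a side note on the doc change above: the yarn-client mapping can also be sketched with Spark's public SparkConf API. This is a minimal illustration only, not part of this patch; Zeppelin itself applies the equivalent interpreter properties, as SparkIntegrationTest30 below shows.

    // Minimal sketch, assuming Spark 3.x on the classpath: the Spark 2.x
    // master "yarn-client" becomes the pair (master=yarn, deployMode=client).
    import org.apache.spark.SparkConf;

    public class YarnClientConfExample {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("Zeppelin")
            .setMaster("yarn")                          // replaces "yarn-client"
            .set("spark.submit.deployMode", "client");  // driver runs locally
        // For yarn-cluster mode, keep setMaster("yarn") and set deployMode to "cluster".
        System.out.println(conf.toDebugString());
      }
    }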
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index dfe09d6..db078c5 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -19,6 +19,13 @@
         "description": "Spark master uri. local | yarn-client | yarn-cluster | spark master address of standalone mode, ex) spark://master_host:7077",
         "type": "string"
       },
+      "spark.submit.deployMode": {
+        "envName": "",
+        "propertyName": "spark.submit.deployMode",
+        "defaultValue": "",
+        "description": "The deploy mode of the Spark driver program, either \"client\" or \"cluster\": whether to launch the driver program locally (\"client\") or remotely (\"cluster\") on one of the nodes inside the cluster.",
+        "type": "string"
+      },
       "spark.app.name": {
         "envName": "",
         "propertyName": "spark.app.name",
diff --git a/spark/pom.xml b/spark/pom.xml
index 3102eb8..25301b2 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -195,9 +195,9 @@
     <profile>
       <id>spark-3.0</id>
       <properties>
-        <spark.version>3.0.0-preview2</spark.version>
+        <spark.version>3.0.0</spark.version>
         <protobuf.version>2.5.0</protobuf.version>
-        <py4j.version>0.10.8.1</py4j.version>
+        <py4j.version>0.10.9</py4j.version>
       </properties>
     </profile>
diff --git a/spark/spark3-shims/pom.xml b/spark/spark3-shims/pom.xml
index 645a83e..853bf71 100644
--- a/spark/spark3-shims/pom.xml
+++ b/spark/spark3-shims/pom.xml
@@ -34,7 +34,7 @@

   <properties>
     <scala.binary.version>2.12</scala.binary.version>
-    <spark.version>3.0.0-preview2</spark.version>
+    <spark.version>3.0.0</spark.version>
   </properties>

   <dependencies>
diff --git a/testing/install_R.sh b/testing/install_R.sh
new file mode 100755
index 0000000..d6bcb86
--- /dev/null
+++ b/testing/install_R.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+# Install instructions from https://cran.r-project.org/bin/linux/ubuntu/README.html
+
+sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
+echo "deb https://cloud.r-project.org/bin/linux/ubuntu xenial-cran35/" | sudo tee -a /etc/apt/sources.list
+sudo apt-get update
+sudo apt-get install -y r-base
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index dd86529..42f35a6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -44,7 +44,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.Map;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -61,10 +60,11 @@ public abstract class SparkIntegrationTest {
   private String sparkVersion;
   private String sparkHome;

-  public SparkIntegrationTest(String sparkVersion) {
-    LOGGER.info("Testing SparkVersion: " + sparkVersion);
+  public SparkIntegrationTest(String sparkVersion, String hadoopVersion) {
+    LOGGER.info("Testing Spark Version: " + sparkVersion);
+    LOGGER.info("Testing Hadoop Version: " + hadoopVersion);
     this.sparkVersion = sparkVersion;
-    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
+    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
   }

   @BeforeClass
@@ -88,6 +88,10 @@ public abstract class SparkIntegrationTest {
     }
   }

+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // subclasses can customize the spark interpreter setting.
+  }
+
   private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
     // add jars & packages for testing
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
@@ -133,7 +137,7 @@ public abstract class SparkIntegrationTest {

     // test SparkRInterpreter
     Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test"));
-    if (isSpark2()) {
+    if (isSpark2() || isSpark3()) {
       interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
     } else {
       interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
@@ -154,14 +158,17 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");

-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();

-    // no yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(0, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // no yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(0, response.getApplicationList().size());
+    } finally {
+      interpreterSettingManager.close();
+    }
   }

   @Test
@@ -178,16 +185,19 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");

-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();

-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // 1 yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(1, response.getApplicationList().size());

-    waitForYarnAppCompleted(30 * 1000);
+    } finally {
+      interpreterSettingManager.close();
+      waitForYarnAppCompleted(30 * 1000);
+    }
   }

   private void waitForYarnAppCompleted(int timeout) throws YarnException {
@@ -223,22 +233,29 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");

-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();

-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // 1 yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(1, response.getApplicationList().size());

-    waitForYarnAppCompleted(30 * 1000);
+    } finally {
+      interpreterSettingManager.close();
+      waitForYarnAppCompleted(30 * 1000);
+    }
   }

   private boolean isSpark2() {
     return this.sparkVersion.startsWith("2.");
   }

+  private boolean isSpark3() {
+    return this.sparkVersion.startsWith("3.");
+  }
+
   private String getPythonExec() throws IOException, InterruptedException {
     Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
     if (process.waitFor() != 0) {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
index 8f5aacb..6574ed6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest16 extends SparkIntegrationTest{

-  public SparkIntegrationTest16(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest16(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"1.6.3"}
+        {"1.6.3", "2.6"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
index 4f3ebd8..b9c7cb0 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest20 extends SparkIntegrationTest{

-  public SparkIntegrationTest20(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest20(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.0.2"}
+        {"2.0.2", "2.7"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
index 37305cd..5f0fdfc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest21 extends SparkIntegrationTest{

-  public SparkIntegrationTest21(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest21(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.1.3"}
+        {"2.1.3", "2.7"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
index a400118..cddd2a7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest22 extends SparkIntegrationTest{

-  public SparkIntegrationTest22(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest22(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.2.2"}
+        {"2.2.2", "2.7"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
index ca960d3..834e3d8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest23 extends SparkIntegrationTest{

-  public SparkIntegrationTest23(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest23(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.3.2"}
+        {"2.3.2", "2.7"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
index aae4951..6920c20 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest24 extends SparkIntegrationTest{

-  public SparkIntegrationTest24(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest24(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.4.4"}
+        {"2.4.4", "2.7"}
     });
   }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
index 4371023..8e3c8c8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
@@ -17,6 +17,7 @@

 package org.apache.zeppelin.integration;

+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

@@ -26,15 +27,31 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest30 extends SparkIntegrationTest {

-  public SparkIntegrationTest30(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest30(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"3.0.0-preview2"}
+        {"3.0.0", "2.7"},
+        {"3.0.0", "3.2"}
     });
   }

+  @Override
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // spark3 doesn't support yarn-client and yarn-cluster any more; use
+    // spark.master and spark.submit.deployMode instead
+    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
+    if (sparkMaster.equals("yarn-client")) {
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    } else if (sparkMaster.equals("yarn-cluster")){
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "cluster");
+    } else if (sparkMaster.startsWith("local")) {
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    }
+  }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 599727f..2eb505a 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -76,10 +76,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
   private String sparkHome;
   private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");

-  public ZeppelinSparkClusterTest(String sparkVersion) throws Exception {
+  public ZeppelinSparkClusterTest(String sparkVersion, String hadoopVersion) throws Exception {
     this.sparkVersion = sparkVersion;
     LOGGER.info("Testing SparkVersion: " + sparkVersion);
-    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
+    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
     if (!verifiedSparkVersions.contains(sparkVersion)) {
       verifiedSparkVersions.add(sparkVersion);
       setupSparkInterpreter(sparkHome);
@@ -224,14 +224,14 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       IOUtils.copy(new StringReader("{\"metadata\": { \"key\": 84896, \"value\": 54 }}\n"), jsonFileWriter);
       jsonFileWriter.close();

-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark spark.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
       } else {
         p.setText("%spark sqlContext.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
       }
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         assertTrue(p.getReturn().message().get(0).getData().contains(
             "org.apache.spark.sql.DataFrame = [metadata: struct<key: bigint, value: bigint>]"));
       } else {
@@ -247,7 +247,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {

   @Test
   public void sparkReadCSVTest() throws IOException {
-    if (!isSpark2()) {
+    if (isSpark1()) {
       // csv is not supported natively in spark 1.x
       return;
     }
@@ -279,7 +279,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
       // test basic dataframe api
       Paragraph p = note.addNewParagraph(anonymous);
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
             ".toDF(\"name\", \"age\")\n" +
             "df.collect()");
@@ -295,7 +295,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {

       // test display DataFrame
       p = note.addNewParagraph(anonymous);
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
             ".toDF(\"name\", \"age\")\n" +
             "df.createOrReplaceTempView(\"test_table\")\n" +
@@ -353,7 +353,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
           p.getReturn().message().get(0).getData().contains("name age\n1 hello 20"));

       // test display DataSet
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p = note.addNewParagraph(anonymous);
         p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" +
             "z.show(ds)");
@@ -374,16 +374,24 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     Note note = null;
     try {
       note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      Paragraph p = note.addNewParagraph(anonymous);

-      String sqlContextName = "sqlContext";
-      if (isSpark2()) {
-        sqlContextName = "spark";
+      if (isSpark3()) {
+        p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
+                "df <- createDataFrame(localDF)\n" +
+                "count(df)"
+        );
+      } else {
+        String sqlContextName = "sqlContext";
+        if (isSpark2() || isSpark3()) {
+          sqlContextName = "spark";
+        }
+        p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
+                "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
+                "count(df)"
+        );
       }
-      Paragraph p = note.addNewParagraph(anonymous);
-      p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
-          "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
-          "count(df)"
-      );
+
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim());
@@ -415,7 +423,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("name_abc\n", p.getReturn().message().get(0).getData());

-      if (!isSpark2()) {
+      if (isSpark1()) {
         // run sqlContext test
         p = note.addNewParagraph(anonymous);
         p.setText("%pyspark from pyspark.sql import Row\n" +
@@ -461,7 +469,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
             .contains("Fail to execute line 3: print(a2)"));
         assertTrue(p.getReturn().message().get(0).getData()
             .contains("name 'a2' is not defined"));
-      } else {
+      } else if (isSpark2()){
         // run SparkSession test
        p = note.addNewParagraph(anonymous);
        p.setText("%pyspark from pyspark.sql import Row\n" +
@@ -480,6 +488,25 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         assertEquals(Status.FINISHED, p.getStatus());
         assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
             "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
+      } else {
+        // run SparkSession test
+        p = note.addNewParagraph(anonymous);
+        p.setText("%pyspark from pyspark.sql import Row\n" +
+            "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+            "df.collect()");
+        note.run(p.getId(), true);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertEquals("[Row(id=1, age=20)]\n", p.getReturn().message().get(0).getData());
+
+        // test udf
+        p = note.addNewParagraph(anonymous);
+        // use SQLContext to register UDF but use this UDF through SparkSession
+        p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+            "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+        note.run(p.getId(), true);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
+            "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
       }
     } finally {
       if (null != note) {
@@ -680,14 +707,16 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
   }

-  private int toIntSparkVersion(String sparkVersion) {
-    String[] split = sparkVersion.split("\\.");
-    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
-    return version;
+  private boolean isSpark1() {
+    return sparkVersion.startsWith("1.");
   }

   private boolean isSpark2() {
-    return toIntSparkVersion(sparkVersion) >= 20;
+    return sparkVersion.startsWith("2.");
+  }
+
+  private boolean isSpark3() {
+    return sparkVersion.startsWith("3.");
   }

   @Test
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
index 954f024..777c166 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
@@ -26,15 +26,15 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest16 extends ZeppelinSparkClusterTest {

-  public ZeppelinSparkClusterTest16(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest16(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"1.6.3"}
+        {"1.6.3", "2.6"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
index 22687d9..be2b5c6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
@@ -26,15 +26,15 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest20 extends ZeppelinSparkClusterTest {
-  public ZeppelinSparkClusterTest20(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest20(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.0.2"}
+        {"2.0.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
index fd98364..c127312 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
@@ -26,15 +26,15 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest21 extends ZeppelinSparkClusterTest {

-  public ZeppelinSparkClusterTest21(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest21(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.1.3"}
+        {"2.1.3", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
index 9b51e17..d7a63af 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
@@ -26,15 +26,15 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest22 extends ZeppelinSparkClusterTest {

-  public ZeppelinSparkClusterTest22(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest22(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.2.2"}
+        {"2.2.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
index 22ef673..7b15af7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
@@ -26,15 +26,15 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest23 extends ZeppelinSparkClusterTest {

-  public ZeppelinSparkClusterTest23(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest23(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.3.2"}
+        {"2.3.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
index a55a504..e1f05ff 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
@@ -26,14 +26,14 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest24 extends ZeppelinSparkClusterTest {

-  public ZeppelinSparkClusterTest24(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest24(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"2.4.4"}
+        {"2.4.4", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
index 0a09ad5..8405900 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
@@ -26,14 +26,15 @@ import java.util.List;

 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest30 extends ZeppelinSparkClusterTest {

-  public ZeppelinSparkClusterTest30(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest30(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }

   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-        {"3.0.0-preview2"}
+        {"3.0.0", "2.7"},
+        {"3.0.0", "3.2"}
     });
   }
 }
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 5658c01..2d4d861 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -261,7 +261,7 @@ public abstract class AbstractTestRestApi {
       LOG.info("Zeppelin Server is started.");

       // set up spark interpreter
-      String sparkHome = DownloadUtils.downloadSpark("2.4.4");
+      String sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
       InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
       InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
       interpreterSetting.setProperty("SPARK_HOME", sparkHome);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index db75c24..4e6211c 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -47,20 +47,15 @@ public class DownloadUtils {
     }
   }

-  public static String downloadSpark(String version) {
-    String hadoopVersion = "2.7";
-    if (version.startsWith("1.")) {
-      // Use hadoop 2.6 for spark 1.x
-      hadoopVersion = "2.6";
-    }
+  public static String downloadSpark(String sparkVersion, String hadoopVersion) {
     String sparkDownloadFolder = downloadFolder + "/spark";
     File targetSparkHomeFolder =
-        new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop" + hadoopVersion);
+        new File(sparkDownloadFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
     if (targetSparkHomeFolder.exists()) {
       LOGGER.info("Skip to download spark as it is already downloaded.");
       return targetSparkHomeFolder.getAbsolutePath();
     }
-    download("spark", version, "-bin-hadoop" + hadoopVersion + ".tgz");
+    download("spark", sparkVersion, "-bin-hadoop" + hadoopVersion + ".tgz");
     return targetSparkHomeFolder.getAbsolutePath();
   }

diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index ab2ebae..736c11a 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest {
       System.clearProperty(confVar.getVarName());
     }

-    sparkHome = DownloadUtils.downloadSpark("2.3.2");
+    sparkHome = DownloadUtils.downloadSpark("2.3.2", "2.7");
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
         new File("..").getAbsolutePath());
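A usage note on the DownloadUtils change above: callers now pick the Hadoop build explicitly, which is what lets SparkIntegrationTest30 and ZeppelinSparkClusterTest30 exercise Spark 3.0.0 against both Hadoop 2.7 and 3.2. A minimal sketch of a call, with the version pair taken from the parameterized tests above:

    // Resolves to <downloadFolder>/spark/spark-3.0.0-bin-hadoop3.2 and skips
    // the download when that folder already exists.
    String sparkHome = DownloadUtils.downloadSpark("3.0.0", "3.2");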