This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 82e4057  [ZEPPELIN-4893]. Upgrade to spark 3.0.0
82e4057 is described below

commit 82e4057d3744974b7b16b6f1f68c018355f3c765
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Sat Jun 20 17:58:52 2020 +0800

    [ZEPPELIN-4893]. Upgrade to spark 3.0.0
    
    ### What is this PR for?
    
    Simple PR to upgrade to the officially released Spark 3.0.0.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4893
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3809 from zjffdu/ZEPPELIN-4893 and squashes the following commits:
    
    e34cd7984 [Jeff Zhang] save
    0918aebfb [Jeff Zhang] [ZEPPELIN-4893]. Upgrade to spark 3.0.0
---
 .travis.yml                                        |  6 ++
 docs/interpreter/spark.md                          | 32 ++++++++-
 .../src/main/resources/interpreter-setting.json    |  7 ++
 spark/pom.xml                                      |  4 +-
 spark/spark3-shims/pom.xml                         |  2 +-
 testing/install_R.sh                               |  7 ++
 .../zeppelin/integration/SparkIntegrationTest.java | 73 +++++++++++++--------
 .../integration/SparkIntegrationTest16.java        |  6 +-
 .../integration/SparkIntegrationTest20.java        |  6 +-
 .../integration/SparkIntegrationTest21.java        |  6 +-
 .../integration/SparkIntegrationTest22.java        |  6 +-
 .../integration/SparkIntegrationTest23.java        |  6 +-
 .../integration/SparkIntegrationTest24.java        |  6 +-
 .../integration/SparkIntegrationTest30.java        | 23 ++++++-
 .../integration/ZeppelinSparkClusterTest.java      | 75 +++++++++++++++-------
 .../integration/ZeppelinSparkClusterTest16.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest20.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest21.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest22.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest23.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest24.java    |  6 +-
 .../integration/ZeppelinSparkClusterTest30.java    |  7 +-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |  2 +-
 .../interpreter/integration/DownloadUtils.java     | 11 +---
 .../launcher/SparkInterpreterLauncherTest.java     |  2 +-
 25 files changed, 215 insertions(+), 108 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 05779db..c732915 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -97,6 +97,11 @@ jobs:
 
     # Run Spark integration test and unit test
 
+    # Run spark integration test in one zeppelin instance: Spark 3.0
+    - jdk: "openjdk8"
+      dist: xenial
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown -am" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest30,SparkIntegrationTest30 -DfailIfNoTests=false"
+
     # Run spark integration test in one zeppelin instance (2.4, 2.3, 2.2)
     - jdk: "openjdk8"
       dist: xenial
@@ -162,6 +167,7 @@ before_install:
   - clearcache=$(echo $gitlog | grep -c -E "clear bower|bower clear" || true)
  - if [ "$hasbowerchanged" -gt 0 ] || [ "$clearcache" -gt 0 ]; then echo "Clearing bower_components cache"; rm -r zeppelin-web/bower_components; npm cache verify; else echo "Using cached bower_components."; fi
  - echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
+  - if [[ -n $R ]]; then ./testing/install_R.sh; fi
   - bash -x ./testing/install_external_dependencies.sh
  - ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
  - ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 990f466..5fc9305 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -85,6 +85,11 @@ You can also set other Spark properties which are not listed in the table. For a
     <td>local[*]</td>
     <td>Spark master uri. <br/> e.g. spark://master_host:7077</td>
   <tr>
+  <tr>
+    <td>spark.submit.deployMode</td>
+    <td></td>
+    <td>The deploy mode of the Spark driver program, either "client" or "cluster", which means the driver program is launched locally ("client") or remotely ("cluster") on one of the nodes inside the cluster.</td>
+  <tr>
     <td>spark.app.name</td>
     <td>Zeppelin</td>
     <td>The name of spark application.</td>
@@ -254,8 +259,8 @@ For example,
 
  * **local[*]** in local mode
  * **spark://master:7077** in standalone cluster
- * **yarn-client** in Yarn client mode
- * **yarn-cluster** in Yarn cluster mode
+ * **yarn-client** in Yarn client mode  (Not supported in Spark 3.x; see below for how to configure yarn-client mode in Spark 3.x)
+ * **yarn-cluster** in Yarn cluster mode  (Not supported in Spark 3.x; see below for how to configure yarn-cluster mode in Spark 3.x)
  * **mesos://host:5050** in Mesos cluster
 
 That's it. This way, Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin.
@@ -265,6 +270,29 @@ For the further information about Spark & Zeppelin version compatibility, please
 
 > Yarn client mode and local mode run the driver on the same machine as the Zeppelin server, which is dangerous for production because the machine may run out of memory when many Spark interpreters are running at the same time. So we suggest you allow only yarn-cluster mode, by setting `zeppelin.spark.only_yarn_cluster` in `zeppelin-site.xml`.
 
+#### Configure yarn mode for Spark 3.x
+
+Specifying `yarn-client` & `yarn-cluster` in `spark.master` is no longer supported in Spark 3.x; instead, you need to use `spark.master` and `spark.submit.deployMode` together.
+
+<table class="table-configuration">
+  <tr>
+    <th>Mode</th>
+    <th>spark.master</th>
+    <th>spark.submit.deployMode</th>
+  </tr>
+  <tr>
+    <td>Yarn Client</td>
+    <td>yarn</td>
+    <td>client</td>
+  </tr>
+  <tr>
+    <td>Yarn Cluster</td>
+    <td>yarn</td>
+    <td>cluster</td>
+  </tr>  
+</table>
+
+
 ## SparkContext, SQLContext, SparkSession, ZeppelinContext
 
 SparkContext, SQLContext, SparkSession (for spark 2.x) and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext`, `spark` and `z`, respectively, in Scala, Kotlin, Python and R environments.
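
The table above maps one-to-one onto Spark's public configuration API. As a minimal sketch (the class and app name below are illustrative, not part of this commit), the old yarn-client master becomes:

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class YarnClientModeSketch {
      public static void main(String[] args) {
        // Spark 3.x splits the old "yarn-client" master into
        // a master ("yarn") plus a deploy mode ("client").
        SparkConf conf = new SparkConf()
            .setAppName("Zeppelin")
            .setMaster("yarn")                          // was: yarn-client
            .set("spark.submit.deployMode", "client");  // or "cluster"

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        System.out.println("Spark version: " + spark.version());
        spark.stop();
      }
    }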
diff --git a/spark/interpreter/src/main/resources/interpreter-setting.json b/spark/interpreter/src/main/resources/interpreter-setting.json
index dfe09d6..db078c5 100644
--- a/spark/interpreter/src/main/resources/interpreter-setting.json
+++ b/spark/interpreter/src/main/resources/interpreter-setting.json
@@ -19,6 +19,13 @@
        "description": "Spark master uri. local | yarn-client | yarn-cluster | spark master address of standalone mode, ex) spark://master_host:7077",
         "type": "string"
       },
+      "spark.submit.deployMode": {
+        "envName": "",
+        "propertyName": "spark.submit.deployMode",
+        "defaultValue": "",
+        "description": "The deploy mode of the Spark driver program, either \"client\" or \"cluster\", which means the driver program is launched locally (\"client\") or remotely (\"cluster\") on one of the nodes inside the cluster.",
+        "type": "string"
+      },
       "spark.app.name": {
         "envName": "",
         "propertyName": "spark.app.name",
diff --git a/spark/pom.xml b/spark/pom.xml
index 3102eb8..25301b2 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -195,9 +195,9 @@
         <profile>
             <id>spark-3.0</id>
             <properties>
-                <spark.version>3.0.0-preview2</spark.version>
+                <spark.version>3.0.0</spark.version>
                 <protobuf.version>2.5.0</protobuf.version>
-                <py4j.version>0.10.8.1</py4j.version>
+                <py4j.version>0.10.9</py4j.version>
             </properties>
         </profile>
 
diff --git a/spark/spark3-shims/pom.xml b/spark/spark3-shims/pom.xml
index 645a83e..853bf71 100644
--- a/spark/spark3-shims/pom.xml
+++ b/spark/spark3-shims/pom.xml
@@ -34,7 +34,7 @@
 
   <properties>
     <scala.binary.version>2.12</scala.binary.version>
-    <spark.version>3.0.0-preview2</spark.version>
+    <spark.version>3.0.0</spark.version>
   </properties>
 
   <dependencies>
diff --git a/testing/install_R.sh b/testing/install_R.sh
new file mode 100755
index 0000000..d6bcb86
--- /dev/null
+++ b/testing/install_R.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+# Install instructions from https://cran.r-project.org/bin/linux/ubuntu/README.html
+
+sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
+echo "deb https://cloud.r-project.org/bin/linux/ubuntu xenial-cran35/" | sudo tee -a /etc/apt/sources.list
+sudo apt-get update
+sudo apt-get install -y r-base
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index dd86529..42f35a6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -44,7 +44,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -61,10 +60,11 @@ public abstract class SparkIntegrationTest {
   private String sparkVersion;
   private String sparkHome;
 
-  public SparkIntegrationTest(String sparkVersion) {
-    LOGGER.info("Testing SparkVersion: " + sparkVersion);
+  public SparkIntegrationTest(String sparkVersion, String hadoopVersion) {
+    LOGGER.info("Testing Spark Version: " + sparkVersion);
+    LOGGER.info("Testing Hadoop Version: " + hadoopVersion);
     this.sparkVersion = sparkVersion;
-    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
+    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
   }
 
   @BeforeClass
@@ -88,6 +88,10 @@ public abstract class SparkIntegrationTest {
     }
   }
 
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // sub class can customize spark interpreter setting.
+  }
+
   private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
     // add jars & packages for testing
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
@@ -133,7 +137,7 @@ public abstract class SparkIntegrationTest {
 
     // test SparkRInterpreter
     Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("spark.r", new ExecutionContext("user1", "note1", "test"));
-    if (isSpark2()) {
+    if (isSpark2() || isSpark3()) {
       interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
     } else {
       interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
@@ -154,14 +158,17 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();
 
-    // no yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(0, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // no yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(0, response.getApplicationList().size());
+    } finally {
+      interpreterSettingManager.close();
+    }
   }
 
   @Test
@@ -178,16 +185,19 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();
 
-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // 1 yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(1, response.getApplicationList().size());
 
-    waitForYarnAppCompleted(30 * 1000);
+    } finally {
+      interpreterSettingManager.close();
+      waitForYarnAppCompleted(30 * 1000);
+    }
   }
 
   private void waitForYarnAppCompleted(int timeout) throws YarnException {
@@ -223,22 +233,29 @@ public abstract class SparkIntegrationTest {
     sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
     sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
 
-    testInterpreterBasics();
+    try {
+      setUpSparkInterpreterSetting(sparkInterpreterSetting);
+      testInterpreterBasics();
 
-    // 1 yarn application launched
-    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
-    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
-    assertEquals(1, response.getApplicationList().size());
-
-    interpreterSettingManager.close();
+      // 1 yarn application launched
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      assertEquals(1, response.getApplicationList().size());
 
-    waitForYarnAppCompleted(30 * 1000);
+    } finally {
+      interpreterSettingManager.close();
+      waitForYarnAppCompleted(30 * 1000);
+    }
   }
 
   private boolean isSpark2() {
     return this.sparkVersion.startsWith("2.");
   }
 
+  private boolean isSpark3() {
+    return this.sparkVersion.startsWith("3.");
+  }
+
   private String getPythonExec() throws IOException, InterruptedException {
     Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
     if (process.waitFor() != 0) {
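
A note on the recurring change in this file: each test now funnels through a protected setUpSparkInterpreterSetting hook and closes the setting manager in a finally block. Reduced to a self-contained sketch (only the hook name comes from the diff; the other names are illustrative):

    public abstract class CleanupPatternSketch {

      // Subclasses override this to customize settings, as
      // SparkIntegrationTest30 does below for the yarn masters.
      protected void setUpSparkInterpreterSetting(Object interpreterSetting) {
        // default: no-op
      }

      public void runScenario(Object setting, AutoCloseable settingManager) throws Exception {
        try {
          setUpSparkInterpreterSetting(setting);
          // ... run the interpreter assertions here ...
        } finally {
          // close() now runs even when an assertion fails, so a broken
          // run no longer leaks interpreter processes.
          settingManager.close();
        }
      }
    }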
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
index 8f5aacb..6574ed6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest16.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest16 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest16(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest16(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.6.3"}
+            {"1.6.3", "2.6"}
     });
   }
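
The same two-parameter pattern repeats in every test class below; distilled into a runnable JUnit 4 sketch (the class here is hypothetical, while the data rows are the Spark 3.0 combinations added by this commit):

    import java.util.Arrays;
    import java.util.List;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class VersionMatrixSketch {
      private final String sparkVersion;
      private final String hadoopVersion;

      public VersionMatrixSketch(String sparkVersion, String hadoopVersion) {
        this.sparkVersion = sparkVersion;
        this.hadoopVersion = hadoopVersion;
      }

      @Parameterized.Parameters
      public static List<Object[]> data() {
        // Each row becomes one constructor call, i.e. one full test run.
        return Arrays.asList(new Object[][]{
                {"3.0.0", "2.7"},
                {"3.0.0", "3.2"}
        });
      }

      @Test
      public void printsDistributionName() {
        System.out.println("spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
      }
    }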
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
index 4f3ebd8..b9c7cb0 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest20.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest20 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest20(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest20(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.0.2"}
+            {"2.0.2", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
index 37305cd..5f0fdfc 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest21.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest21 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest21(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest21(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.1.3"}
+            {"2.1.3", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
index a400118..cddd2a7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest22.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest22 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest22(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest22(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.2.2"}
+            {"2.2.2", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
index ca960d3..834e3d8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest23.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest23 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest23(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest23(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.3.2"}
+            {"2.3.2", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
index aae4951..6920c20 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest24 extends SparkIntegrationTest{
 
-  public SparkIntegrationTest24(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest24(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.4.4"}
+            {"2.4.4", "2.7"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
index 4371023..8e3c8c8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.integration;
 
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -26,15 +27,31 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class SparkIntegrationTest30 extends SparkIntegrationTest {
 
-  public SparkIntegrationTest30(String sparkVersion) {
-    super(sparkVersion);
+  public SparkIntegrationTest30(String sparkVersion, String hadoopVersion) {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"3.0.0-preview2"}
+            {"3.0.0", "2.7"},
+            {"3.0.0", "3.2"}
     });
   }
 
+  @Override
+  protected void setUpSparkInterpreterSetting(InterpreterSetting interpreterSetting) {
+    // spark3 doesn't support yarn-client and yarn-cluster any more, use
+    // spark.master and spark.submit.deployMode instead
+    String sparkMaster = interpreterSetting.getJavaProperties().getProperty("spark.master");
+    if (sparkMaster.equals("yarn-client")) {
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    } else if (sparkMaster.equals("yarn-cluster")){
+      interpreterSetting.setProperty("spark.master", "yarn");
+      interpreterSetting.setProperty("spark.submit.deployMode", "cluster");
+    } else if (sparkMaster.startsWith("local")) {
+      interpreterSetting.setProperty("spark.submit.deployMode", "client");
+    }
+  }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 599727f..2eb505a 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -76,10 +76,10 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
   private String sparkHome;
   private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
 
-  public ZeppelinSparkClusterTest(String sparkVersion) throws Exception {
+  public ZeppelinSparkClusterTest(String sparkVersion, String hadoopVersion) throws Exception {
     this.sparkVersion = sparkVersion;
     LOGGER.info("Testing SparkVersion: " + sparkVersion);
-    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion);
+    this.sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion);
     if (!verifiedSparkVersions.contains(sparkVersion)) {
       verifiedSparkVersions.add(sparkVersion);
       setupSparkInterpreter(sparkHome);
@@ -224,14 +224,14 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       IOUtils.copy(new StringReader("{\"metadata\": { \"key\": 84896, \"value\": 54 }}\n"),
               jsonFileWriter);
       jsonFileWriter.close();
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark spark.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
       } else {
         p.setText("%spark sqlContext.read.json(\"file://" + tmpJsonFile.getAbsolutePath() + "\")");
       }
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         assertTrue(p.getReturn().message().get(0).getData().contains(
                 "org.apache.spark.sql.DataFrame = [metadata: struct<key: bigint, value: bigint>]"));
       } else {
@@ -247,7 +247,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
   @Test
   public void sparkReadCSVTest() throws IOException {
-    if (!isSpark2()) {
+    if (isSpark1()) {
       // csv is not natively supported in spark 1.x
       return;
     }
@@ -279,7 +279,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
       // test basic dataframe api
       Paragraph p = note.addNewParagraph(anonymous);
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
                 ".toDF(\"name\", \"age\")\n" +
                 "df.collect()");
@@ -295,7 +295,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
       // test display DataFrame
       p = note.addNewParagraph(anonymous);
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p.setText("%spark val df=spark.createDataFrame(Seq((\"hello\",20)))" +
                 ".toDF(\"name\", \"age\")\n" +
                 "df.createOrReplaceTempView(\"test_table\")\n" +
@@ -353,7 +353,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
               p.getReturn().message().get(0).getData().contains("name age\n1 hello  20"));
 
       // test display DataSet
-      if (isSpark2()) {
+      if (isSpark2() || isSpark3()) {
         p = note.addNewParagraph(anonymous);
         p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" +
             "z.show(ds)");
@@ -374,16 +374,24 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     Note note = null;
     try {
       note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      Paragraph p = note.addNewParagraph(anonymous);
 
-      String sqlContextName = "sqlContext";
-      if (isSpark2()) {
-        sqlContextName = "spark";
+      if (isSpark3()) {
+        p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
+                "df <- createDataFrame(localDF)\n" +
+                "count(df)"
+        );
+      } else {
+        String sqlContextName = "sqlContext";
+        if (isSpark2() || isSpark3()) {
+          sqlContextName = "spark";
+        }
+        p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
+                "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
+                "count(df)"
+        );
       }
-      Paragraph p = note.addNewParagraph(anonymous);
-      p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" +
-          "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
-          "count(df)"
-      );
+
       note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("[1] 3", p.getReturn().message().get(0).getData().trim());
@@ -415,7 +423,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("name_abc\n", p.getReturn().message().get(0).getData());
 
-      if (!isSpark2()) {
+      if (isSpark1()) {
         // run sqlContext test
         p = note.addNewParagraph(anonymous);
         p.setText("%pyspark from pyspark.sql import Row\n" +
@@ -461,7 +469,7 @@ public abstract class ZeppelinSparkClusterTest extends 
AbstractTestRestApi {
             .contains("Fail to execute line 3: print(a2)"));
         assertTrue(p.getReturn().message().get(0).getData()
             .contains("name 'a2' is not defined"));
-      } else {
+      } else if (isSpark2()){
         // run SparkSession test
         p = note.addNewParagraph(anonymous);
         p.setText("%pyspark from pyspark.sql import Row\n" +
@@ -480,6 +488,25 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         assertEquals(Status.FINISHED, p.getStatus());
         assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
             "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
+      } else {
+        // run SparkSession test
+        p = note.addNewParagraph(anonymous);
+        p.setText("%pyspark from pyspark.sql import Row\n" +
+                "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+                "df.collect()");
+        note.run(p.getId(), true);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertEquals("[Row(id=1, age=20)]\n", p.getReturn().message().get(0).getData());
+
+        // test udf
+        p = note.addNewParagraph(anonymous);
+        // use SQLContext to register UDF but use this UDF through SparkSession
+        p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+                "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+        note.run(p.getId(), true);
+        assertEquals(Status.FINISHED, p.getStatus());
+        assertTrue("[Row(len=u'3')]\n".equals(p.getReturn().message().get(0).getData()) ||
+                "[Row(len='3')]\n".equals(p.getReturn().message().get(0).getData()));
       }
     } finally {
       if (null != note) {
@@ -680,14 +707,16 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
   }
 
-  private int toIntSparkVersion(String sparkVersion) {
-    String[] split = sparkVersion.split("\\.");
-    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
-    return version;
+  private boolean isSpark1() {
+    return sparkVersion.startsWith("1.");
   }
 
   private boolean isSpark2() {
-    return toIntSparkVersion(sparkVersion) >= 20;
+    return sparkVersion.startsWith("2.");
+  }
+
+  private boolean isSpark3() {
+    return sparkVersion.startsWith("3.");
   }
 
   @Test
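
One subtlety in the refactor above: the old numeric isSpark2() was true for anything >= 2.0, including 3.x, whereas the new startsWith checks match a single major line. That is why call sites change from isSpark2() to isSpark2() || isSpark3(), and from !isSpark2() to isSpark1(). A small demonstration (the class is hypothetical; the logic is copied from the old and new helpers):

    public class VersionPredicateDemo {

      // Old helper: "3.0.0" -> 30, so the >= 20 check also matched Spark 3.
      static boolean oldIsSpark2(String sparkVersion) {
        String[] split = sparkVersion.split("\\.");
        return Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]) >= 20;
      }

      // New helpers: exact major-version match.
      static boolean isSpark2(String v) { return v.startsWith("2."); }
      static boolean isSpark3(String v) { return v.startsWith("3."); }

      public static void main(String[] args) {
        System.out.println(oldIsSpark2("3.0.0"));                    // true
        System.out.println(isSpark2("3.0.0"));                       // false
        System.out.println(isSpark2("3.0.0") || isSpark3("3.0.0"));  // true
      }
    }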
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
index 954f024..777c166 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest16.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest16 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest16(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest16(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.6.3"}
+            {"1.6.3", "2.6"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
index 22687d9..be2b5c6 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest20.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest20 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest20(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest20(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.0.2"}
+            {"2.0.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
index fd98364..c127312 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest21.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest21 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest21(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest21(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.1.3"}
+            {"2.1.3", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
index 9b51e17..d7a63af 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest22.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest22 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest22(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest22(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.2.2"}
+            {"2.2.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
index 22ef673..7b15af7 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest23.java
@@ -26,15 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest23 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest23(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest23(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.3.2"}
+            {"2.3.2", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
index a55a504..e1f05ff 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
@@ -26,14 +26,14 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest24 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest24(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest24(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.4.4"}
+            {"2.4.4", "2.7"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
index 0a09ad5..8405900 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
@@ -26,14 +26,15 @@ import java.util.List;
 @RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest30 extends ZeppelinSparkClusterTest {
 
-  public ZeppelinSparkClusterTest30(String sparkVersion) throws Exception {
-    super(sparkVersion);
+  public ZeppelinSparkClusterTest30(String sparkVersion, String hadoopVersion) throws Exception {
+    super(sparkVersion, hadoopVersion);
   }
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"3.0.0-preview2"}
+            {"3.0.0", "2.7"},
+            {"3.0.0", "3.2"}
     });
   }
 }
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 5658c01..2d4d861 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -261,7 +261,7 @@ public abstract class AbstractTestRestApi {
       LOG.info("Zeppelin Server is started.");
 
       // set up spark interpreter
-      String sparkHome = DownloadUtils.downloadSpark("2.4.4");
+      String sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
       InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
       InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
       interpreterSetting.setProperty("SPARK_HOME", sparkHome);
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index db75c24..4e6211c 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -47,20 +47,15 @@ public class DownloadUtils {
     }
   }
 
-  public static String downloadSpark(String version) {
-    String hadoopVersion = "2.7";
-    if (version.startsWith("1.")) {
-      // Use hadoop 2.6 for spark 1.x
-      hadoopVersion = "2.6";
-    }
+  public static String downloadSpark(String sparkVersion, String hadoopVersion) {
     String sparkDownloadFolder = downloadFolder + "/spark";
     File targetSparkHomeFolder =
-            new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop" + hadoopVersion);
+            new File(sparkDownloadFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
     if (targetSparkHomeFolder.exists()) {
       LOGGER.info("Skip to download spark as it is already downloaded.");
       return targetSparkHomeFolder.getAbsolutePath();
     }
-    download("spark", version, "-bin-hadoop" + hadoopVersion + ".tgz");
+    download("spark", sparkVersion, "-bin-hadoop" + hadoopVersion + ".tgz");
     return targetSparkHomeFolder.getAbsolutePath();
   }
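
With the hard-coded Hadoop mapping removed, callers now state both versions explicitly; a minimal usage sketch (assuming DownloadUtils from zeppelin-zengine's test sources is on the classpath):

    import org.apache.zeppelin.interpreter.integration.DownloadUtils;

    public class DownloadSparkSketch {
      public static void main(String[] args) {
        // The old one-argument overload silently chose hadoop 2.6 for
        // Spark 1.x and 2.7 for everything else; now the caller decides.
        String sparkHome = DownloadUtils.downloadSpark("3.0.0", "3.2");
        System.out.println("SPARK_HOME = " + sparkHome);
      }
    }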
 
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index ab2ebae..736c11a 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -52,7 +52,7 @@ public class SparkInterpreterLauncherTest {
       System.clearProperty(confVar.getVarName());
     }
 
-    sparkHome = DownloadUtils.downloadSpark("2.3.2");
+    sparkHome = DownloadUtils.downloadSpark("2.3.2", "2.7");
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
             new File("..").getAbsolutePath());
 
