This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a23c9d  [ZEPPELIN-4510]. Support Spark 3.0
6a23c9d is described below

commit 6a23c9d25e054e427a141e6aa56fbe94a30703b6
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Dec 26 16:47:15 2019 +0800

    [ZEPPELIN-4510]. Support Spark 3.0
    
    ### What is this PR for?
    This PR adds support for Spark 3.x. The main change is the new module
    `spark3-shims`; unit tests are updated as well. The PR targets the Spark 3.0
    preview because, at the time of this PR, Spark 3 has not been officially
    released.
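    
    Conceptually, the new shims layer dispatches on the Spark major version at
    runtime. A condensed sketch of the pattern that the `SparkShims.loadShims`
    change below implements (error handling elided; this is an illustration,
    not the literal code):
    
        Class<?> shimsClass;
        switch (sparkMajorVersion) {
          case 3: shimsClass = Class.forName("org.apache.zeppelin.spark.Spark3Shims"); break;
          case 2: shimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims"); break;
          case 1: shimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims"); break;
          default: throw new Exception("Unsupported Spark major version: " + sparkMajorVersion);
        }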
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4510
    
    ### How should this be tested?
    * CI pass.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3611 from zjffdu/ZEPPELIN-4510 and squashes the following commits:
    
    1f3337028 [Jeff Zhang] [ZEPPELIN-4510]. Support Spark 3.0
---
 spark/interpreter/pom.xml                          |   6 +
 .../apache/zeppelin/spark/IPySparkInterpreter.java |   4 +-
 .../apache/zeppelin/spark/PySparkInterpreter.java  |   4 +-
 .../apache/zeppelin/spark/SparkRInterpreter.java   |  12 +-
 .../main/java/org/apache/zeppelin/spark/Utils.java |   9 --
 .../src/main/resources/python/zeppelin_ipyspark.py |   3 +-
 .../src/main/resources/python/zeppelin_pyspark.py  |  10 +-
 .../zeppelin/spark/IPySparkInterpreterTest.java    |   6 +-
 .../zeppelin/spark/SparkRInterpreterTest.java      |  30 ++---
 .../org/apache/zeppelin/spark/SparkShimsTest.java  |  15 ++-
 .../zeppelin/spark/SparkShinyInterpreterTest.java  |   8 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java    |   6 +-
 spark/pom.xml                                      |  11 ++
 .../org/apache/zeppelin/spark/SparkShims.java      |  29 +++--
 .../org/apache/zeppelin/spark/SparkVersion.java    |  12 +-
 spark/spark3-shims/pom.xml                         |  87 +++++++++++++++
 .../org/apache/zeppelin/spark/Spark3Shims.java     | 121 +++++++++++++++++++++
 .../integration/SparkIntegrationTest30.java        |  40 +++++++
 .../integration/ZeppelinSparkClusterTest30.java    |  39 +++++++
 .../interpreter/integration/DownloadUtils.java     |  10 +-
 20 files changed, 387 insertions(+), 75 deletions(-)

diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index dfebec8..3490d53 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -103,6 +103,12 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark3-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-kotlin</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index c961336..88493a9 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -126,8 +126,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
     return sparkInterpreter.getProgress(context);
   }
 
-  public boolean isSpark2() {
-    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
+  public boolean isSpark1() {
+    return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
   }
 
   public JavaSparkContext getJavaSparkContext() {
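
A note on the flipped predicate: checking `getMajorVersion() == 1` instead of "newer than or equal to 2.0.0" means every major version other than 1.x, including 3.x and anything later, takes the SparkSession code path. A hypothetical caller, to make the branching concrete (`interpreter` is a placeholder):

    // Only Spark 1.x is special-cased; 2.x and 3.x share the modern path
    if (!interpreter.isSpark1()) {
      // SparkSession-based setup
    } else {
      // legacy SQLContext-based setup
    }
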
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 546f74a..98e3d86 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -206,7 +206,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
   }
 
-  public boolean isSpark2() {
-    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
+  public boolean isSpark1() {
+    return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
   }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 7eb1eef..57db39f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -42,7 +42,7 @@ public class SparkRInterpreter extends RInterpreter {
 
   private SparkInterpreter sparkInterpreter;
   private SparkVersion sparkVersion;
-  private boolean isSpark2;
+  private boolean isSpark1;
   private SparkContext sc;
   private JavaSparkContext jsc;
 
@@ -71,11 +71,11 @@ public class SparkRInterpreter extends RInterpreter {
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
     this.sparkVersion = new SparkVersion(sc.version());
-    this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
+    this.isSpark1 = sparkVersion.getMajorVersion() == 1;
 
     ZeppelinRContext.setSparkContext(sc);
     ZeppelinRContext.setJavaSparkContext(jsc);
-    if (isSpark2) {
+    if (!isSpark1) {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
@@ -93,7 +93,7 @@ public class SparkRInterpreter extends RInterpreter {
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
     String setJobGroup = "";
     // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
-    if (isSpark2) {
+    if (!isSpark1) {
       setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
           "\", \" +" + jobDesc + "\", TRUE)";
     } else {
@@ -159,4 +159,8 @@ public class SparkRInterpreter extends RInterpreter {
                                                 InterpreterContext interpreterContext) {
     return new ArrayList<>();
   }
+
+  public boolean isSpark1() {
+    return isSpark1;
+  }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 381d023..ea8fb8b 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -105,15 +105,6 @@ class Utils {
     }
   }
 
-  static boolean isSpark2() {
-    try {
-      Class.forName("org.apache.spark.sql.SparkSession");
-      return true;
-    } catch (ClassNotFoundException e) {
-      return false;
-    }
-  }
-  
   public static String buildJobGroupId(InterpreterContext context) {
     String uName = "anonymous";
     if (context.getAuthenticationInfo() != null) {
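
Worth noting: the deleted probe keyed off the mere presence of `org.apache.spark.sql.SparkSession` on the classpath, which distinguishes 1.x from 2.x but cannot tell 2.x from 3.x, since Spark 3 ships the same class; the explicit `getMajorVersion()` checks above replace it.
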
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index 66b9bef..07aad9b 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -49,9 +49,8 @@ jconf = jsc.getConf()
 conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 
-if intp.isSpark2():
+if not intp.isSpark1():
     from pyspark.sql import SparkSession
-
     spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
     sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
 else:
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index 8fcca9b..1917349 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -20,12 +20,9 @@ from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 
 # for back compatibility
-from pyspark.sql import SQLContext, HiveContext, Row
+from pyspark.sql import SQLContext, Row
 
 intp = gateway.entry_point
-isSpark2 = intp.isSpark2()
-if isSpark2:
-  from pyspark.sql import SparkSession
 
 jsc = intp.getJavaSparkContext()
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
@@ -43,11 +40,10 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 
-
-if isSpark2:
+if not intp.isSpark1():
+  from pyspark.sql import SparkSession
   spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
   sqlc = __zSqlc__ = __zSpark__._wrapped
-
 else:
   sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
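
The `SparkSession` import is now done lazily inside the non-1.x branch, so this bootstrap script still loads against a Spark 1.x `pyspark` where that class does not exist; the `HiveContext` back-compat import is dropped as well, presumably because `HiveContext` is removed in Spark 3.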
 
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 5a271d6..c64ba71 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -128,7 +128,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
 
     // spark sql
     context = createInterpreterContext(mockIntpEventClient);
-    if (!isSpark2(sparkVersion)) {
+    if (isSpark1(sparkVersion)) {
       result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -256,8 +256,8 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     }
   }
 
-  private static boolean isSpark2(String sparkVersion) {
-    return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
+  private static boolean isSpark1(String sparkVersion) {
+    return sparkVersion.startsWith("'1.") || sparkVersion.startsWith("u'1.");
   }
 
   private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
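
(The quoted `"'1."` prefix matches the Python repr of `sc.version` that the test paragraph prints; the `u'` variant covers Python 2 unicode strings.)
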
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 011812e..8c2b799 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -85,8 +85,15 @@ public class SparkRInterpreterTest {
 
     result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    if (result.message().get(0).getData().contains("2.")) {
-      // spark 2.x
+    if (sparkRInterpreter.isSpark1()) {
+      // spark 1.x
+      result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+      // spark job url is sent
+      verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
+    } else {
+      // spark 2.x or 3.x
       result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
@@ -100,12 +107,12 @@ public class SparkRInterpreterTest {
         public void run() {
           try {
            InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
-                "         df,\n" +
-                "         function(x) {\n" +
-                "           Sys.sleep(3)\n" +
-                "           x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
-                "         })\n" +
-                "head(ldf, 3)", context);
+                    "         df,\n" +
+                    "         function(x) {\n" +
+                    "           Sys.sleep(3)\n" +
+                    "           x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
+                    "         })\n" +
+                    "head(ldf, 3)", context);
            assertTrue(result.message().get(0).getData().contains("cancelled"));
           } catch (InterpreterException e) {
             fail("Should not throw InterpreterException");
@@ -116,13 +123,6 @@ public class SparkRInterpreterTest {
       thread.start();
       Thread.sleep(1000);
       sparkRInterpreter.cancel(context);
-    } else {
-      // spark 1.x
-      result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
-      // spark job url is sent
-      verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
     }
 
     // plotting
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 1143883..ca18ff8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -125,10 +125,19 @@ public class SparkShimsTest {
       mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
       when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);
       doNothing().when(mockIntpEventClient).onParaInfosReceived(argumentCaptor.capture());
+
       try {
-        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties(), null);
-      } catch (Throwable ignore) {
-        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties(), null);
+        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_3_1_0.toString(), new Properties(), null);
+      } catch (Throwable e1) {
+        try {
+          sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties(), null);
+        } catch (Throwable e2) {
+          try {
+            sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties(), null);
+          } catch (Throwable e3) {
+            throw new RuntimeException("All SparkShims were tried, but none could be created.");
+          }
+        }
       }
     }
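
The nested try/catch above gains another level with each new Spark major. Purely as a sketch of an alternative shape (not what this commit does), the same fallback can be written as a flat loop:

    // Hypothetical flattened fallback with the same semantics as the nested try/catch
    SparkVersion[] candidates = {
        SparkVersion.SPARK_3_1_0, SparkVersion.SPARK_2_0_0, SparkVersion.SPARK_1_6_0};
    for (SparkVersion candidate : candidates) {
      try {
        sparkShims = SparkShims.getInstance(candidate.toString(), new Properties(), null);
        break;
      } catch (Throwable ignore) {
        // fall through to the next older shim
      }
    }
    if (sparkShims == null) {
      throw new RuntimeException("All SparkShims were tried, but none could be created.");
    }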
 
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
index eb0e56c..86a591c 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
@@ -115,14 +115,14 @@ public class SparkShinyInterpreterTest extends ShinyInterpreterTest {
 
     // verify shiny app via calling its rest api
     HttpResponse<String> response = Unirest.get(shinyURL).asString();
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
-      assertEquals(200, response.getStatus());
-      assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
-    } else {
+    if (sparkInterpreter.getSparkVersion().isSpark1()) {
       // spark 1.x will fail because sparkR.version is not available in spark 1.x
       assertEquals(500, response.getStatus());
       assertTrue(response.getBody(),
               response.getBody().contains("could not find function \"sparkR.version\""));
+    } else {
+      assertEquals(200, response.getStatus());
+      assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
     }
   }
 }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index c3f245b..1ccfbc5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -191,7 +191,7 @@ public class SparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(ret.message().toString(), 2, ret.message().size());
     assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+    if (!sparkInterpreter.getSparkVersion().isSpark1()) {
       assertTrue(ret.message().toString(), ret.message().get(1).getData().contains("ParseException"));
     }
     
@@ -200,7 +200,7 @@ public class SparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(ret.message().toString(), 2, ret.message().size());
     assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+    if (!sparkInterpreter.getSparkVersion().isSpark1()) {
       assertTrue(ret.message().toString(), ret.message().get(1).getData().contains("ParseException"));
     }
 
@@ -213,7 +213,7 @@ public class SparkSqlInterpreterTest {
 
   @Test
   public void testConcurrentSQL() throws InterpreterException, InterruptedException {
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+    if (!sparkInterpreter.getSparkVersion().isSpark1()) {
       sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
     } else {
       sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
diff --git a/spark/pom.xml b/spark/pom.xml
index 876b14e..3d9d539 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -67,6 +67,7 @@
         <module>spark-shims</module>
         <module>spark1-shims</module>
         <module>spark2-shims</module>
+        <module>spark3-shims</module>
     </modules>
 
     <dependencies>
@@ -196,6 +197,16 @@
         </profile>
 
         <!-- profile spark-x only affects the embedded spark version in zeppelin distribution -->
+
+        <profile>
+            <id>spark-3.0</id>
+            <properties>
+                <spark.version>3.0.0-preview2</spark.version>
+                <protobuf.version>2.5.0</protobuf.version>
+                <py4j.version>0.10.8.1</py4j.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>spark-2.4</id>
             <activation>
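
As the comment above notes, these profiles only pin the Spark version embedded in the Zeppelin distribution; selecting the new one at build time would presumably look like `mvn package -Pspark-3.0` (illustrative invocation, not part of this commit).
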
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index e281b25..67b140a 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -54,15 +54,20 @@ public abstract class SparkShims {
     this.properties = properties;
   }
 
-  private static SparkShims loadShims(String sparkVersion, Properties properties, Object entryPoint)
-      throws ReflectiveOperationException {
+  private static SparkShims loadShims(int sparkMajorVersion, Properties properties, Object entryPoint)
+      throws Exception {
     Class<?> sparkShimsClass;
-    if ("2".equals(sparkVersion)) {
+    if (sparkMajorVersion == 3) {
+      LOGGER.info("Initializing shims for Spark 3.x");
+      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark3Shims");
+    } else if (sparkMajorVersion == 2) {
       LOGGER.info("Initializing shims for Spark 2.x");
       sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims");
-    } else {
+    } else if (sparkMajorVersion == 1) {
       LOGGER.info("Initializing shims for Spark 1.x");
       sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims");
+    } else {
+      throw new Exception("Spark major version: '" + sparkMajorVersion + "' is not supported yet");
     }
 
     Constructor c = sparkShimsClass.getConstructor(Properties.class, Object.class);
@@ -76,22 +81,16 @@ public abstract class SparkShims {
    * @param entryPoint  entryPoint is SparkContext for Spark 1.x, SparkSession for Spark 2.x and later
    * @return
    */
-  public static SparkShims getInstance(String sparkVersion, Properties properties, Object entryPoint) {
+  public static SparkShims getInstance(String sparkVersion,
+                                       Properties properties,
+                                       Object entryPoint) throws Exception {
     if (sparkShims == null) {
-      String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
-      try {
-        sparkShims = loadShims(sparkMajorVersion, properties, entryPoint);
-      } catch (ReflectiveOperationException e) {
-        throw new RuntimeException(e);
-      }
+      int sparkMajorVersion = SparkVersion.fromVersionString(sparkVersion).getMajorVersion();
+      sparkShims = loadShims(sparkMajorVersion, properties, entryPoint);
     }
     return sparkShims;
   }
 
-  private static String getSparkMajorVersion(String sparkVersion) {
-    return sparkVersion.startsWith("2") ? "2" : "1";
-  }
-
   /**
   * This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in
    * spark1 while it is abstract class in spark2.
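
For context, a hypothetical caller of the reworked entry point (names such as `sc`, `sparkSession`, `sparkWebUrl`, and `context` are placeholders; exception handling elided):

    // sc.version() returns e.g. "3.0.0-preview2"; its major version picks the shim
    SparkShims shims = SparkShims.getInstance(sc.version(), properties, sparkSession);
    shims.setupSparkListener(sc.master(), sparkWebUrl, context);
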
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkVersion.java
similarity index 94%
rename from spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
rename to spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkVersion.java
index 88b10ef..ec80740 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkVersion.java
@@ -32,10 +32,10 @@ public class SparkVersion {
   public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
-  public static final SparkVersion SPARK_3_0_0 = SparkVersion.fromVersionString("3.0.0");
+  public static final SparkVersion SPARK_3_1_0 = SparkVersion.fromVersionString("3.1.0");
 
   public static final SparkVersion MIN_SUPPORTED_VERSION =  SPARK_1_6_0;
-  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_0_0;
+  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_1_0;
 
   private int version;
   private int majorVersion;
@@ -73,6 +73,10 @@ public class SparkVersion {
     return version;
   }
 
+  public int getMajorVersion() {
+    return majorVersion;
+  }
+
   public String toString() {
     return versionString;
   }
@@ -85,8 +89,8 @@ public class SparkVersion {
     return new SparkVersion(versionString);
   }
 
-  public boolean isSpark2() {
-    return this.newerThanEquals(SPARK_2_0_0);
+  public boolean isSpark1() {
+    return this.olderThan(SPARK_2_0_0);
   }
 
   public boolean isSecretSocketSupported() {
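
A quick illustration of the helpers this class now exposes (hypothetical usage; `"3.0.0-preview2"` is the version string the new integration tests pass in):

    SparkVersion version = SparkVersion.fromVersionString("3.0.0-preview2");
    // major version 3 routes to Spark3Shims; only 1.x takes the legacy paths
    // version.getMajorVersion() == 3, version.isSpark1() == false
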
diff --git a/spark/spark3-shims/pom.xml b/spark/spark3-shims/pom.xml
new file mode 100644
index 0000000..8c6d1ec
--- /dev/null
+++ b/spark/spark3-shims/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>spark-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>spark3-shims</artifactId>
+  <version>0.9.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Zeppelin: Spark3 Shims</name>
+
+  <properties>
+    <scala.binary.version>2.12</scala.binary.version>
+    <spark.version>3.0.0-preview2</spark.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-interpreter-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-interpreter-setting</id>
+            <phase>none</phase>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
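
Note that the module pins `scala.binary.version` to 2.12: Spark 3.0 dropped Scala 2.11 support, so 2.12 artifacts are the only ones published for the preview.
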
diff --git a/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java b/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java
new file mode 100644
index 0000000..911c9eb
--- /dev/null
+++ b/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkContext;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.ResultMessages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class Spark3Shims extends SparkShims {
+
+  private SparkSession sparkSession;
+
+  public Spark3Shims(Properties properties, Object entryPoint) {
+    super(properties);
+    this.sparkSession = (SparkSession) entryPoint;
+  }
+
+  public void setupSparkListener(final String master,
+                                 final String sparkWebUrl,
+                                 final InterpreterContext context) {
+    SparkContext sc = SparkContext.getOrCreate();
+    sc.addSparkListener(new SparkListener() {
+      @Override
+      public void onJobStart(SparkListenerJobStart jobStart) {
+
+        if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
+            !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
+          buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
+        }
+      }
+    });
+  }
+
+  @Override
+  public String showDataFrame(Object obj, int maxResult) {
+    if (obj instanceof Dataset) {
+      Dataset<Row> df = ((Dataset) obj).toDF();
+      String[] columns = df.columns();
+      // a DDL statement yields an empty DataFrame (no columns)
+      if (columns.length == 0) {
+        return "";
+      }
+      // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
+      List<Row> rows = df.takeAsList(maxResult + 1);
+      StringBuilder msg = new StringBuilder();
+      msg.append("%table ");
+      msg.append(StringUtils.join(columns, "\t"));
+      msg.append("\n");
+      boolean isLargerThanMaxResult = rows.size() > maxResult;
+      if (isLargerThanMaxResult) {
+        rows = rows.subList(0, maxResult);
+      }
+      for (Row row : rows) {
+        for (int i = 0; i < row.size(); ++i) {
+          msg.append(row.get(i));
+          if (i != row.size() - 1) {
+            msg.append("\t");
+          }
+        }
+        msg.append("\n");
+      }
+
+      if (isLargerThanMaxResult) {
+        msg.append("\n");
+        msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
+      }
+      // append %text at the end, otherwise the following output will be put in table as well.
+      msg.append("\n%text ");
+      return msg.toString();
+    } else {
+      return obj.toString();
+    }
+  }
+
+  @Override
+  public Dataset<Row> getAsDataFrame(String value) {
+    String[] lines = value.split("\\n");
+    String head = lines[0];
+    String[] columns = head.split("\t");
+    StructType schema = new StructType();
+    for (String column : columns) {
+      schema = schema.add(column, "String");
+    }
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 1; i < lines.length; ++i) {
+      String[] tokens = lines[i].split("\t");
+      Row row = new GenericRow(tokens);
+      rows.add(row);
+    }
+    return sparkSession.createDataFrame(rows, schema);
+  }
+}
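
A sketch of the round trip these two methods enable (hypothetical usage against a live `SparkSession` named `spark`; exception handling elided, and the 1000-row cap is a placeholder):

    // Obtain the Spark 3 shim and render a Dataset as a Zeppelin %table string
    SparkShims shims = SparkShims.getInstance(spark.version(), new Properties(), spark);
    String table = shims.showDataFrame(spark.sql("select 1 as a, 2 as b"), 1000);
    // getAsDataFrame parses the same tab-separated form back into string columns
    Dataset<Row> df = (Dataset<Row>) shims.getAsDataFrame("a\tb\n1\t2");
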
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
new file mode 100644
index 0000000..4371023
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class SparkIntegrationTest30 extends SparkIntegrationTest {
+
+  public SparkIntegrationTest30(String sparkVersion) {
+    super(sparkVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"3.0.0-preview2"}
+    });
+  }
+
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
new file mode 100644
index 0000000..0a09ad5
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class ZeppelinSparkClusterTest30 extends ZeppelinSparkClusterTest {
+
+  public ZeppelinSparkClusterTest30(String sparkVersion) throws Exception {
+    super(sparkVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {"3.0.0-preview2"}
+    });
+  }
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index b0fdec1..db75c24 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -48,13 +48,19 @@ public class DownloadUtils {
   }
 
   public static String downloadSpark(String version) {
+    String hadoopVersion = "2.7";
+    if (version.startsWith("1.")) {
+      // Use hadoop 2.6 for spark 1.x
+      hadoopVersion = "2.6";
+    }
     String sparkDownloadFolder = downloadFolder + "/spark";
-    File targetSparkHomeFolder = new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop2.6");
+    File targetSparkHomeFolder =
+            new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop" + hadoopVersion);
     if (targetSparkHomeFolder.exists()) {
       LOGGER.info("Skip to download spark as it is already downloaded.");
       return targetSparkHomeFolder.getAbsolutePath();
     }
-    download("spark", version, "-bin-hadoop2.6.tgz");
+    download("spark", version, "-bin-hadoop" + hadoopVersion + ".tgz");
     return targetSparkHomeFolder.getAbsolutePath();
   }
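
With this change the 3.x test matrix downloads, e.g., `spark-3.0.0-preview2-bin-hadoop2.7.tgz`, while Spark 1.x entries keep the hadoop 2.6 bundles published for that line.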
 
