This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 6a23c9d  [ZEPPELIN-4510]. Support Spark 3.0
6a23c9d is described below

commit 6a23c9d25e054e427a141e6aa56fbe94a30703b6
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Dec 26 16:47:15 2019 +0800

    [ZEPPELIN-4510]. Support Spark 3.0

    ### What is this PR for?
    This PR adds support for Spark 3.x. The main change is the new module `spark3-shims`. Unit tests are updated as well. This PR builds against the Spark 3.0 preview because, at the time of this PR, Spark 3 is not yet officially released.

    ### What type of PR is it?
    [Feature]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-4510

    ### How should this be tested?
    * CI pass.

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3611 from zjffdu/ZEPPELIN-4510 and squashes the following commits:

    1f3337028 [Jeff Zhang] [ZEPPELIN-4510]. Support Spark 3.0
---
 spark/interpreter/pom.xml                          |   6 +
 .../apache/zeppelin/spark/IPySparkInterpreter.java |   4 +-
 .../apache/zeppelin/spark/PySparkInterpreter.java  |   4 +-
 .../apache/zeppelin/spark/SparkRInterpreter.java   |  12 +-
 .../main/java/org/apache/zeppelin/spark/Utils.java |   9 --
 .../src/main/resources/python/zeppelin_ipyspark.py |   3 +-
 .../src/main/resources/python/zeppelin_pyspark.py  |  10 +-
 .../zeppelin/spark/IPySparkInterpreterTest.java    |   6 +-
 .../zeppelin/spark/SparkRInterpreterTest.java      |  30 ++---
 .../org/apache/zeppelin/spark/SparkShimsTest.java  |  15 ++-
 .../zeppelin/spark/SparkShinyInterpreterTest.java  |   8 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java    |   6 +-
 spark/pom.xml                                      |  11 ++
 .../org/apache/zeppelin/spark/SparkShims.java      |  29 +++--
 .../org/apache/zeppelin/spark/SparkVersion.java    |  12 +-
 spark/spark3-shims/pom.xml                         |  87 +++++++++++++++
 .../org/apache/zeppelin/spark/Spark3Shims.java     | 121 +++++++++++++++++++++
 .../integration/SparkIntegrationTest30.java        |  40 +++++++
 .../integration/ZeppelinSparkClusterTest30.java    |  39 +++++++
 .../interpreter/integration/DownloadUtils.java     |  10 +-
 20 files changed, 387 insertions(+), 75 deletions(-)

diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index dfebec8..3490d53 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -103,6 +103,12 @@
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark3-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-kotlin</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index c961336..88493a9 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -126,8 +126,8 @@ public class IPySparkInterpreter extends IPythonInterpreter {
     return sparkInterpreter.getProgress(context);
   }

-  public boolean isSpark2() {
-    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
+  public boolean isSpark1() {
+    return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
   }

   public JavaSparkContext getJavaSparkContext() {
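The hunk above is the core pattern of the whole patch: the `isSpark2()` gate (a `newerThanEquals` check) becomes an `isSpark1()` gate (an exact major-version match), so Spark 3 falls through to the existing SparkSession-based code path instead of needing its own branch everywhere. A minimal standalone sketch contrasting the two gating styles (the class below is illustrative only, not part of the patch):

    // Illustrative only: contrasts the old ">= 2.0.0" gate with the new
    // exact major-version gate used throughout this commit.
    class VersionGateSketch {
      static boolean oldIsSpark2(int major) {
        return major >= 2;   // also true for Spark 3, but call sites assumed a 1.x/2.x world
      }

      static boolean newIsSpark1(int major) {
        return major == 1;   // only 1.x is special-cased; 2.x and 3.x share one path
      }

      public static void main(String[] args) {
        for (int major : new int[]{1, 2, 3}) {
          System.out.println("Spark " + major + ".x uses SparkSession path: " + !newIsSpark1(major));
        }
      }
    }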
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 546f74a..98e3d86 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -206,7 +206,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
   }

-  public boolean isSpark2() {
-    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
+  public boolean isSpark1() {
+    return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
   }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 7eb1eef..57db39f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -42,7 +42,7 @@ public class SparkRInterpreter extends RInterpreter {

   private SparkInterpreter sparkInterpreter;
   private SparkVersion sparkVersion;
-  private boolean isSpark2;
+  private boolean isSpark1;
   private SparkContext sc;
   private JavaSparkContext jsc;

@@ -71,11 +71,11 @@ public class SparkRInterpreter extends RInterpreter {
     this.sc = sparkInterpreter.getSparkContext();
     this.jsc = sparkInterpreter.getJavaSparkContext();
     this.sparkVersion = new SparkVersion(sc.version());
-    this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
+    this.isSpark1 = sparkVersion.getMajorVersion() == 1;
     ZeppelinRContext.setSparkContext(sc);
     ZeppelinRContext.setJavaSparkContext(jsc);
-    if (isSpark2) {
+    if (!isSpark1) {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
@@ -93,7 +93,7 @@ public class SparkRInterpreter extends RInterpreter {
     sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
     String setJobGroup = "";
     // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
-    if (isSpark2) {
+    if (!isSpark1) {
       setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
           "\", \" +" + jobDesc + "\", TRUE)";
     } else {
@@ -159,4 +159,8 @@ public class SparkRInterpreter extends RInterpreter {
                                                 InterpreterContext interpreterContext) {
     return new ArrayList<>();
   }
+
+  public boolean isSpark1() {
+    return isSpark1;
+  }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 381d023..ea8fb8b 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -105,15 +105,6 @@ class Utils {
     }
   }

-  static boolean isSpark2() {
-    try {
-      Class.forName("org.apache.spark.sql.SparkSession");
-      return true;
-    } catch (ClassNotFoundException e) {
-      return false;
-    }
-  }
-
   public static String buildJobGroupId(InterpreterContext context) {
     String uName = "anonymous";
     if (context.getAuthenticationInfo() != null) {
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index 66b9bef..07aad9b 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -49,9 +49,8 @@ jconf = jsc.getConf()
 conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-if intp.isSpark2():
+if not intp.isSpark1():
     from pyspark.sql import SparkSession
-
     spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
     sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
 else:
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index 8fcca9b..1917349 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -20,12 +20,9 @@
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 # for back compatibility
-from pyspark.sql import SQLContext, HiveContext, Row
+from pyspark.sql import SQLContext, Row

 intp = gateway.entry_point
-isSpark2 = intp.isSpark2()
-if isSpark2:
-    from pyspark.sql import SparkSession

 jsc = intp.getJavaSparkContext()
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
@@ -43,11 +40,10 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-
-if isSpark2:
+if not intp.isSpark1():
+    from pyspark.sql import SparkSession
     spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
     sqlc = __zSqlc__ = __zSpark__._wrapped
-
 else:
     sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 5a271d6..c64ba71 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -128,7 +128,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {

     // spark sql
     context = createInterpreterContext(mockIntpEventClient);
-    if (!isSpark2(sparkVersion)) {
+    if (isSpark1(sparkVersion)) {
       result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -256,8 +256,8 @@
     }
   }

-  private static boolean isSpark2(String sparkVersion) {
-    return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
+  private static boolean isSpark1(String sparkVersion) {
+    return sparkVersion.startsWith("'1.") || sparkVersion.startsWith("u'1.");
   }

   private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 011812e..8c2b799 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -85,8 +85,15 @@ public class SparkRInterpreterTest {

     result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    if (result.message().get(0).getData().contains("2.")) {
-      // spark 2.x
+    if (sparkRInterpreter.isSpark1()) {
+      // spark 1.x
+      result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+      // spark job url is sent
+      verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
+    } else {
+      // spark 2.x or 3.x
       result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
@@ -100,12 +107,12 @@ public class SparkRInterpreterTest {
           public void run() {
             try {
               InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
-                  "  df,\n" +
-                  "  function(x) {\n" +
-                  "    Sys.sleep(3)\n" +
-                  "    x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
-                  "  })\n" +
-                  "head(ldf, 3)", context);
+                      "  df,\n" +
+                      "  function(x) {\n" +
+                      "    Sys.sleep(3)\n" +
+                      "    x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
+                      "  })\n" +
+                      "head(ldf, 3)", context);
               assertTrue(result.message().get(0).getData().contains("cancelled"));
             } catch (InterpreterException e) {
               fail("Should not throw InterpreterException");
@@ -116,13 +123,6 @@ public class SparkRInterpreterTest {
       thread.start();
       Thread.sleep(1000);
       sparkRInterpreter.cancel(context);
-    } else {
-      // spark 1.x
-      result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-      assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
-      // spark job url is sent
-      verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
     }

     // plotting
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 1143883..ca18ff8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -125,10 +125,19 @@ public class SparkShimsTest {
       mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
       when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);
       doNothing().when(mockIntpEventClient).onParaInfosReceived(argumentCaptor.capture());
+
       try {
-        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties(), null);
-      } catch (Throwable ignore) {
-        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties(), null);
+        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_3_1_0.toString(), new Properties(), null);
+      } catch (Throwable e1) {
+        try {
+          sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString(), new Properties(), null);
+        } catch (Throwable e2) {
+          try {
+            sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString(), new Properties(), null);
+          } catch (Throwable e3) {
+            throw new RuntimeException("All SparkShims are tried, but no one can be created.");
+          }
+        }
       }
     }
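The test setup above tries `SPARK_3_1_0`, then `SPARK_2_0_0`, then `SPARK_1_6_0`, adding one try/catch nesting level per Spark line. As a hypothetical alternative (a sketch, not the committed code; it assumes the `SparkShims` and `SparkVersion` classes from this patch are on the classpath), the same fallback can be written as a loop over candidates:

    import java.util.Properties;

    // Hypothetical flattening of the nested try/catch above: keep the first
    // shim whose classes actually resolve, in the same 3.x -> 2.x -> 1.x order.
    class ShimFallbackSketch {
      static SparkShims firstLoadableShim(Properties props) {
        String[] candidates = {
            SparkVersion.SPARK_3_1_0.toString(),
            SparkVersion.SPARK_2_0_0.toString(),
            SparkVersion.SPARK_1_6_0.toString()};
        for (String version : candidates) {
          try {
            return SparkShims.getInstance(version, props, null);
          } catch (Throwable ignore) {
            // this shim (or its Spark) is not on the classpath; try the next one
          }
        }
        throw new RuntimeException("All SparkShims are tried, but no one can be created.");
      }
    }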
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
index eb0e56c..86a591c 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
@@ -115,14 +115,14 @@ public class SparkShinyInterpreterTest extends ShinyInterpreterTest {

     // verify shiny app via calling its rest api
     HttpResponse<String> response = Unirest.get(shinyURL).asString();
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
-      assertEquals(200, response.getStatus());
-      assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
-    } else {
+    if (sparkInterpreter.getSparkVersion().isSpark1()) {
       // spark 1.x will fail due to sparkR.version is not available for spark 1.x
       assertEquals(500, response.getStatus());
       assertTrue(response.getBody(),
           response.getBody().contains("could not find function \"sparkR.version\""));
+    } else {
+      assertEquals(200, response.getStatus());
+      assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
     }
   }
 }
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index c3f245b..1ccfbc5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -191,7 +191,7 @@ public class SparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(ret.message().toString(), 2, ret.message().size());
     assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+    if (!sparkInterpreter.getSparkVersion().isSpark1()) {
       assertTrue(ret.message().toString(), ret.message().get(1).getData().contains("ParseException"));
     }

@@ -200,7 +200,7 @@ public class SparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertEquals(ret.message().toString(), 2, ret.message().size());
     assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(0).getType());
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+    if (!sparkInterpreter.getSparkVersion().isSpark1()) {
       assertTrue(ret.message().toString(), ret.message().get(1).getData().contains("ParseException"));
     }

@@ -213,7 +213,7 @@ public class SparkSqlInterpreterTest {

   @Test
   public void testConcurrentSQL() throws InterpreterException, InterruptedException {
-    if (sparkInterpreter.getSparkVersion().isSpark2()) {
+    if (!sparkInterpreter.getSparkVersion().isSpark1()) {
       sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
     } else {
       sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
diff --git a/spark/pom.xml b/spark/pom.xml
index 876b14e..3d9d539 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -67,6 +67,7 @@
     <module>spark-shims</module>
     <module>spark1-shims</module>
     <module>spark2-shims</module>
+    <module>spark3-shims</module>
   </modules>

   <dependencies>
@@ -196,6 +197,16 @@
     </profile>

    <!-- profile spark-x only affect the embedded spark version in zeppelin distribution -->
+
+    <profile>
+      <id>spark-3.0</id>
+      <properties>
+        <spark.version>3.0.0-preview2</spark.version>
+        <protobuf.version>2.5.0</protobuf.version>
+        <py4j.version>0.10.8.1</py4j.version>
+      </properties>
+    </profile>
+
     <profile>
       <id>spark-2.4</id>
       <activation>
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index e281b25..67b140a 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -54,15 +54,20 @@
     this.properties = properties;
   }

-  private static SparkShims loadShims(String sparkVersion, Properties properties, Object entryPoint)
-      throws ReflectiveOperationException {
+  private static SparkShims loadShims(int sparkMajorVersion, Properties properties, Object entryPoint)
+      throws Exception {
     Class<?> sparkShimsClass;
-    if ("2".equals(sparkVersion)) {
+    if (sparkMajorVersion == 3) {
+      LOGGER.info("Initializing shims for Spark 3.x");
+      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark3Shims");
+    } else if (sparkMajorVersion == 2) {
       LOGGER.info("Initializing shims for Spark 2.x");
       sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims");
-    } else {
+    } else if (sparkMajorVersion == 1) {
       LOGGER.info("Initializing shims for Spark 1.x");
       sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims");
+    } else {
+      throw new Exception("Spark major version: '" + sparkMajorVersion + "' is not supported yet");
     }

     Constructor c = sparkShimsClass.getConstructor(Properties.class, Object.class);
@@ -76,22 +81,16 @@ public abstract class SparkShims {
    * @param entryPoint entryPoint is SparkContext for Spark 1.x SparkSession for Spark 2.x
    * @return
    */
-  public static SparkShims getInstance(String sparkVersion, Properties properties, Object entryPoint) {
+  public static SparkShims getInstance(String sparkVersion,
+                                       Properties properties,
+                                       Object entryPoint) throws Exception {
     if (sparkShims == null) {
-      String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
-      try {
-        sparkShims = loadShims(sparkMajorVersion, properties, entryPoint);
-      } catch (ReflectiveOperationException e) {
-        throw new RuntimeException(e);
-      }
+      int sparkMajorVersion = SparkVersion.fromVersionString(sparkVersion).getMajorVersion();
+      sparkShims = loadShims(sparkMajorVersion, properties, entryPoint);
     }
     return sparkShims;
   }

-  private static String getSparkMajorVersion(String sparkVersion) {
-    return sparkVersion.startsWith("2") ? "2" : "1";
-  }
-
   /**
    * This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in
    * spark1 while it is abstract class in spark2.
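`loadShims` above resolves the concrete shim class by name so that `spark-shims` never links against `spark1-shims`, `spark2-shims`, or `spark3-shims` at compile time; only the jar matching the runtime Spark version needs to be present. A minimal self-contained sketch of that reflective pattern (the class name argument is a placeholder supplied by the caller, not a fixed constant):

    import java.lang.reflect.Constructor;
    import java.util.Properties;

    // Minimal sketch of reflective loading: resolve a class by name at runtime,
    // then invoke its (Properties, Object) constructor, mirroring loadShims above.
    class ReflectiveLoadSketch {
      static Object load(String className, Properties props, Object entryPoint) throws Exception {
        Class<?> clazz = Class.forName(className);   // throws if the jar is absent
        Constructor<?> c = clazz.getConstructor(Properties.class, Object.class);
        return c.newInstance(props, entryPoint);
      }
    }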
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkVersion.java
similarity index 94%
rename from spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
rename to spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkVersion.java
index 88b10ef..ec80740 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkVersion.java
@@ -32,10 +32,10 @@ public class SparkVersion {
   public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
   public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
-  public static final SparkVersion SPARK_3_0_0 = SparkVersion.fromVersionString("3.0.0");
+  public static final SparkVersion SPARK_3_1_0 = SparkVersion.fromVersionString("3.1.0");

   public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_6_0;
-  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_0_0;
+  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_3_1_0;

   private int version;
   private int majorVersion;
@@ -73,6 +73,10 @@ public class SparkVersion {
     return version;
   }

+  public int getMajorVersion() {
+    return majorVersion;
+  }
+
   public String toString() {
     return versionString;
   }
@@ -85,8 +89,8 @@ public class SparkVersion {
     return new SparkVersion(versionString);
   }

-  public boolean isSpark2() {
-    return this.newerThanEquals(SPARK_2_0_0);
+  public boolean isSpark1() {
+    return this.olderThan(SPARK_2_0_0);
   }

   public boolean isSecretSocketSupported() {
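With `SparkVersion` moved into `spark-shims`, version gating reduces to parsing the runtime version string once and branching on the major number. A small usage sketch of the API shown in the diff above (the wrapper class and the exact output of parsing a preview version string are illustrative assumptions):

    // Illustrative usage of the SparkVersion API from the diff above; assumes
    // org.apache.zeppelin.spark.SparkVersion is on the classpath.
    public class SparkVersionDemo {
      public static void main(String[] args) {
        SparkVersion v = SparkVersion.fromVersionString("3.0.0-preview2");
        System.out.println(v.getMajorVersion());   // expected: 3
        System.out.println(v.isSpark1());          // expected: false
        // 3.0.x stays below UNSUPPORTED_FUTURE_VERSION (now 3.1.0), so it is accepted.
        System.out.println(v.olderThan(SparkVersion.UNSUPPORTED_FUTURE_VERSION));
      }
    }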
diff --git a/spark/spark3-shims/pom.xml b/spark/spark3-shims/pom.xml
new file mode 100644
index 0000000..8c6d1ec
--- /dev/null
+++ b/spark/spark3-shims/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>spark-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>spark3-shims</artifactId>
+  <version>0.9.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Zeppelin: Spark3 Shims</name>
+
+  <properties>
+    <scala.binary.version>2.12</scala.binary.version>
+    <spark.version>3.0.0-preview2</spark.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-interpreter-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-interpreter-setting</id>
+            <phase>none</phase>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java b/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java
new file mode 100644
index 0000000..911c9eb
--- /dev/null
+++ b/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkContext;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.ResultMessages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class Spark3Shims extends SparkShims {
+
+  private SparkSession sparkSession;
+
+  public Spark3Shims(Properties properties, Object entryPoint) {
+    super(properties);
+    this.sparkSession = (SparkSession) entryPoint;
+  }
+
+  public void setupSparkListener(final String master,
+                                 final String sparkWebUrl,
+                                 final InterpreterContext context) {
+    SparkContext sc = SparkContext.getOrCreate();
+    sc.addSparkListener(new SparkListener() {
+      @Override
+      public void onJobStart(SparkListenerJobStart jobStart) {
+
+        if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
+            !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
+          buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
+        }
+      }
+    });
+  }
+
+  @Override
+  public String showDataFrame(Object obj, int maxResult) {
+    if (obj instanceof Dataset) {
+      Dataset<Row> df = ((Dataset) obj).toDF();
+      String[] columns = df.columns();
+      // DDL will empty DataFrame
+      if (columns.length == 0) {
+        return "";
+      }
+      // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
+      List<Row> rows = df.takeAsList(maxResult + 1);
+      StringBuilder msg = new StringBuilder();
+      msg.append("%table ");
+      msg.append(StringUtils.join(columns, "\t"));
+      msg.append("\n");
+      boolean isLargerThanMaxResult = rows.size() > maxResult;
+      if (isLargerThanMaxResult) {
+        rows = rows.subList(0, maxResult);
+      }
+      for (Row row : rows) {
+        for (int i = 0; i < row.size(); ++i) {
+          msg.append(row.get(i));
+          if (i != row.size() - 1) {
+            msg.append("\t");
+          }
+        }
+        msg.append("\n");
+      }
+
+      if (isLargerThanMaxResult) {
+        msg.append("\n");
+        msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
+      }
+      // append %text at the end, otherwise the following output will be put in table as well.
+      msg.append("\n%text ");
+      return msg.toString();
+    } else {
+      return obj.toString();
+    }
+  }
+
+  @Override
+  public Dataset<Row> getAsDataFrame(String value) {
+    String[] lines = value.split("\\n");
+    String head = lines[0];
+    String[] columns = head.split("\t");
+    StructType schema = new StructType();
+    for (String column : columns) {
+      schema = schema.add(column, "String");
+    }
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 1; i < lines.length; ++i) {
+      String[] tokens = lines[i].split("\t");
+      Row row = new GenericRow(tokens);
+      rows.add(row);
+    }
+    return sparkSession.createDataFrame(rows, schema);
+  }
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
new file mode 100644
index 0000000..4371023
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest30.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class SparkIntegrationTest30 extends SparkIntegrationTest {
+
+  public SparkIntegrationTest30(String sparkVersion) {
+    super(sparkVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"3.0.0-preview2"}
+    });
+  }
+
+}
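`showDataFrame` and `getAsDataFrame` in the new `Spark3Shims` form a tab-separated round trip: the former renders a `Dataset` as a `%table` string, the latter parses such a string back into an all-string DataFrame. A self-contained sketch of the parsing half, using plain collections instead of Spark types (the class name and sample data below are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative parse of the payload getAsDataFrame() consumes:
    // line 1 is the tab-separated header, remaining lines are string rows.
    public class TsvRoundTripSketch {
      public static void main(String[] args) {
        String value = "name\tage\nmoon\t33\nsun\t46";
        String[] lines = value.split("\\n");
        List<String> columns = Arrays.asList(lines[0].split("\t"));
        List<String[]> rows = new ArrayList<>();
        for (int i = 1; i < lines.length; ++i) {
          rows.add(lines[i].split("\t"));   // every cell stays a String, matching the all-String schema
        }
        System.out.println(columns + ", rows=" + rows.size());
      }
    }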
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
new file mode 100644
index 0000000..0a09ad5
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest30.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(value = Parameterized.class)
+public class ZeppelinSparkClusterTest30 extends ZeppelinSparkClusterTest {
+
+  public ZeppelinSparkClusterTest30(String sparkVersion) throws Exception {
+    super(sparkVersion);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"3.0.0-preview2"}
+    });
+  }
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index b0fdec1..db75c24 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -48,13 +48,19 @@ public class DownloadUtils {
   }

   public static String downloadSpark(String version) {
+    String hadoopVersion = "2.7";
+    if (version.startsWith("1.")) {
+      // Use hadoop 2.6 for spark 1.x
+      hadoopVersion = "2.6";
+    }
     String sparkDownloadFolder = downloadFolder + "/spark";
-    File targetSparkHomeFolder = new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop2.6");
+    File targetSparkHomeFolder =
+        new File(sparkDownloadFolder + "/spark-" + version + "-bin-hadoop" + hadoopVersion);
     if (targetSparkHomeFolder.exists()) {
       LOGGER.info("Skip to download spark as it is already downloaded.");
       return targetSparkHomeFolder.getAbsolutePath();
     }
-    download("spark", version, "-bin-hadoop2.6.tgz");
+    download("spark", version, "-bin-hadoop" + hadoopVersion + ".tgz");
     return targetSparkHomeFolder.getAbsolutePath();
   }
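The Hadoop-version switch above determines the name of the downloaded archive. A tiny illustrative helper (not part of the patch) showing the resulting names under that naming scheme:

    // Illustrative only: reproduces the archive naming used above,
    // spark-<version>-bin-hadoop<hv>.tgz, with Hadoop 2.6 reserved for Spark 1.x.
    public class SparkArchiveNameDemo {
      static String archiveName(String sparkVersion) {
        String hadoopVersion = sparkVersion.startsWith("1.") ? "2.6" : "2.7";
        return "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz";
      }

      public static void main(String[] args) {
        System.out.println(archiveName("1.6.3"));           // spark-1.6.3-bin-hadoop2.6.tgz
        System.out.println(archiveName("3.0.0-preview2"));  // spark-3.0.0-preview2-bin-hadoop2.7.tgz
      }
    }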