This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new a0da85d  [ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter
a0da85d is described below

commit a0da85dd48f9000bf510167cc72a558008bf651d
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Oct 14 20:16:35 2021 +0800

    [ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter

    ### What is this PR for?
    This PR removes support for scala-2.10 & spark 1.x in the spark interpreter.

    ### What type of PR is it?
    [ Improvement ]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5555

    ### How should this be tested?
    * CI pass

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #4250 from zjffdu/ZEPPELIN-5555 and squashes the following commits:

    9577e21e72 [Jeff Zhang] update doc
    0329a40e07 [Jeff Zhang] [ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter
---
 docs/setup/basics/how_to_build.md | 14 +-
 spark/interpreter/pom.xml | 6 -
 .../apache/zeppelin/spark/IPySparkInterpreter.java | 4 -
 .../apache/zeppelin/spark/PySparkInterpreter.java | 4 -
 .../apache/zeppelin/spark/SparkInterpreter.java | 10 +-
 .../apache/zeppelin/spark/SparkRInterpreter.java | 19 +--
 .../src/main/resources/python/zeppelin_ipyspark.py | 9 +-
 .../src/main/resources/python/zeppelin_pyspark.py | 9 +-
 .../zeppelin/spark/IPySparkInterpreterTest.java | 73 ++++-------
 .../zeppelin/spark/SparkInterpreterTest.java | 9 +-
 .../zeppelin/spark/SparkRInterpreterTest.java | 70 +++++-----
 .../zeppelin/spark/SparkShinyInterpreterTest.java | 11 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java | 14 +-
 spark/pom.xml | 50 --------
 spark/scala-2.10/pom.xml | 58 ---------
 spark/scala-2.10/spark-scala-parent | 1 -
 .../zeppelin/spark/SparkScala210Interpreter.scala | 119 -----------------
 .../zeppelin/spark/SparkScala211Interpreter.scala | 1 -
 spark/spark-dependencies/pom.xml | 7 -
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 46 +------
 .../java/org/apache/zeppelin/spark/SparkShims.java | 3 -
 .../org/apache/zeppelin/spark/SparkVersion.java | 4 -
 spark/spark1-shims/pom.xml | 86 -------------
 .../org/apache/zeppelin/spark/Spark1Shims.java | 141 ---------------------
 24 files changed, 76 insertions(+), 692 deletions(-)

diff --git a/docs/setup/basics/how_to_build.md b/docs/setup/basics/how_to_build.md
index 5939ca4..4b1b9a3 100644
--- a/docs/setup/basics/how_to_build.md
+++ b/docs/setup/basics/how_to_build.md
@@ -106,13 +106,9 @@ Set spark major version
 Available profiles are
 
 ```
+-Pspark-3.1
 -Pspark-3.0
 -Pspark-2.4
--Pspark-2.3
--Pspark-2.2
--Pspark-2.1
--Pspark-2.0
--Pspark-1.6
 ```
 
 minor version can be adjusted by `-Dspark.version=x.x.x`
@@ -125,13 +121,12 @@ Actually Zeppelin supports all the versions of scala (2.10, 2.11, 2.12) in Spark
 Available profiles are
 
 ```
--Pspark-scala-2.10
 -Pspark-scala-2.11
 -Pspark-scala-2.12
 ```
 
 If you want to use Spark 3.x in the embedded mode, then you have to specify both profile `spark-3.0` and `spark-scala-2.12`,
-because Spark 3.x doesn't support scala 2.10 and 2.11.
+because Spark 3.x doesn't support scala 2.11.
#### Build hadoop with Zeppelin (`-Phadoop[version]`) @@ -169,11 +164,6 @@ Here are some examples with several options: # build with spark-2.4, spark-scala-2.11 ./mvnw clean package -Pspark-2.4 -Pspark-scala-2.11 -DskipTests -# build with spark-1.6, spark-scala-2.10 -./mvnw clean package -Pspark-1.6 -Pspark-scala-2.10 -DskipTests - -# build with CDH -./mvnw clean package -Pspark-1.6 -Pspark-scala-2.10 -Dhadoop.version=2.6.0-cdh5.5.0 -Pvendor-repo -DskipTests ``` Ignite Interpreter diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 2e9a33f..42ab2cc 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -82,12 +82,6 @@ <dependency> <groupId>org.apache.zeppelin</groupId> - <artifactId>spark1-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.zeppelin</groupId> <artifactId>spark2-shims</artifactId> <version>${project.version}</version> </dependency> diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java index 0e3729f..ab6b3db 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -157,10 +157,6 @@ public class IPySparkInterpreter extends IPythonInterpreter { return sparkInterpreter.getProgress(context); } - public boolean isSpark1() { - return sparkInterpreter.getSparkVersion().getMajorVersion() == 1; - } - public boolean isSpark3() { return sparkInterpreter.getSparkVersion().getMajorVersion() == 3; } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 2fdc37b..514ffa9 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -233,10 +233,6 @@ public class PySparkInterpreter extends PythonInterpreter { } } - public boolean isSpark1() { - return sparkInterpreter.getSparkVersion().getMajorVersion() == 1; - } - public boolean isSpark3() { return sparkInterpreter.getSparkVersion().getMajorVersion() == 3; } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 7b1460a..16faf99 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -84,7 +84,6 @@ public class SparkInterpreter extends AbstractInterpreter { } this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean( properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); - innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter"); innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter"); innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.spark.SparkScala212Interpreter"); } @@ -142,7 +141,6 @@ public class SparkInterpreter extends AbstractInterpreter { * Load AbstractSparkScalaInterpreter based on the runtime scala version. 
* Load AbstractSparkScalaInterpreter from the following location: * - * SparkScala210Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.10 * SparkScala211Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.11 * SparkScala212Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.12 * @@ -257,9 +255,7 @@ public class SparkInterpreter extends AbstractInterpreter { private String extractScalaVersion() throws InterpreterException { String scalaVersionString = scala.util.Properties.versionString(); LOGGER.info("Using Scala: " + scalaVersionString); - if (scalaVersionString.contains("version 2.10")) { - return "2.10"; - } else if (scalaVersionString.contains("version 2.11")) { + if (scalaVersionString.contains("version 2.11")) { return "2.11"; } else if (scalaVersionString.contains("version 2.12")) { return "2.12"; @@ -272,10 +268,6 @@ public class SparkInterpreter extends AbstractInterpreter { return extractScalaVersion().equals("2.12"); } - public boolean isScala210() throws InterpreterException { - return extractScalaVersion().equals("2.10"); - } - private List<String> getDependencyFiles() throws InterpreterException { List<String> depFiles = new ArrayList<>(); // add jar from local repo diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 27aeb52..4512ddb 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -43,7 +43,6 @@ public class SparkRInterpreter extends RInterpreter { private SparkInterpreter sparkInterpreter; private SparkVersion sparkVersion; - private boolean isSpark1; private SparkContext sc; private JavaSparkContext jsc; @@ -72,7 +71,6 @@ public class SparkRInterpreter extends RInterpreter { this.sc = sparkInterpreter.getSparkContext(); this.jsc = sparkInterpreter.getJavaSparkContext(); this.sparkVersion = new SparkVersion(sc.version()); - this.isSpark1 = sparkVersion.getMajorVersion() == 1; LOGGER.info("SparkRInterpreter: SPARK_HOME={}", sc.getConf().getenv("SPARK_HOME")); Arrays.stream(sc.getConf().getAll()) @@ -82,9 +80,7 @@ public class SparkRInterpreter extends RInterpreter { ZeppelinRContext.setSparkContext(sc); ZeppelinRContext.setJavaSparkContext(jsc); - if (!isSpark1) { - ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); - } + ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext()); super.open(); @@ -100,13 +96,8 @@ public class SparkRInterpreter extends RInterpreter { sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false); String setJobGroup = ""; // assign setJobGroup to dummy__, otherwise it would print NULL for this statement - if (!isSpark1) { - setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup + - "\", \" +" + jobDesc + "\", TRUE)"; - } else { - setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup + - "\", \"" + jobDesc + "\", TRUE)"; - } + setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup + + "\", \" +" + jobDesc + "\", TRUE)"; lines = setJobGroup + "\n" + lines; if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) { // setLocalProperty is only available from spark 2.3.0 @@ -162,8 +153,4 @@ public class SparkRInterpreter extends RInterpreter { InterpreterContext interpreterContext) { 
return new ArrayList<>(); } - - public boolean isSpark1() { - return isSpark1; - } } diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py index 4b0dbaa..fdf7b97 100644 --- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py +++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py @@ -56,12 +56,9 @@ jconf = jsc.getConf() conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf) sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf) -if not intp.isSpark1(): - from pyspark.sql import SparkSession - spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) - sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped -else: - sqlContext = sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) +from pyspark.sql import SparkSession +spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) +sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped class IPySparkZeppelinContext(PyZeppelinContext): diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py index 2038c14..8dd3224 100644 --- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py @@ -48,12 +48,9 @@ jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf) -if not intp.isSpark1(): - from pyspark.sql import SparkSession - spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) - sqlc = __zSqlc__ = __zSpark__._wrapped -else: - sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) +from pyspark.sql import SparkSession +spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) +sqlc = __zSqlc__ = __zSpark__._wrapped sqlContext = __zSqlc__ diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java index a7c48c6..35cd712 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java @@ -133,51 +133,30 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest { // spark sql context = createInterpreterContext(mockIntpEventClient); - if (isSpark1(sparkVersion)) { - result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.toInterpreterResultMessage(); - assertEquals( - "+---+---+\n" + - "| _1| _2|\n" + - "+---+---+\n" + - "| 1| a|\n" + - "| 2| b|\n" + - "+---+---+", interpreterResultMessages.get(0).getData().trim()); - - context = createInterpreterContext(mockIntpEventClient); - result = interpreter.interpret("z.show(df)", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.toInterpreterResultMessage(); - assertEquals( - "_1 _2\n" + - "1 a\n" + - "2 b", interpreterResultMessages.get(0).getData().trim()); - } else { - result = interpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = 
context.out.toInterpreterResultMessage(); - assertEquals( - "+---+---+\n" + - "| _1| _2|\n" + - "+---+---+\n" + - "| 1| a|\n" + - "| 2| b|\n" + - "+---+---+", interpreterResultMessages.get(0).getData().trim()); - - context = createInterpreterContext(mockIntpEventClient); - result = interpreter.interpret("z.show(df)", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.toInterpreterResultMessage(); - assertEquals( - "_1 _2\n" + - "1 a\n" + - "2 b", interpreterResultMessages.get(0).getData().trim()); - - // spark sql python API bindings - result = interpreter.interpret("df.explain()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } + result = interpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals( + "+---+---+\n" + + "| _1| _2|\n" + + "+---+---+\n" + + "| 1| a|\n" + + "| 2| b|\n" + + "+---+---+", interpreterResultMessages.get(0).getData().trim()); + + context = createInterpreterContext(mockIntpEventClient); + result = interpreter.interpret("z.show(df)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + interpreterResultMessages = context.out.toInterpreterResultMessage(); + assertEquals( + "_1 _2\n" + + "1 a\n" + + "2 b", interpreterResultMessages.get(0).getData().trim()); + + // spark sql python API bindings + result = interpreter.interpret("df.explain()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // cancel if (interpreter instanceof IPySparkInterpreter) { final InterpreterContext context2 = createInterpreterContext(mockIntpEventClient); @@ -273,10 +252,6 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest { } } - private static boolean isSpark1(String sparkVersion) { - return sparkVersion.startsWith("'1.") || sparkVersion.startsWith("u'1."); - } - private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) { return InterpreterContext.builder() .setNoteId("noteId") diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index c750ea9..7b3d1df 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -136,12 +136,9 @@ public class SparkInterpreterTest { result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - // test $intp, only works for scala after 2.11 - if (!interpreter.isScala210()) { - result = interpreter.interpret("$intp", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } - + result = interpreter.interpret("$intp", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // Companion object with case class result = interpreter.interpret("import scala.math._\n" + "object Circle {\n" + diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java index 8a54f69..7087be2 100644 --- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java @@ -85,48 +85,40 @@ public class SparkRInterpreterTest { result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - if (sparkRInterpreter.isSpark1()) { - // spark 1.x - result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertTrue(result.message().get(0).getData().contains("eruptions waiting")); - // spark job url is sent - verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class)); - } else { - // spark 2.x or 3.x - result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertTrue(result.message().get(0).getData().contains("eruptions waiting")); - // spark job url is sent - verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class)); - - // cancel - final InterpreterContext context = getInterpreterContext(); - Thread thread = new Thread() { - @Override - public void run() { - try { - InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" + - " df,\n" + - " function(x) {\n" + - " Sys.sleep(3)\n" + - " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" + - " })\n" + - "head(ldf, 3)", context); - assertTrue(result.message().get(0).getData().contains("cancelled")); - } catch (InterpreterException e) { - fail("Should not throw InterpreterException"); - } + + result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().get(0).getData().contains("eruptions waiting")); + // spark job url is sent + verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class)); + + // cancel + InterpreterContext context = getInterpreterContext(); + InterpreterContext finalContext = context; + Thread thread = new Thread() { + @Override + public void run() { + try { + InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" + + " df,\n" + + " function(x) {\n" + + " Sys.sleep(3)\n" + + " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" + + " })\n" + + "head(ldf, 3)", finalContext); + assertTrue(result.message().get(0).getData().contains("cancelled")); + } catch (InterpreterException e) { + fail("Should not throw InterpreterException"); } - }; - thread.setName("Cancel-Thread"); - thread.start(); - Thread.sleep(1000); - sparkRInterpreter.cancel(context); - } + } + }; + thread.setName("Cancel-Thread"); + thread.start(); + Thread.sleep(1000); + sparkRInterpreter.cancel(context); // plotting - InterpreterContext context = getInterpreterContext(); + context = getInterpreterContext(); context.getLocalProperties().put("imageWidth", "100"); result = sparkRInterpreter.interpret("hist(mtcars$mpg)", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java index 77cd54b..cfe0fbe 100644 --- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java @@ -115,14 +115,7 @@ public class SparkShinyInterpreterTest extends ShinyInterpreterTest { // verify shiny app via calling its rest api HttpResponse<String> response = Unirest.get(shinyURL).asString(); - if (sparkInterpreter.getSparkVersion().isSpark1()) { - // spark 1.x will fail due to sparkR.version is not available for spark 1.x - assertEquals(500, response.getStatus()); - assertTrue(response.getBody(), - response.getBody().contains("could not find function \"sparkR.version\"")); - } else { - assertEquals(200, response.getStatus()); - assertTrue(response.getBody(), response.getBody().contains("Spark Version")); - } + assertEquals(200, response.getStatus()); + assertTrue(response.getBody(), response.getBody().contains("Spark Version")); } } diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index 1ce7329..f469fc5 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -225,18 +225,14 @@ public class SparkSqlInterpreterTest { assertEquals(InterpreterResult.Code.ERROR, ret.code()); assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size()); assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType()); - if (!sparkInterpreter.getSparkVersion().isSpark1()) { - assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input")); - } + assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input")); // One correct sql + One invalid sql + One valid sql (skipped) ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context); assertEquals(InterpreterResult.Code.ERROR, ret.code()); assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size()); assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType()); - if (!sparkInterpreter.getSparkVersion().isSpark1()) { - assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input")); - } + assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input")); // Two 2 comments ret = sqlInterpreter.interpret( @@ -247,11 +243,7 @@ public class SparkSqlInterpreterTest { @Test public void testConcurrentSQL() throws InterpreterException, InterruptedException { - if (!sparkInterpreter.getSparkVersion().isSpark1()) { - sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); - } else { - sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); - } + sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); Thread thread1 = new Thread() { @Override diff --git a/spark/pom.xml b/spark/pom.xml index e67ee8a..1614002 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -57,12 +57,10 @@ <modules> <module>interpreter</module> 
<module>spark-scala-parent</module> - <module>scala-2.10</module> <module>scala-2.11</module> <module>scala-2.12</module> <module>spark-dependencies</module> <module>spark-shims</module> - <module>spark1-shims</module> <module>spark2-shims</module> <module>spark3-shims</module> </modules> @@ -158,14 +156,6 @@ </properties> </profile> - <profile> - <id>spark-scala-2.10</id> - <properties> - <spark.scala.version>2.10.5</spark.scala.version> - <spark.scala.binary.version>2.10</spark.scala.binary.version> - </properties> - </profile> - <!-- profile spark-x only affect the embedded spark version in zeppelin distribution --> <profile> @@ -204,45 +194,5 @@ </properties> </profile> - <profile> - <id>spark-2.3</id> - <properties> - <spark.version>2.3.3</spark.version> - <protobuf.version>2.5.0</protobuf.version> - <py4j.version>0.10.7</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-2.2</id> - <properties> - <spark.version>2.2.3</spark.version> - <py4j.version>0.10.7</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-2.1</id> - <properties> - <spark.version>2.1.3</spark.version> - <py4j.version>0.10.7</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-2.0</id> - <properties> - <spark.version>2.0.2</spark.version> - <py4j.version>0.10.3</py4j.version> - </properties> - </profile> - - <profile> - <id>spark-1.6</id> - <properties> - <spark.version>1.6.3</spark.version> - <py4j.version>0.9</py4j.version> - </properties> - </profile> </profiles> </project> diff --git a/spark/scala-2.10/pom.xml b/spark/scala-2.10/pom.xml deleted file mode 100644 index 2e1e843..0000000 --- a/spark/scala-2.10/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.zeppelin</groupId> - <artifactId>spark-scala-parent</artifactId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../spark-scala-parent/pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>spark-scala-2.10</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Spark Interpreter Scala_2.10</name> - - <properties> - <spark.version>2.2.3</spark.version> - <spark.scala.version>2.10.5</spark.scala.version> - <spark.scala.binary.version>2.10</spark.scala.binary.version> - <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version> - </properties> - - <build> - <plugins> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - </plugin> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/spark/scala-2.10/spark-scala-parent b/spark/scala-2.10/spark-scala-parent deleted file mode 120000 index e5e899e..0000000 --- a/spark/scala-2.10/spark-scala-parent +++ /dev/null @@ -1 +0,0 @@ -../spark-scala-parent \ No newline at end of file diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala deleted file mode 100644 index 34d69c0..0000000 --- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.spark - -import java.io.File -import java.net.URLClassLoader -import java.nio.file.{Files, Paths} -import java.util.Properties - -import org.apache.spark.SparkConf -import org.apache.spark.repl.SparkILoop -import org.apache.spark.repl.SparkILoop._ -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup} -import org.slf4j.{Logger, LoggerFactory} - -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter._ - -/** - * SparkInterpreter for scala-2.10 - */ -class SparkScala210Interpreter(override val conf: SparkConf, - override val depFiles: java.util.List[String], - override val properties: Properties, - override val interpreterGroup: InterpreterGroup, - override val sparkInterpreterClassLoader: URLClassLoader, - val outputDir: File) - extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { - - lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - private var sparkILoop: SparkILoop = _ - - override val interpreterOutput = - new InterpreterOutputStream(LoggerFactory.getLogger(classOf[SparkScala210Interpreter])) - - override def open(): Unit = { - super.open() - // redirect the output of open to InterpreterOutputStream, so that user can have more - // diagnose info in frontend - if (InterpreterContext.get() != null) { - interpreterOutput.setInterpreterOutput(InterpreterContext.get().out) - } - - LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - // Only Spark1 requires to create http server, Spark2 removes HttpServer class. 
- startHttpServer(outputDir).foreach { case (server, uri) => - sparkHttpServer = server - conf.set("spark.repl.class.uri", uri) - } - val target = conf.get("spark.repl.target", "jvm-1.6") - - val settings = new Settings() - settings.embeddedDefaults(sparkInterpreterClassLoader) - settings.usejavacp.value = true - settings.target.value = target - - this.userJars = getUserJars() - LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) - settings.classpath.value = userJars.mkString(File.pathSeparator) - if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) { - Console.setOut(interpreterOutput) - } - sparkILoop = new SparkILoop() - - setDeclaredField(sparkILoop, "settings", settings) - callMethod(sparkILoop, "createInterpreter") - sparkILoop.initializeSynchronous() - callMethod(sparkILoop, "postInitialization") - val reader = callMethod(sparkILoop, - "org$apache$spark$repl$SparkILoop$$chooseReader", - Array(settings.getClass), Array(settings)).asInstanceOf[InteractiveReader] - setDeclaredField(sparkILoop, "org$apache$spark$repl$SparkILoop$$in", reader) - this.scalaCompletion = reader.completion - - createSparkContext() - createZeppelinContext() - } - - protected def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates - .map(e => new InterpreterCompletion(e, e, null)) - scala.collection.JavaConversions.seqAsJavaList(completions) - } - - def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = - sparkILoop.interpret(code) - - protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { - sparkILoop.beQuietDuring { - sparkILoop.bind(name, tpe, value, modifier) - } - } - - override def getScalaShellClassLoader: ClassLoader = { - val sparkIMain = sparkILoop.interpreter - callMethod(sparkIMain, "classLoader").asInstanceOf[ClassLoader] - } -} diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index 6f531d2..41470f1 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -60,7 +60,6 @@ class SparkScala211Interpreter(override val conf: SparkConf, LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - // Only Spark1 requires to create http server, Spark2 removes HttpServer class. 
startHttpServer(outputDir).foreach { case (server, uri) => sparkHttpServer = server conf.set("spark.repl.class.uri", uri) diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml index 34e482f..81d9bb0 100644 --- a/spark/spark-dependencies/pom.xml +++ b/spark/spark-dependencies/pom.xml @@ -63,13 +63,6 @@ <dependency> <groupId>org.apache.zeppelin</groupId> - <artifactId>spark-scala-2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.zeppelin</groupId> <artifactId>spark-scala-2.11</artifactId> <version>${project.version}</version> <scope>provided</scope> diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index df3ca6d..f984e15 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -224,51 +224,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } protected def createSparkContext(): Unit = { - if (isSparkSessionPresent()) { - spark2CreateContext() - } else { - spark1CreateContext() - } - } - - private def spark1CreateContext(): Unit = { - this.sc = SparkContext.getOrCreate(conf) - LOGGER.info("Created SparkContext") - getUserFiles().foreach(file => sc.addFile(file)) - - sc.getClass.getMethod("ui").invoke(sc).asInstanceOf[Option[_]] match { - case Some(webui) => - sparkUrl = webui.getClass.getMethod("appUIAddress").invoke(webui).asInstanceOf[String] - case None => - } - - initAndSendSparkWebUrl() - - val hiveSiteExisted: Boolean = - Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null - val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false) - if (hiveEnabled && hiveSiteExisted) { - sqlContext = Class.forName("org.apache.spark.sql.hive.HiveContext") - .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext] - LOGGER.info("Created sql context (with Hive support)") - } else { - LOGGER.warn("spark.useHiveContext is set as true but no hive-site.xml" + - " is found in classpath, so zeppelin will fallback to SQLContext"); - sqlContext = Class.forName("org.apache.spark.sql.SQLContext") - .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext] - LOGGER.info("Created sql context (without Hive support)") - } - - bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) - bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient""")) - - scalaInterpret("import org.apache.spark.SparkContext._") - scalaInterpret("import sqlContext.implicits._") - scalaInterpret("import sqlContext.sql") - scalaInterpret("import org.apache.spark.sql.functions._") - // print empty string otherwise the last statement's output of this method - // (aka. 
import org.apache.spark.sql.functions._) will mix with the output of user code - scalaInterpret("print(\"\")") + spark2CreateContext() } private def spark2CreateContext(): Unit = { diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java index adabbc1..3fff0f0 100644 --- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java +++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java @@ -63,9 +63,6 @@ public abstract class SparkShims { } else if (sparkMajorVersion == 2) { LOGGER.info("Initializing shims for Spark 2.x"); sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims"); - } else if (sparkMajorVersion == 1){ - LOGGER.info("Initializing shims for Spark 1.x"); - sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims"); } else { throw new Exception("Spark major version: '" + sparkMajorVersion + "' is not supported yet"); } diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java index 0ad1a35..ea18a32 100644 --- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java +++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -90,10 +90,6 @@ public class SparkVersion { return new SparkVersion(versionString); } - public boolean isSpark1() { - return this.olderThan(SPARK_2_0_0); - } - public boolean isSecretSocketSupported() { return this.newerThanEquals(SparkVersion.SPARK_2_4_0) || this.newerThanEqualsPatchVersion(SPARK_2_3_1) || diff --git a/spark/spark1-shims/pom.xml b/spark/spark1-shims/pom.xml deleted file mode 100644 index dc31079..0000000 --- a/spark/spark1-shims/pom.xml +++ /dev/null @@ -1,86 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <parent> - <artifactId>spark-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>spark1-shims</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Spark1 Shims</name> - - <properties> - <scala.binary.version>2.10</scala.binary.version> - <spark.version>1.6.3</spark.version> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>spark-shims</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-interpreter-shaded</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-interpreter-setting</id> - <phase>none</phase> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/spark/spark1-shims/src/main/java/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/java/org/apache/zeppelin/spark/Spark1Shims.java deleted file mode 100644 index 45c8618..0000000 --- a/spark/spark1-shims/src/main/java/org/apache/zeppelin/spark/Spark1Shims.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.zeppelin.spark; - -import org.apache.commons.lang.StringUtils; -import org.apache.spark.SparkContext; -import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.catalyst.expressions.GenericRow; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.ui.jobs.JobProgressListener; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.ResultMessages; -import org.apache.zeppelin.interpreter.SingleRowInterpreterResult; -import org.apache.zeppelin.tabledata.TableDataUtils; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -public class Spark1Shims extends SparkShims { - - private SparkContext sc; - - public Spark1Shims(Properties properties, Object entryPoint) { - super(properties); - this.sc = (SparkContext) entryPoint; - } - - public void setupSparkListener(final String master, - final String sparkWebUrl, - final InterpreterContext context) { - SparkContext sc = SparkContext.getOrCreate(); - sc.addSparkListener(new JobProgressListener(sc.getConf()) { - @Override - public void onJobStart(SparkListenerJobStart jobStart) { - if (sc.getConf().getBoolean("spark.ui.enabled", true) && - !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) { - buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context); - } - } - }); - } - - @Override - public String showDataFrame(Object obj, int maxResult, InterpreterContext context) { - if (obj instanceof DataFrame) { - DataFrame df = (DataFrame) obj; - String[] columns = df.columns(); - // DDL will empty DataFrame - if (columns.length == 0) { - return ""; - } - - // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult - List<Row> rows = df.takeAsList(maxResult + 1); - String template = context.getLocalProperties().get("template"); - if (!StringUtils.isBlank(template)) { - if (rows.size() >= 1) { - return new SingleRowInterpreterResult(sparkRowToList(rows.get(0)), template, context).toHtml(); - } else { - return ""; - } - } - - StringBuilder msg = new StringBuilder(); - msg.append("\n%table "); - msg.append(StringUtils.join(TableDataUtils.normalizeColumns(columns), "\t")); - msg.append("\n"); - boolean isLargerThanMaxResult = rows.size() > maxResult; - if (isLargerThanMaxResult) { - rows = rows.subList(0, maxResult); - } - for (Row row : rows) { - for (int i = 0; i < row.size(); ++i) { - msg.append(TableDataUtils.normalizeColumn(row.get(i))); - if (i != row.size() - 1) { - msg.append("\t"); - } - } - msg.append("\n"); - } - - if (isLargerThanMaxResult) { - msg.append("\n"); - msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult")); - } - // append %text at the end, otherwise the following output will be put in table as well. 
- msg.append("\n%text "); - return msg.toString(); - } else { - return obj.toString(); - } - } - - private List sparkRowToList(Row row) { - List list = new ArrayList(); - for (int i = 0; i< row.size(); i++) { - list.add(row.get(i)); - } - return list; - } - - @Override - public DataFrame getAsDataFrame(String value) { - String[] lines = value.split("\\n"); - String head = lines[0]; - String[] columns = head.split("\t"); - StructType schema = new StructType(); - for (String column : columns) { - schema = schema.add(column, "String"); - } - - List<Row> rows = new ArrayList<>(); - for (int i = 1; i < lines.length; ++i) { - String[] tokens = lines[i].split("\t"); - Row row = new GenericRow(tokens); - rows.add(row); - } - return SQLContext.getOrCreate(sc) - .createDataFrame(rows, schema); - } -}
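
For reference, a build sketch against the profile set that remains after this change. These are illustrative invocations based on the profiles listed in the updated how_to_build.md (not part of the commit); adjust versions to your environment:

```
# embedded Spark 3.x now needs the scala-2.12 profile, since scala-2.10
# support and the spark-1.6/2.0-2.3 legacy profiles are gone
./mvnw clean package -Pspark-3.1 -Pspark-scala-2.12 -DskipTests

# Spark 2.4 can still be built against scala 2.11
./mvnw clean package -Pspark-2.4 -Pspark-scala-2.11 -DskipTests
```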