This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

The following commit(s) were added to refs/heads/master by this push:
     new 79f30d5fed [ZEPPELIN-6086] Remove Spark Shims and unofficial support for Spark 4.0
79f30d5fed is described below

commit 79f30d5fed3abc2158268669563605b82604d472
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Tue Oct 1 09:04:39 2024 -0400

    [ZEPPELIN-6086] Remove Spark Shims and unofficial support for Spark 4.0

    ### What is this PR for?
    Add support for Spark 4 by removing the Spark shims. There is only one version of shims now,
    and they are compatible with the current Spark 4.0 preview releases, so we can just remove the
    shim concept.

    ### What type of PR is it?
    Improvement

    ### Todos
    * [x] - Disable 4.0 tests in CI and re-enable the supported version check

    ### What is the Jira issue?
    ZEPPELIN-6086

    ### How should this be tested?
    A CI stage is added for Spark 4.0 tests, but these will need to stay disabled until 4.0 is
    officially supported.

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? Maybe

    Closes #4825 from Kimahriman/spark-4.0.

    Signed-off-by: Cheng Pan <cheng...@apache.org>
---
 .gitignore                                          |   3 +
 spark/README.md                                     |   6 +-
 spark/interpreter/pom.xml                           |  54 ++++++-
 .../org/apache/zeppelin/spark/SparkVersion.java     |   0
 .../src/main/resources/python/zeppelin_pyspark.py   |   4 +-
 .../zeppelin/spark/SparkInterpreterTest.java        |   1 -
 .../{SparkShimsTest.java => SparkUtilsTest.java}    |  35 +----
 spark/pom.xml                                       |   3 +-
 .../zeppelin/spark/SparkScala212Interpreter.scala   |   6 +-
 .../zeppelin/spark/SparkZeppelinContext.scala       |   6 +-
 .../zeppelin/spark/SparkScala213Interpreter.scala   |   6 +-
 .../zeppelin/spark/SparkZeppelinContext.scala       |   6 +-
 spark/{spark3-shims => spark-common}/pom.xml        |  21 ++-
 .../org/apache/zeppelin/spark/SparkUtils.java}      | 101 ++++++++++++-
 spark/spark-shims/pom.xml                           |  63 --------
 .../java/org/apache/zeppelin/spark/SparkShims.java  | 166 ---------------------
 16 files changed, 181 insertions(+), 300 deletions(-)

diff --git a/.gitignore b/.gitignore
index 61ae7eb6a6..4e7fe00b87 100644
--- a/.gitignore
+++ b/.gitignore
@@ -142,3 +142,6 @@ tramp
 
 # pyenv file
 .python-version
+
+# python venv
+.venv

diff --git a/spark/README.md b/spark/README.md
index 76220accd8..4f8405ac53 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -16,9 +16,7 @@ Spark interpreter is the first and most important interpreter of Zeppelin.
It su - Scala module for Scala 2.12 * scala-2.13 - Scala module for Scala 2.13 -* spark-shims - - Parent module for each Spark module -* spark3-shims - - Shims module for Spark3 +* spark-common + - Common utils for all Scala versions diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index 81e79dcf37..2aa3a5eb9b 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -59,12 +59,14 @@ <!-- settings --> <pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude> <pyspark.test.include>**/*Test.*</pyspark.test.include> + + <extraJavaTestArgs></extraJavaTestArgs> </properties> <dependencies> <dependency> <groupId>org.apache.zeppelin</groupId> - <artifactId>spark3-shims</artifactId> + <artifactId>spark-common</artifactId> <version>${project.version}</version> </dependency> @@ -162,6 +164,13 @@ <artifactId>spark-core_${spark.scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> + <exclusions> + <!-- Leads to conflicting Jackson versions in tests --> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> @@ -313,7 +322,7 @@ <target> <delete dir="../../interpreter/spark/pyspark" /> <copy file="${project.build.directory}/${spark.archive}/python/lib/py4j-${py4j.version}-src.zip" todir="${project.build.directory}/../../../interpreter/spark/pyspark" /> - <zip basedir="${project.build.directory}/${spark.archive}/python" destfile="${project.build.directory}/../../../interpreter/spark/pyspark/pyspark.zip" includes="pyspark/*.py,pyspark/**/*.py" /> + <zip basedir="${project.build.directory}/${spark.archive}/python" destfile="${project.build.directory}/../../../interpreter/spark/pyspark/pyspark.zip" includes="pyspark/" /> </target> </configuration> </execution> @@ -359,7 +368,7 @@ <configuration> <forkCount>1</forkCount> <reuseForks>false</reuseForks> - <argLine>-Xmx3072m -XX:MaxMetaspaceSize=256m</argLine> + <argLine>-Xmx3072m -XX:MaxMetaspaceSize=256m ${extraJavaTestArgs}</argLine> <excludes> <exclude>${pyspark.test.exclude}</exclude> <exclude>${tests.to.exclude}</exclude> @@ -513,6 +522,35 @@ <profiles> + <profile> + <id>java-17</id> + <activation> + <jdk>[17,)</jdk> + </activation> + <properties> + <extraJavaTestArgs> + -XX:+IgnoreUnrecognizedVMOptions + --add-modules=jdk.incubator.vector + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.lang.invoke=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED + --add-opens=java.base/java.io=ALL-UNNAMED + --add-opens=java.base/java.net=ALL-UNNAMED + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent=ALL-UNNAMED + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/sun.nio.cs=ALL-UNNAMED + --add-opens=java.base/sun.security.action=ALL-UNNAMED + --add-opens=java.base/sun.util.calendar=ALL-UNNAMED + -Djdk.reflect.useDirectMethodHandle=false + -Dio.netty.tryReflectionSetAccessible=true + </extraJavaTestArgs> + </properties> + </profile> + <!-- profile spark-scala-x only affect the unit test in spark/interpreter module --> <profile> @@ -534,6 +572,16 @@ </properties> </profile> + <profile> + <id>spark-4.0</id> + <properties> + <spark.version>4.0.0-preview2</spark.version> + <protobuf.version>3.21.12</protobuf.version> + 
<py4j.version>0.10.9.7</py4j.version> + <libthrift.version>0.16.0</libthrift.version> + </properties> + </profile> + <!-- profile spark-x only affect spark version used in test --> <profile> <id>spark-3.5</id> diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java similarity index 100% rename from spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java rename to spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py index a77c383886..52788ccdb3 100644 --- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py @@ -19,10 +19,10 @@ import warnings from py4j.java_gateway import java_import from pyspark.conf import SparkConf -from pyspark.context import SparkContext +from pyspark import SparkContext # for back compatibility -from pyspark.sql import SQLContext, Row +from pyspark.sql import SQLContext intp = gateway.entry_point diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index efe8782946..a5d4156cfc 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -592,7 +592,6 @@ class SparkInterpreterTest { if (this.interpreter != null) { this.interpreter.close(); } - SparkShims.reset(); } private InterpreterContext getInterpreterContext() { diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java similarity index 78% rename from spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java rename to spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java index 5bd9cbba69..a006f1bc48 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java @@ -37,7 +37,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; -class SparkShimsTest { +class SparkUtilsTest { @ParameterizedTest @CsvSource({"2.6.0, false", @@ -64,29 +64,12 @@ class SparkShimsTest { "3.0.0-alpha4, true", // The latest fixed version "3.0.1, true"}) // Future versions void checkYarnVersionTest(String version, boolean expected) { - SparkShims sparkShims = - new SparkShims(new Properties()) { - @Override - public void setupSparkListener(String master, - String sparkWebUrl, - InterpreterContext context) {} - - @Override - public String showDataFrame(Object obj, int maxResult, InterpreterContext context) { - return null; - } - - @Override - public Object getAsDataFrame(String value) { - return null; - } - }; - assertEquals(expected, sparkShims.supportYarn6615(version)); + assertEquals(expected, SparkUtils.supportYarn6615(version)); } @Nested class SingleTests { - SparkShims sparkShims; + SparkUtils sparkUtils; InterpreterContext mockContext; RemoteInterpreterEventClient mockIntpEventClient; @@ -96,18 +79,14 @@ class SparkShimsTest { mockIntpEventClient = 
mock(RemoteInterpreterEventClient.class); when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient); - try { - sparkShims = SparkShims.getInstance(SparkVersion.SPARK_3_3_0.toString(), new Properties(), null); - } catch (Throwable e1) { - throw new RuntimeException("All SparkShims are tried, but no one can be created."); - } + sparkUtils = new SparkUtils(new Properties(), null); } @Test void runUnderLocalTest() { Properties properties = new Properties(); properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId"); - sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, properties, mockContext); + sparkUtils.buildSparkJobUrl("local", "http://sparkurl", 0, properties, mockContext); @SuppressWarnings("unchecked") ArgumentCaptor<Map<String, String>> argument = ArgumentCaptor.forClass(HashMap.class); verify(mockIntpEventClient).onParaInfosReceived(argument.capture()); @@ -120,14 +99,14 @@ class SparkShimsTest { void runUnderYarnTest() { Properties properties = new Properties(); properties.setProperty("spark.jobGroup.id", "zeppelin|user1|noteId|paragraphId"); - sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, properties, mockContext); + sparkUtils.buildSparkJobUrl("yarn", "http://sparkurl", 0, properties, mockContext); @SuppressWarnings("unchecked") ArgumentCaptor<Map<String, String>> argument = ArgumentCaptor.forClass(HashMap.class); verify(mockIntpEventClient).onParaInfosReceived(argument.capture()); Map<String, String> mapValue = argument.getValue(); assertTrue(mapValue.keySet().contains("jobUrl")); - if (sparkShims.supportYarn6615(VersionInfo.getVersion())) { + if (sparkUtils.supportYarn6615(VersionInfo.getVersion())) { assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id=")); } else { assertFalse(mapValue.get("jobUrl").contains("/jobs/job?id=")); diff --git a/spark/pom.xml b/spark/pom.xml index f3eb50d00b..f009741b43 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -55,8 +55,7 @@ <module>spark-scala-parent</module> <module>scala-2.12</module> <module>scala-2.13</module> - <module>spark-shims</module> - <module>spark3-shims</module> + <module>spark-common</module> </modules> <build> diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index 4e9cece8e8..efd9a0eb16 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -198,9 +198,9 @@ class SparkScala212Interpreter(conf: SparkConf, } override def createZeppelinContext(): Unit = { - val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) - sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) - z = new SparkZeppelinContext(sc, sparkShims, + val sparkUtils = new SparkUtils(properties, sparkSession) + sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + z = new SparkZeppelinContext(sc, sparkUtils, interpreterGroup.getInterpreterHookRegistry, properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala index ff6186e8a4..0d62dad0f9 100644 --- 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ * ZeppelinContext for Spark */ class SparkZeppelinContext(val sc: SparkContext, - val sparkShims: SparkShims, + val sparkUtils: SparkUtils, val hooks2: InterpreterHookRegistry, val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) { @@ -65,7 +65,7 @@ class SparkZeppelinContext(val sc: SparkContext, override def getInterpreterClassMap: util.Map[String, String] = interpreterClassMap.asJava - override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult, interpreterContext) + override def showData(obj: Any, maxResult: Int): String = sparkUtils.showDataFrame(obj, maxResult, interpreterContext) /** * create paragraph level of dynamic form of Select with no item selected. @@ -237,6 +237,6 @@ class SparkZeppelinContext(val sc: SparkContext, } def getAsDataFrame(name: String): Object = { - sparkShims.getAsDataFrame(get(name).toString) + sparkUtils.getAsDataFrame(get(name).toString) } } diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala index 659e5788f5..8f2c1beb18 100644 --- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala @@ -200,9 +200,9 @@ class SparkScala213Interpreter(conf: SparkConf, } override def createZeppelinContext(): Unit = { - val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) - sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) - z = new SparkZeppelinContext(sc, sparkShims, + val sparkUtils = new SparkUtils(properties, sparkSession) + sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + z = new SparkZeppelinContext(sc, sparkUtils, interpreterGroup.getInterpreterHookRegistry, properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala index ff6186e8a4..0d62dad0f9 100644 --- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ * ZeppelinContext for Spark */ class SparkZeppelinContext(val sc: SparkContext, - val sparkShims: SparkShims, + val sparkUtils: SparkUtils, val hooks2: InterpreterHookRegistry, val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) { @@ -65,7 +65,7 @@ class SparkZeppelinContext(val sc: SparkContext, override def getInterpreterClassMap: util.Map[String, String] = interpreterClassMap.asJava - override def showData(obj: Any, maxResult: Int): String = sparkShims.showDataFrame(obj, maxResult, interpreterContext) + override def showData(obj: Any, maxResult: Int): String = sparkUtils.showDataFrame(obj, maxResult, interpreterContext) /** * create paragraph level of dynamic form of Select with no item selected. 
@@ -237,6 +237,6 @@ class SparkZeppelinContext(val sc: SparkContext, } def getAsDataFrame(name: String): Object = { - sparkShims.getAsDataFrame(get(name).toString) + sparkUtils.getAsDataFrame(get(name).toString) } } diff --git a/spark/spark3-shims/pom.xml b/spark/spark-common/pom.xml similarity index 85% rename from spark/spark3-shims/pom.xml rename to spark/spark-common/pom.xml index 255a2d4bb1..ee859b9513 100644 --- a/spark/spark3-shims/pom.xml +++ b/spark/spark-common/pom.xml @@ -18,6 +18,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> <artifactId>spark-parent</artifactId> <groupId>org.apache.zeppelin</groupId> @@ -26,21 +27,19 @@ </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>spark3-shims</artifactId> + <artifactId>spark-common</artifactId> <packaging>jar</packaging> - <name>Zeppelin: Spark3 Shims</name> - - <properties> - <scala.binary.version>2.12</scala.binary.version> - <spark.version>3.4.1</spark.version> - </properties> + <name>Zeppelin: Spark Common</name> <dependencies> - + <!-- + This is for ZEPPELIN-2221 using VersionInfo for check the version of Yarn. + It's checked that VersionInfo is compatible at least 2.2.0 to the latest one. + --> <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>spark-shims</artifactId> - <version>${project.version}</version> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-api</artifactId> + <scope>provided</scope> </dependency> <dependency> diff --git a/spark/spark3-shims/src/main/java/org/apache/zeppelin/spark/Spark3Shims.java b/spark/spark-common/src/main/java/org/apache/zeppelin/spark/SparkUtils.java similarity index 54% rename from spark/spark3-shims/src/main/java/org/apache/zeppelin/spark/Spark3Shims.java rename to spark/spark-common/src/main/java/org/apache/zeppelin/spark/SparkUtils.java index 094fca62c1..00948b198c 100644 --- a/spark/spark3-shims/src/main/java/org/apache/zeppelin/spark/Spark3Shims.java +++ b/spark/spark-common/src/main/java/org/apache/zeppelin/spark/SparkUtils.java @@ -15,9 +15,11 @@ * limitations under the License. */ - package org.apache.zeppelin.spark; + +import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.util.VersionUtil; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkContext; import org.apache.spark.scheduler.SparkListener; @@ -31,18 +33,41 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.ResultMessages; import org.apache.zeppelin.interpreter.SingleRowInterpreterResult; import org.apache.zeppelin.tabledata.TableDataUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; -public class Spark3Shims extends SparkShims { +/** + * This is abstract class for anything that is api incompatible between spark1 and spark2. It will + * load the correct version of SparkUtils based on the version of Spark. 
+ */ +public class SparkUtils { + + // the following lines for checking specific versions + private static final String HADOOP_VERSION_2_6_6 = "2.6.6"; + private static final String HADOOP_VERSION_2_7_0 = "2.7.0"; + private static final String HADOOP_VERSION_2_7_4 = "2.7.4"; + private static final String HADOOP_VERSION_2_8_0 = "2.8.0"; + private static final String HADOOP_VERSION_2_8_2 = "2.8.2"; + private static final String HADOOP_VERSION_2_9_0 = "2.9.0"; + private static final String HADOOP_VERSION_3_0_0 = "3.0.0"; + private static final String HADOOP_VERSION_3_0_0_ALPHA4 = "3.0.0-alpha4"; + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkUtils.class); + + protected SparkSession sparkSession; - private SparkSession sparkSession; + protected Properties properties; - public Spark3Shims(Properties properties, Object entryPoint) { - super(properties); - this.sparkSession = (SparkSession) entryPoint; + public SparkUtils(Properties properties, SparkSession sparkSession) { + this.properties = properties; + this.sparkSession = sparkSession; } public void setupSparkListener(final String master, @@ -61,7 +86,6 @@ public class Spark3Shims extends SparkShims { }); } - @Override public String showDataFrame(Object obj, int maxResult, InterpreterContext context) { if (obj instanceof Dataset) { Dataset<Row> df = ((Dataset) obj).toDF(); @@ -119,7 +143,6 @@ public class Spark3Shims extends SparkShims { return list; } - @Override public Dataset<Row> getAsDataFrame(String value) { String[] lines = value.split("\\n"); String head = lines[0]; @@ -137,4 +160,66 @@ public class Spark3Shims extends SparkShims { } return sparkSession.createDataFrame(rows, schema); } + + protected void buildSparkJobUrl(String master, + String sparkWebUrl, + int jobId, + Properties jobProperties, + InterpreterContext context) { + String jobUrl = null; + if (sparkWebUrl.contains("{jobId}")) { + jobUrl = sparkWebUrl.replace("{jobId}", jobId + ""); + } else { + jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId; + String version = VersionInfo.getVersion(); + if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) { + jobUrl = sparkWebUrl + "/jobs"; + } + } + + String jobGroupId = jobProperties.getProperty("spark.jobGroup.id"); + + Map<String, String> infos = new HashMap<>(); + infos.put("jobUrl", jobUrl); + infos.put("label", "SPARK JOB"); + infos.put("tooltip", "View in Spark web UI"); + infos.put("noteId", getNoteId(jobGroupId)); + infos.put("paraId", getParagraphId(jobGroupId)); + LOGGER.debug("Send spark job url: {}", infos); + context.getIntpEventClient().onParaInfosReceived(infos); + } + + public static String getNoteId(String jobGroupId) { + String[] tokens = jobGroupId.split("\\|"); + if (tokens.length != 4) { + throw new RuntimeException("Invalid jobGroupId: " + jobGroupId); + } + return tokens[2]; + } + + public static String getParagraphId(String jobGroupId) { + String[] tokens = jobGroupId.split("\\|"); + if (tokens.length != 4) { + throw new RuntimeException("Invalid jobGroupId: " + jobGroupId); + } + return tokens[3]; + } + + /** + * This is temporal patch for support old versions of Yarn which is not adopted YARN-6615 + * + * @return true if YARN-6615 is patched, false otherwise + */ + protected static boolean supportYarn6615(String version) { + return (VersionUtil.compareVersions(HADOOP_VERSION_2_6_6, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_2_7_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_2_7_4, version) <= 0 + && 
VersionUtil.compareVersions(HADOOP_VERSION_2_8_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_2_8_2, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0_ALPHA4, version) <= 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) <= 0); + } } diff --git a/spark/spark-shims/pom.xml b/spark/spark-shims/pom.xml deleted file mode 100644 index 80112b7801..0000000000 --- a/spark/spark-shims/pom.xml +++ /dev/null @@ -1,63 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <parent> - <artifactId>spark-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.12.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>spark-shims</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Spark Shims</name> - - <dependencies> - <!-- - This is for ZEPPELIN-2221 using VersionInfo for check the version of Yarn. - It's checked that VersionInfo is compatible at least 2.2.0 to the latest one. - --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-interpreter-setting</id> - <phase>none</phase> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java deleted file mode 100644 index 542c3de8bb..0000000000 --- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - - -import org.apache.hadoop.util.VersionInfo; -import org.apache.hadoop.util.VersionUtil; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * This is abstract class for anything that is api incompatible between spark1 and spark2. It will - * load the correct version of SparkShims based on the version of Spark. - */ -public abstract class SparkShims { - - // the following lines for checking specific versions - private static final String HADOOP_VERSION_2_6_6 = "2.6.6"; - private static final String HADOOP_VERSION_2_7_0 = "2.7.0"; - private static final String HADOOP_VERSION_2_7_4 = "2.7.4"; - private static final String HADOOP_VERSION_2_8_0 = "2.8.0"; - private static final String HADOOP_VERSION_2_8_2 = "2.8.2"; - private static final String HADOOP_VERSION_2_9_0 = "2.9.0"; - private static final String HADOOP_VERSION_3_0_0 = "3.0.0"; - private static final String HADOOP_VERSION_3_0_0_ALPHA4 = "3.0.0-alpha4"; - - private static final Logger LOGGER = LoggerFactory.getLogger(SparkShims.class); - - private static SparkShims sparkShims; - - protected Properties properties; - - public SparkShims(Properties properties) { - this.properties = properties; - } - - private static SparkShims loadShims(int sparkMajorVersion, Properties properties, Object entryPoint) - throws Exception { - Class<?> sparkShimsClass; - if (sparkMajorVersion == 3) { - LOGGER.info("Initializing shims for Spark 3.x"); - sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark3Shims"); - } else { - throw new Exception("Spark major version: '" + sparkMajorVersion + "' is not supported yet"); - } - - Constructor c = sparkShimsClass.getConstructor(Properties.class, Object.class); - return (SparkShims) c.newInstance(properties, entryPoint); - } - - /** - * - * @param sparkVersion - * @param properties - * @param entryPoint entryPoint is SparkContext for Spark 1.x SparkSession for Spark 2.x - * @return - */ - public static SparkShims getInstance(String sparkVersion, - Properties properties, - Object entryPoint) throws Exception { - if (sparkShims == null) { - int sparkMajorVersion = SparkVersion.fromVersionString(sparkVersion).getMajorVersion(); - sparkShims = loadShims(sparkMajorVersion, properties, entryPoint); - } - return sparkShims; - } - - /** - * This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in - * spark1 while it is abstract class in spark2. 
- */ - public abstract void setupSparkListener(String master, - String sparkWebUrl, - InterpreterContext context); - - public abstract String showDataFrame(Object obj, int maxResult, InterpreterContext context); - - public abstract Object getAsDataFrame(String value); - - protected void buildSparkJobUrl(String master, - String sparkWebUrl, - int jobId, - Properties jobProperties, - InterpreterContext context) { - String jobUrl = null; - if (sparkWebUrl.contains("{jobId}")) { - jobUrl = sparkWebUrl.replace("{jobId}", jobId + ""); - } else { - jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId; - String version = VersionInfo.getVersion(); - if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) { - jobUrl = sparkWebUrl + "/jobs"; - } - } - - String jobGroupId = jobProperties.getProperty("spark.jobGroup.id"); - - Map<String, String> infos = new HashMap<>(); - infos.put("jobUrl", jobUrl); - infos.put("label", "SPARK JOB"); - infos.put("tooltip", "View in Spark web UI"); - infos.put("noteId", getNoteId(jobGroupId)); - infos.put("paraId", getParagraphId(jobGroupId)); - LOGGER.debug("Send spark job url: {}", infos); - context.getIntpEventClient().onParaInfosReceived(infos); - } - - public static String getNoteId(String jobGroupId) { - String[] tokens = jobGroupId.split("\\|"); - if (tokens.length != 4) { - throw new RuntimeException("Invalid jobGroupId: " + jobGroupId); - } - return tokens[2]; - } - - public static String getParagraphId(String jobGroupId) { - String[] tokens = jobGroupId.split("\\|"); - if (tokens.length != 4) { - throw new RuntimeException("Invalid jobGroupId: " + jobGroupId); - } - return tokens[3]; - } - - /** - * This is temporal patch for support old versions of Yarn which is not adopted YARN-6615 - * - * @return true if YARN-6615 is patched, false otherwise - */ - protected boolean supportYarn6615(String version) { - return (VersionUtil.compareVersions(HADOOP_VERSION_2_6_6, version) <= 0 - && VersionUtil.compareVersions(HADOOP_VERSION_2_7_0, version) > 0) - || (VersionUtil.compareVersions(HADOOP_VERSION_2_7_4, version) <= 0 - && VersionUtil.compareVersions(HADOOP_VERSION_2_8_0, version) > 0) - || (VersionUtil.compareVersions(HADOOP_VERSION_2_8_2, version) <= 0 - && VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) > 0) - || (VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) <= 0 - && VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) > 0) - || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0_ALPHA4, version) <= 0) - || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) <= 0); - } - - public static void reset() { - sparkShims = null; - } -}
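
For readers who want the net effect without tracing every hunk: the reflective, version-specific shim lookup is removed, and the interpreters construct the single SparkUtils helper from the new spark-common module directly. The sketch below is condensed from the SparkScala212Interpreter/SparkScala213Interpreter hunks above; it is a fragment for illustration, not a standalone program.

    // Before (removed): resolve a Spark-version-specific shim via reflection.
    //   val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
    //   sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)

    // After: SparkUtils is a plain class in spark-common, constructed directly.
    val sparkUtils = new SparkUtils(properties, sparkSession)
    sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
    z = new SparkZeppelinContext(sc, sparkUtils,
      interpreterGroup.getInterpreterHookRegistry,
      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)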