This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f30d5fed [ZEPPELIN-6086] Remove Spark Shims and unofficial support for Spark 4.0
79f30d5fed is described below

commit 79f30d5fed3abc2158268669563605b82604d472
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Tue Oct 1 09:04:39 2024 -0400

    [ZEPPELIN-6086] Remove Spark Shims and unofficial support for Spark 4.0
    
    ### What is this PR for?
    
    Add support for Spark 4 by removing the Spark shims. There is only one
    version of the shims now, and it is compatible with the current Spark 4.0
    preview releases, so the shim abstraction can simply be removed.
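    
    For reference, the interpreter-side change reduces to constructing the
    utility class directly instead of resolving a shim by Spark major version.
    A minimal sketch, adapted from the Scala 2.12/2.13 interpreter changes in
    this diff:
    
    ```scala
    // Before: a shim implementation was looked up reflectively per Spark major version.
    // val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
    
    // After: a single SparkUtils covers all supported Spark versions.
    val sparkUtils = new SparkUtils(properties, sparkSession)
    sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
    z = new SparkZeppelinContext(sc, sparkUtils,
      interpreterGroup.getInterpreterHookRegistry,
      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
    ```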
    
    ### What type of PR is it?
    Improvement
    
    ### Todos
    * [x] - Disable 4.0 tests in CI and re-enable supported version check
    
    ### What is the Jira issue?
    
    ZEPPELIN-6086
    
    ### How should this be tested?
    A CI stage is added for Spark 4.0 tests, but these will need to be disabled
    until 4.0 is officially supported.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? Maybe
    
    
    Closes #4825 from Kimahriman/spark-4.0.
    
    Signed-off-by: Cheng Pan <cheng...@apache.org>
---
 .gitignore                                         |   3 +
 spark/README.md                                    |   6 +-
 spark/interpreter/pom.xml                          |  54 ++++++-
 .../org/apache/zeppelin/spark/SparkVersion.java    |   0
 .../src/main/resources/python/zeppelin_pyspark.py  |   4 +-
 .../zeppelin/spark/SparkInterpreterTest.java       |   1 -
 .../{SparkShimsTest.java => SparkUtilsTest.java}   |  35 +----
 spark/pom.xml                                      |   3 +-
 .../zeppelin/spark/SparkScala212Interpreter.scala  |   6 +-
 .../zeppelin/spark/SparkZeppelinContext.scala      |   6 +-
 .../zeppelin/spark/SparkScala213Interpreter.scala  |   6 +-
 .../zeppelin/spark/SparkZeppelinContext.scala      |   6 +-
 spark/{spark3-shims => spark-common}/pom.xml       |  21 ++-
 .../org/apache/zeppelin/spark/SparkUtils.java}     | 101 ++++++++++++-
 spark/spark-shims/pom.xml                          |  63 --------
 .../java/org/apache/zeppelin/spark/SparkShims.java | 166 ---------------------
 16 files changed, 181 insertions(+), 300 deletions(-)

diff --git a/.gitignore b/.gitignore
index 61ae7eb6a6..4e7fe00b87 100644
--- a/.gitignore
+++ b/.gitignore
@@ -142,3 +142,6 @@ tramp
 
 # pyenv file
 .python-version
+
+# python venv
+.venv
diff --git a/spark/README.md b/spark/README.md
index 76220accd8..4f8405ac53 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -16,9 +16,7 @@ Spark interpreter is the first and most important interpreter 
of Zeppelin. It su
   - Scala module for Scala 2.12
 * scala-2.13
   - Scala module for Scala 2.13
-* spark-shims 
-  - Parent module for each Spark module
-* spark3-shims
-  - Shims module for Spark3
+* spark-common
+  - Common utils for all Scala versions
 
 
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 81e79dcf37..2aa3a5eb9b 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -59,12 +59,14 @@
     <!-- settings -->
     
<pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude>
     <pyspark.test.include>**/*Test.*</pyspark.test.include>
+
+    <extraJavaTestArgs></extraJavaTestArgs>
   </properties>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark3-shims</artifactId>
+      <artifactId>spark-common</artifactId>
       <version>${project.version}</version>
     </dependency>
 
@@ -162,6 +164,13 @@
       <artifactId>spark-core_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <!-- Leads to conflicting Jackson versions in tests -->
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -313,7 +322,7 @@
               <target>
                 <delete dir="../../interpreter/spark/pyspark" />
                 <copy 
file="${project.build.directory}/${spark.archive}/python/lib/py4j-${py4j.version}-src.zip"
 todir="${project.build.directory}/../../../interpreter/spark/pyspark" />
-                <zip 
basedir="${project.build.directory}/${spark.archive}/python" 
destfile="${project.build.directory}/../../../interpreter/spark/pyspark/pyspark.zip"
 includes="pyspark/*.py,pyspark/**/*.py" />
+                <zip 
basedir="${project.build.directory}/${spark.archive}/python" 
destfile="${project.build.directory}/../../../interpreter/spark/pyspark/pyspark.zip"
 includes="pyspark/" />
               </target>
             </configuration>
           </execution>
@@ -359,7 +368,7 @@
         <configuration>
           <forkCount>1</forkCount>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=256m</argLine>
+          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=256m 
${extraJavaTestArgs}</argLine>
           <excludes>
             <exclude>${pyspark.test.exclude}</exclude>
             <exclude>${tests.to.exclude}</exclude>
@@ -513,6 +522,35 @@
 
   <profiles>
 
+    <profile>
+      <id>java-17</id>
+      <activation>
+        <jdk>[17,)</jdk>
+      </activation>
+      <properties>
+        <extraJavaTestArgs>
+          -XX:+IgnoreUnrecognizedVMOptions
+          --add-modules=jdk.incubator.vector
+          --add-opens=java.base/java.lang=ALL-UNNAMED
+          --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+          --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+          --add-opens=java.base/java.io=ALL-UNNAMED
+          --add-opens=java.base/java.net=ALL-UNNAMED
+          --add-opens=java.base/java.nio=ALL-UNNAMED
+          --add-opens=java.base/java.util=ALL-UNNAMED
+          --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+          --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+          --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED
+          --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+          --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+          --add-opens=java.base/sun.security.action=ALL-UNNAMED
+          --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+          -Djdk.reflect.useDirectMethodHandle=false
+          -Dio.netty.tryReflectionSetAccessible=true
+        </extraJavaTestArgs>
+      </properties>
+    </profile>
+
     <!-- profile spark-scala-x only affect the unit test in spark/interpreter 
module -->
 
     <profile>
@@ -534,6 +572,16 @@
       </properties>
     </profile>
 
+    <profile>
+      <id>spark-4.0</id>
+      <properties>
+        <spark.version>4.0.0-preview2</spark.version>
+        <protobuf.version>3.21.12</protobuf.version>
+        <py4j.version>0.10.9.7</py4j.version>
+        <libthrift.version>0.16.0</libthrift.version>
+      </properties>
+    </profile>
+
     <!-- profile spark-x only affect spark version used in test -->
     <profile>
       <id>spark-3.5</id>
diff --git 
a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
similarity index 100%
rename from 
spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
rename to 
spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py 
b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index a77c383886..52788ccdb3 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -19,10 +19,10 @@ import warnings
 
 from py4j.java_gateway import java_import
 from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
+from pyspark import SparkContext
 
 # for back compatibility
-from pyspark.sql import SQLContext, Row
+from pyspark.sql import SQLContext
 
 intp = gateway.entry_point
 
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index efe8782946..a5d4156cfc 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -592,7 +592,6 @@ class SparkInterpreterTest {
     if (this.interpreter != null) {
       this.interpreter.close();
     }
-    SparkShims.reset();
   }
 
   private InterpreterContext getInterpreterContext() {
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java
similarity index 78%
rename from 
spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
rename to 
spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java
index 5bd9cbba69..a006f1bc48 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkUtilsTest.java
@@ -37,7 +37,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.ArgumentCaptor;
 
-class SparkShimsTest {
+class SparkUtilsTest {
 
   @ParameterizedTest
   @CsvSource({"2.6.0, false",
@@ -64,29 +64,12 @@ class SparkShimsTest {
   "3.0.0-alpha4, true", // The latest fixed version
   "3.0.1, true"}) // Future versions
   void checkYarnVersionTest(String version, boolean expected) {
-      SparkShims sparkShims =
-          new SparkShims(new Properties()) {
-            @Override
-            public void setupSparkListener(String master,
-                                           String sparkWebUrl,
-                                           InterpreterContext context) {}
-
-            @Override
-            public String showDataFrame(Object obj, int maxResult, 
InterpreterContext context) {
-              return null;
-            }
-
-            @Override
-            public Object getAsDataFrame(String value) {
-              return null;
-            }
-          };
-      assertEquals(expected, sparkShims.supportYarn6615(version));
+      assertEquals(expected, SparkUtils.supportYarn6615(version));
   }
 
   @Nested
   class SingleTests {
-    SparkShims sparkShims;
+    SparkUtils sparkUtils;
     InterpreterContext mockContext;
     RemoteInterpreterEventClient mockIntpEventClient;
 
@@ -96,18 +79,14 @@ class SparkShimsTest {
       mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
       when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);
 
-      try {
-        sparkShims = 
SparkShims.getInstance(SparkVersion.SPARK_3_3_0.toString(), new Properties(), 
null);
-      } catch (Throwable e1) {
-        throw new RuntimeException("All SparkShims are tried, but no one can 
be created.");
-      }
+      sparkUtils = new SparkUtils(new Properties(), null);
     }
 
     @Test
     void runUnderLocalTest() {
       Properties properties = new Properties();
       properties.setProperty("spark.jobGroup.id", 
"zeppelin|user1|noteId|paragraphId");
-      sparkShims.buildSparkJobUrl("local", "http://sparkurl";, 0, properties, 
mockContext);
+      sparkUtils.buildSparkJobUrl("local", "http://sparkurl";, 0, properties, 
mockContext);
       @SuppressWarnings("unchecked")
       ArgumentCaptor<Map<String, String>> argument = 
ArgumentCaptor.forClass(HashMap.class);
       verify(mockIntpEventClient).onParaInfosReceived(argument.capture());
@@ -120,14 +99,14 @@ class SparkShimsTest {
     void runUnderYarnTest() {
       Properties properties = new Properties();
       properties.setProperty("spark.jobGroup.id", 
"zeppelin|user1|noteId|paragraphId");
-      sparkShims.buildSparkJobUrl("yarn", "http://sparkurl";, 0, properties, 
mockContext);
+      sparkUtils.buildSparkJobUrl("yarn", "http://sparkurl";, 0, properties, 
mockContext);
       @SuppressWarnings("unchecked")
       ArgumentCaptor<Map<String, String>> argument = 
ArgumentCaptor.forClass(HashMap.class);
       verify(mockIntpEventClient).onParaInfosReceived(argument.capture());
       Map<String, String> mapValue = argument.getValue();
       assertTrue(mapValue.keySet().contains("jobUrl"));
 
-      if (sparkShims.supportYarn6615(VersionInfo.getVersion())) {
+      if (sparkUtils.supportYarn6615(VersionInfo.getVersion())) {
         assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id="));
       } else {
         assertFalse(mapValue.get("jobUrl").contains("/jobs/job?id="));
diff --git a/spark/pom.xml b/spark/pom.xml
index f3eb50d00b..f009741b43 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -55,8 +55,7 @@
         <module>spark-scala-parent</module>
         <module>scala-2.12</module>
         <module>scala-2.13</module>
-        <module>spark-shims</module>
-        <module>spark3-shims</module>
+        <module>spark-common</module>
     </modules>
 
     <build>
diff --git 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
 
b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index 4e9cece8e8..efd9a0eb16 100644
--- 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ 
b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -198,9 +198,9 @@ class SparkScala212Interpreter(conf: SparkConf,
   }
 
   override def createZeppelinContext(): Unit = {
-    val sparkShims = SparkShims.getInstance(sc.version, properties, 
sparkSession)
-    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
-    z = new SparkZeppelinContext(sc, sparkShims,
+    val sparkUtils = new SparkUtils(properties, sparkSession)
+    sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkUtils,
       interpreterGroup.getInterpreterHookRegistry,
       properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
     bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
diff --git 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
 
b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index ff6186e8a4..0d62dad0f9 100644
--- 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ 
b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
   * ZeppelinContext for Spark
   */
 class SparkZeppelinContext(val sc: SparkContext,
-                           val sparkShims: SparkShims,
+                           val sparkUtils: SparkUtils,
                            val hooks2: InterpreterHookRegistry,
                            val maxResult2: Int) extends 
ZeppelinContext(hooks2, maxResult2) {
 
@@ -65,7 +65,7 @@ class SparkZeppelinContext(val sc: SparkContext,
 
   override def getInterpreterClassMap: util.Map[String, String] = 
interpreterClassMap.asJava
 
-  override def showData(obj: Any, maxResult: Int): String = 
sparkShims.showDataFrame(obj, maxResult, interpreterContext)
+  override def showData(obj: Any, maxResult: Int): String = 
sparkUtils.showDataFrame(obj, maxResult, interpreterContext)
 
   /**
    * create paragraph level of dynamic form of Select with no item selected.
@@ -237,6 +237,6 @@ class SparkZeppelinContext(val sc: SparkContext,
   }
 
   def getAsDataFrame(name: String): Object = {
-    sparkShims.getAsDataFrame(get(name).toString)
+    sparkUtils.getAsDataFrame(get(name).toString)
   }
 }
diff --git 
a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
 
b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
index 659e5788f5..8f2c1beb18 100644
--- 
a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
+++ 
b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
@@ -200,9 +200,9 @@ class SparkScala213Interpreter(conf: SparkConf,
   }
 
   override def createZeppelinContext(): Unit = {
-    val sparkShims = SparkShims.getInstance(sc.version, properties, 
sparkSession)
-    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
-    z = new SparkZeppelinContext(sc, sparkShims,
+    val sparkUtils = new SparkUtils(properties, sparkSession)
+    sparkUtils.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkUtils,
       interpreterGroup.getInterpreterHookRegistry,
       properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
     bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
diff --git 
a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
 
b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index ff6186e8a4..0d62dad0f9 100644
--- 
a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ 
b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
   * ZeppelinContext for Spark
   */
 class SparkZeppelinContext(val sc: SparkContext,
-                           val sparkShims: SparkShims,
+                           val sparkUtils: SparkUtils,
                            val hooks2: InterpreterHookRegistry,
                            val maxResult2: Int) extends 
ZeppelinContext(hooks2, maxResult2) {
 
@@ -65,7 +65,7 @@ class SparkZeppelinContext(val sc: SparkContext,
 
   override def getInterpreterClassMap: util.Map[String, String] = 
interpreterClassMap.asJava
 
-  override def showData(obj: Any, maxResult: Int): String = 
sparkShims.showDataFrame(obj, maxResult, interpreterContext)
+  override def showData(obj: Any, maxResult: Int): String = 
sparkUtils.showDataFrame(obj, maxResult, interpreterContext)
 
   /**
    * create paragraph level of dynamic form of Select with no item selected.
@@ -237,6 +237,6 @@ class SparkZeppelinContext(val sc: SparkContext,
   }
 
   def getAsDataFrame(name: String): Object = {
-    sparkShims.getAsDataFrame(get(name).toString)
+    sparkUtils.getAsDataFrame(get(name).toString)
   }
 }
diff --git a/spark/spark3-shims/pom.xml b/spark/spark-common/pom.xml
similarity index 85%
rename from spark/spark3-shims/pom.xml
rename to spark/spark-common/pom.xml
index 255a2d4bb1..ee859b9513 100644
--- a/spark/spark3-shims/pom.xml
+++ b/spark/spark-common/pom.xml
@@ -18,6 +18,7 @@
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
   <parent>
     <artifactId>spark-parent</artifactId>
     <groupId>org.apache.zeppelin</groupId>
@@ -26,21 +27,19 @@
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
-  <artifactId>spark3-shims</artifactId>
+  <artifactId>spark-common</artifactId>
   <packaging>jar</packaging>
-  <name>Zeppelin: Spark3 Shims</name>
-
-  <properties>
-    <scala.binary.version>2.12</scala.binary.version>
-    <spark.version>3.4.1</spark.version>
-  </properties>
+  <name>Zeppelin: Spark Common</name>
 
   <dependencies>
-
+    <!--
+      This is for ZEPPELIN-2221, which uses VersionInfo to check the YARN version.
+      VersionInfo has been verified to be compatible from Hadoop 2.2.0 up to the latest release.
+    -->
     <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark-shims</artifactId>
-      <version>${project.version}</version>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
diff --git 
a/spark/spark3-shims/src/main/java/org/apache/zeppelin/spark/Spark3Shims.java 
b/spark/spark-common/src/main/java/org/apache/zeppelin/spark/SparkUtils.java
similarity index 54%
rename from 
spark/spark3-shims/src/main/java/org/apache/zeppelin/spark/Spark3Shims.java
rename to 
spark/spark-common/src/main/java/org/apache/zeppelin/spark/SparkUtils.java
index 094fca62c1..00948b198c 100644
--- 
a/spark/spark3-shims/src/main/java/org/apache/zeppelin/spark/Spark3Shims.java
+++ b/spark/spark-common/src/main/java/org/apache/zeppelin/spark/SparkUtils.java
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.zeppelin.spark;
 
+
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.VersionUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.scheduler.SparkListener;
@@ -31,18 +33,41 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.ResultMessages;
 import org.apache.zeppelin.interpreter.SingleRowInterpreterResult;
 import org.apache.zeppelin.tabledata.TableDataUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
-public class Spark3Shims extends SparkShims {
+/**
+ * Common Spark utilities shared by the Scala 2.12 and 2.13 interpreters:
+ * Spark listener setup, DataFrame rendering, and Spark job URL construction.
+ */
+public class SparkUtils {
+
+  // Hadoop version constants used for the YARN-6615 support check below
+  private static final String HADOOP_VERSION_2_6_6 = "2.6.6";
+  private static final String HADOOP_VERSION_2_7_0 = "2.7.0";
+  private static final String HADOOP_VERSION_2_7_4 = "2.7.4";
+  private static final String HADOOP_VERSION_2_8_0 = "2.8.0";
+  private static final String HADOOP_VERSION_2_8_2 = "2.8.2";
+  private static final String HADOOP_VERSION_2_9_0 = "2.9.0";
+  private static final String HADOOP_VERSION_3_0_0 = "3.0.0";
+  private static final String HADOOP_VERSION_3_0_0_ALPHA4 = "3.0.0-alpha4";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkUtils.class);
+
+  protected SparkSession sparkSession;
 
-  private SparkSession sparkSession;
+  protected Properties properties;
 
-  public Spark3Shims(Properties properties, Object entryPoint) {
-    super(properties);
-    this.sparkSession = (SparkSession) entryPoint;
+  public SparkUtils(Properties properties, SparkSession sparkSession) {
+    this.properties = properties;
+    this.sparkSession = sparkSession;
   }
 
   public void setupSparkListener(final String master,
@@ -61,7 +86,6 @@ public class Spark3Shims extends SparkShims {
     });
   }
 
-  @Override
   public String showDataFrame(Object obj, int maxResult, InterpreterContext 
context) {
     if (obj instanceof Dataset) {
       Dataset<Row> df = ((Dataset) obj).toDF();
@@ -119,7 +143,6 @@ public class Spark3Shims extends SparkShims {
     return list;
   }
 
-  @Override
   public Dataset<Row> getAsDataFrame(String value) {
     String[] lines = value.split("\\n");
     String head = lines[0];
@@ -137,4 +160,66 @@ public class Spark3Shims extends SparkShims {
     }
     return sparkSession.createDataFrame(rows, schema);
   }
+
+  protected void buildSparkJobUrl(String master,
+                                  String sparkWebUrl,
+                                  int jobId,
+                                  Properties jobProperties,
+                                  InterpreterContext context) {
+    String jobUrl = null;
+    if (sparkWebUrl.contains("{jobId}")) {
+      jobUrl = sparkWebUrl.replace("{jobId}", jobId + "");
+    } else {
+      jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
+      String version = VersionInfo.getVersion();
+      if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
+        jobUrl = sparkWebUrl + "/jobs";
+      }
+    }
+
+    String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
+
+    Map<String, String> infos = new HashMap<>();
+    infos.put("jobUrl", jobUrl);
+    infos.put("label", "SPARK JOB");
+    infos.put("tooltip", "View in Spark web UI");
+    infos.put("noteId", getNoteId(jobGroupId));
+    infos.put("paraId", getParagraphId(jobGroupId));
+    LOGGER.debug("Send spark job url: {}", infos);
+    context.getIntpEventClient().onParaInfosReceived(infos);
+  }
+
+  public static String getNoteId(String jobGroupId) {
+    String[] tokens = jobGroupId.split("\\|");
+    if (tokens.length != 4) {
+      throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
+    }
+    return tokens[2];
+  }
+
+  public static String getParagraphId(String jobGroupId) {
+    String[] tokens = jobGroupId.split("\\|");
+    if (tokens.length != 4) {
+      throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
+    }
+    return tokens[3];
+  }
+
+  /**
+   * Temporary workaround to support old versions of YARN that have not adopted YARN-6615.
+   *
+   * @return true if YARN-6615 is patched, false otherwise
+   */
+  protected static boolean supportYarn6615(String version) {
+    return (VersionUtil.compareVersions(HADOOP_VERSION_2_6_6, version) <= 0
+            && VersionUtil.compareVersions(HADOOP_VERSION_2_7_0, version) > 0)
+        || (VersionUtil.compareVersions(HADOOP_VERSION_2_7_4, version) <= 0
+            && VersionUtil.compareVersions(HADOOP_VERSION_2_8_0, version) > 0)
+        || (VersionUtil.compareVersions(HADOOP_VERSION_2_8_2, version) <= 0
+            && VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) > 0)
+        || (VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) <= 0
+            && VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) > 0)
+        || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0_ALPHA4, version) 
<= 0)
+        || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) <= 0);
+  }
 }
diff --git a/spark/spark-shims/pom.xml b/spark/spark-shims/pom.xml
deleted file mode 100644
index 80112b7801..0000000000
--- a/spark/spark-shims/pom.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
-
-  <parent>
-    <artifactId>spark-parent</artifactId>
-    <groupId>org.apache.zeppelin</groupId>
-    <version>0.12.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>spark-shims</artifactId>
-  <packaging>jar</packaging>
-  <name>Zeppelin: Spark Shims</name>
-
-  <dependencies>
-    <!--
-      This is for ZEPPELIN-2221 using VersionInfo for check the version of 
Yarn.
-      It's checked that VersionInfo is compatible at least 2.2.0 to the latest 
one.
-    -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-resources-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>copy-interpreter-setting</id>
-            <phase>none</phase>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
diff --git 
a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java 
b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
deleted file mode 100644
index 542c3de8bb..0000000000
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-
-import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.VersionUtil;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * This is abstract class for anything that is api incompatible between spark1 
and spark2. It will
- * load the correct version of SparkShims based on the version of Spark.
- */
-public abstract class SparkShims {
-
-  // the following lines for checking specific versions
-  private static final String HADOOP_VERSION_2_6_6 = "2.6.6";
-  private static final String HADOOP_VERSION_2_7_0 = "2.7.0";
-  private static final String HADOOP_VERSION_2_7_4 = "2.7.4";
-  private static final String HADOOP_VERSION_2_8_0 = "2.8.0";
-  private static final String HADOOP_VERSION_2_8_2 = "2.8.2";
-  private static final String HADOOP_VERSION_2_9_0 = "2.9.0";
-  private static final String HADOOP_VERSION_3_0_0 = "3.0.0";
-  private static final String HADOOP_VERSION_3_0_0_ALPHA4 = "3.0.0-alpha4";
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkShims.class);
-
-  private static SparkShims sparkShims;
-
-  protected Properties properties;
-
-  public SparkShims(Properties properties) {
-    this.properties = properties;
-  }
-
-  private static SparkShims loadShims(int sparkMajorVersion, Properties 
properties, Object entryPoint)
-      throws Exception {
-    Class<?> sparkShimsClass;
-    if (sparkMajorVersion == 3) {
-      LOGGER.info("Initializing shims for Spark 3.x");
-      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark3Shims");
-    } else {
-      throw new Exception("Spark major version: '" + sparkMajorVersion + "' is 
not supported yet");
-    }
-
-    Constructor c = sparkShimsClass.getConstructor(Properties.class, 
Object.class);
-    return (SparkShims) c.newInstance(properties, entryPoint);
-  }
-
-  /**
-   *
-   * @param sparkVersion
-   * @param properties
-   * @param entryPoint  entryPoint is SparkContext for Spark 1.x SparkSession 
for Spark 2.x
-   * @return
-   */
-  public static SparkShims getInstance(String sparkVersion,
-                                       Properties properties,
-                                       Object entryPoint) throws Exception {
-    if (sparkShims == null) {
-      int sparkMajorVersion = 
SparkVersion.fromVersionString(sparkVersion).getMajorVersion();
-      sparkShims = loadShims(sparkMajorVersion, properties, entryPoint);
-    }
-    return sparkShims;
-  }
-
-  /**
-   * This is due to SparkListener api change between spark1 and spark2. 
SparkListener is trait in
-   * spark1 while it is abstract class in spark2.
-   */
-  public abstract void setupSparkListener(String master,
-                                          String sparkWebUrl,
-                                          InterpreterContext context);
-
-  public abstract String showDataFrame(Object obj, int maxResult, 
InterpreterContext context);
-
-  public abstract Object getAsDataFrame(String value);
-
-  protected void buildSparkJobUrl(String master,
-                                  String sparkWebUrl,
-                                  int jobId,
-                                  Properties jobProperties,
-                                  InterpreterContext context) {
-    String jobUrl = null;
-    if (sparkWebUrl.contains("{jobId}")) {
-      jobUrl = sparkWebUrl.replace("{jobId}", jobId + "");
-    } else {
-      jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
-      String version = VersionInfo.getVersion();
-      if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
-        jobUrl = sparkWebUrl + "/jobs";
-      }
-    }
-
-    String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
-
-    Map<String, String> infos = new HashMap<>();
-    infos.put("jobUrl", jobUrl);
-    infos.put("label", "SPARK JOB");
-    infos.put("tooltip", "View in Spark web UI");
-    infos.put("noteId", getNoteId(jobGroupId));
-    infos.put("paraId", getParagraphId(jobGroupId));
-    LOGGER.debug("Send spark job url: {}", infos);
-    context.getIntpEventClient().onParaInfosReceived(infos);
-  }
-
-  public static String getNoteId(String jobGroupId) {
-    String[] tokens = jobGroupId.split("\\|");
-    if (tokens.length != 4) {
-      throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
-    }
-    return tokens[2];
-  }
-
-  public static String getParagraphId(String jobGroupId) {
-    String[] tokens = jobGroupId.split("\\|");
-    if (tokens.length != 4) {
-      throw new RuntimeException("Invalid jobGroupId: " + jobGroupId);
-    }
-    return tokens[3];
-  }
-
-  /**
-   * This is temporal patch for support old versions of Yarn which is not 
adopted YARN-6615
-   *
-   * @return true if YARN-6615 is patched, false otherwise
-   */
-  protected boolean supportYarn6615(String version) {
-    return (VersionUtil.compareVersions(HADOOP_VERSION_2_6_6, version) <= 0
-            && VersionUtil.compareVersions(HADOOP_VERSION_2_7_0, version) > 0)
-        || (VersionUtil.compareVersions(HADOOP_VERSION_2_7_4, version) <= 0
-            && VersionUtil.compareVersions(HADOOP_VERSION_2_8_0, version) > 0)
-        || (VersionUtil.compareVersions(HADOOP_VERSION_2_8_2, version) <= 0
-            && VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) > 0)
-        || (VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) <= 0
-            && VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) > 0)
-        || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0_ALPHA4, version) 
<= 0)
-        || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) <= 0);
-  }
-
-  public static void reset() {
-    sparkShims = null;
-  }
-}
