This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new c13fb5b  [ZEPPELIN-4627]. Codegen fails for SparkInterpreter
c13fb5b is described below

commit c13fb5bea0017f4958ba7d1f26253ed29068e097
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Feb 19 16:37:15 2020 +0800

    [ZEPPELIN-4627]. Codegen fails for SparkInterpreter
    
    ### What is this PR for?
    
    The root cause is a classloader issue. This PR uses the scala shell
    classloader as the classloader in SparkSqlInterpreter to execute SQL.
    Scala 2.12 still doesn't work for now; I will leave that for the
    future, as scala 2.12 is not yet widely used.
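
    A simplified sketch of the pattern this applies in SparkSqlInterpreter
    (the query string and local variable names are illustrative only):
    swap in the scala shell classloader so codegen can resolve classes
    defined in the REPL, invoke sql() reflectively so the SQLContext can
    stay typed as Object across Spark versions, then restore the original
    classloader.

        Object sqlContext = sparkInterpreter.getSQLContext();
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
          // Codegen compiles generated classes with the thread context
          // classloader, so it must see the scala shell's classes.
          Thread.currentThread().setContextClassLoader(
              sparkInterpreter.getScalaShellClassLoader());
          // Reflection avoids compiling against a version-specific SQLContext API.
          java.lang.reflect.Method sql =
              sqlContext.getClass().getMethod("sql", String.class);
          Object result = sql.invoke(sqlContext, "select 1");
        } finally {
          Thread.currentThread().setContextClassLoader(original);
        }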
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4627
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3656 from zjffdu/ZEPPELIN-4627 and squashes the following commits:
    
    69e1722e8 [Jeff Zhang] [ZEPPELIN-4627]. Codegen fails for SparkInterpreter
---
 .../zeppelin/spark/AbstractSparkScalaInterpreter.java   |  2 ++
 .../apache/zeppelin/spark/KotlinSparkInterpreter.java   |  4 +++-
 .../org/apache/zeppelin/spark/PySparkInterpreter.java   |  2 +-
 .../org/apache/zeppelin/spark/SparkInterpreter.java     | 17 ++++++++++++++++-
 .../org/apache/zeppelin/spark/SparkSqlInterpreter.java  | 17 ++++++++++++++---
 .../org/apache/zeppelin/spark/ZeppelinRContext.java     |  6 +++---
 spark/pom.xml                                           |  2 +-
 .../zeppelin/spark/SparkScala210Interpreter.scala       | 11 +++++++++--
 .../zeppelin/spark/SparkScala211Interpreter.scala       | 12 ++++++++++--
 spark/scala-2.12/pom.xml                                |  2 +-
 .../zeppelin/spark/SparkScala212Interpreter.scala       | 11 +++++++++--
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala      |  8 ++++++--
 12 files changed, 75 insertions(+), 19 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
index b5cc393..bf3abd8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -68,4 +68,6 @@ public abstract class AbstractSparkScalaInterpreter {
   public abstract List<InterpreterCompletion> completion(String buf,
                                                          int cursor,
                                                         InterpreterContext interpreterContext);
+
+  public abstract ClassLoader getScalaShellClassLoader();
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
index 2cbb322..32de4b4 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
@@ -21,6 +21,7 @@ import static org.apache.zeppelin.spark.Utils.buildJobDesc;
 import static org.apache.zeppelin.spark.Utils.buildJobGroupId;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,10 +79,11 @@ public class KotlinSparkInterpreter extends Interpreter {
 
     z = sparkInterpreter.getZeppelinContext();
 
+    // Cast the Object to SQLContext explicitly, which means Kotlin Spark may not work with Spark 1.x
     SparkKotlinReceiver ctx = new SparkKotlinReceiver(
         sparkInterpreter.getSparkSession(),
         jsc,
-        sparkInterpreter.getSQLContext(),
+        (SQLContext) sparkInterpreter.getSQLContext(),
         z);
 
     List<String> classpath = sparkClasspath();
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 01e7d32..5dbe410 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -198,7 +198,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
   }
 
-  public SQLContext getSQLContext() {
+  public Object getSQLContext() {
     if (sparkInterpreter == null) {
       return null;
     } else {
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 7c91a57..ebd30f1 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -205,7 +205,14 @@ public class SparkInterpreter extends AbstractInterpreter {
     return this.sc;
   }
 
-  public SQLContext getSQLContext() {
+  /**
+   * Must return Object, because the SQLContext API signature in Spark 1.x is
+   * different from that of Spark 2.x, e.g. SQLContext.sql(sql) returns a
+   * different type.
+   *
+   * @return the SQLContext as an Object
+   */
+  public Object getSQLContext() {
     return sqlContext;
   }
 
@@ -235,6 +242,10 @@ public class SparkInterpreter extends AbstractInterpreter {
     }
   }
 
+  public boolean isScala212() throws InterpreterException {
+    return extractScalaVersion().contains("2.12");
+  }
+
   private List<String> getDependencyFiles() throws InterpreterException {
     List<String> depFiles = new ArrayList<>();
     // add jar from local repo
@@ -253,6 +264,10 @@ public class SparkInterpreter extends AbstractInterpreter {
     return depFiles;
   }
 
+  public ClassLoader getScalaShellClassLoader() {
+    return innerInterpreter.getScalaShellClassLoader();
+  }
+
   public boolean isUnsupportedSparkVersion() {
     return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
   }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 2577db6..9274612 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -33,6 +33,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;
 
@@ -81,8 +82,8 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     }
     Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
     sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
+    Object sqlContext = sparkInterpreter.getSQLContext();
+    SparkContext sc = sparkInterpreter.getSparkContext();
 
     StringBuilder builder = new StringBuilder();
     List<String> sqls = sqlSplitter.splitSql(st);
@@ -92,10 +93,17 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
     sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
     String curSql = null;
+    ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
     try {
+      if (!sparkInterpreter.isScala212()) {
+        // TODO(zjffdu) scala 2.12 still doesn't work for codegen (ZEPPELIN-4627)
+        Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
+      }
+      Method method = sqlContext.getClass().getMethod("sql", String.class);
       for (String sql : sqls) {
         curSql = sql;
-        String result = sparkInterpreter.getZeppelinContext().showData(sqlc.sql(sql), maxResult);
+        String result = sparkInterpreter.getZeppelinContext()
+                .showData(method.invoke(sqlContext, sql), maxResult);
         builder.append(result);
       }
     } catch (Exception e) {
@@ -111,6 +119,9 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
       return new InterpreterResult(Code.ERROR, builder.toString());
     } finally {
       sc.clearJobGroup();
+      if (!sparkInterpreter.isScala212()) {
+        Thread.currentThread().setContextClassLoader(originalClassLoader);
+      }
     }
 
     return new InterpreterResult(Code.SUCCESS, builder.toString());
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index a49b81d..becc869 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -27,7 +27,7 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
  */
 public class ZeppelinRContext {
   private static SparkContext sparkContext;
-  private static SQLContext sqlContext;
+  private static Object sqlContext;
   private static ZeppelinContext zeppelinContext;
   private static Object sparkSession;
   private static JavaSparkContext javaSparkContext;
@@ -40,7 +40,7 @@ public class ZeppelinRContext {
     ZeppelinRContext.zeppelinContext = zeppelinContext;
   }
 
-  public static void setSqlContext(SQLContext sqlContext) {
+  public static void setSqlContext(Object sqlContext) {
     ZeppelinRContext.sqlContext = sqlContext;
   }
 
@@ -52,7 +52,7 @@ public class ZeppelinRContext {
     return sparkContext;
   }
 
-  public static SQLContext getSqlContext() {
+  public static Object getSqlContext() {
     return sqlContext;
   }
 
diff --git a/spark/pom.xml b/spark/pom.xml
index 3d9d539..bb2eb48 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -43,7 +43,7 @@
         <!--plugin versions-->
         <plugin.scala.version>2.15.2</plugin.scala.version>
         <!-- spark versions -->
-        <spark.version>2.2.3</spark.version>
+        <spark.version>2.4.4</spark.version>
         <spark.scala.version>2.11.12</spark.scala.version>
         <spark.scala.binary.version>2.11</spark.scala.binary.version>
         <py4j.version>0.10.7</py4j.version>
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index eb0e297..0eac200 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -58,8 +58,9 @@ class SparkScala210Interpreter(override val conf: SparkConf,
       interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
     }
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
     outputDir.deleteOnExit()
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     startHttpServer(outputDir).foreach { case (server, uri) =>
@@ -70,7 +71,9 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     val settings = new Settings()
     settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
-    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
+    this.userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
     if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) {
       Console.setOut(interpreterOutput)
     }
@@ -107,4 +110,8 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     }
   }
 
+  override def getScalaShellClassLoader: ClassLoader = {
+    val sparkIMain = sparkILoop.interpreter
+    callMethod(sparkIMain, "classLoader").asInstanceOf[ClassLoader]
+  }
 }
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 7d99a0b..64c6502 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -58,7 +58,8 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     }
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
     startHttpServer(outputDir).foreach { case (server, uri) =>
@@ -71,7 +72,10 @@ class SparkScala211Interpreter(override val conf: SparkConf,
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
     settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
-    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
+
+    this.userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
 
     val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
@@ -122,6 +126,9 @@ class SparkScala211Interpreter(override val conf: SparkConf,
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
     sparkILoop.interpret(code)
 
+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
 }
 
 private object SparkScala211Interpreter {
@@ -191,4 +198,5 @@ private object SparkScala211Interpreter {
 
     loopPostInit()
   }
+
 }
diff --git a/spark/scala-2.12/pom.xml b/spark/scala-2.12/pom.xml
index fd31af4..5cbc657 100644
--- a/spark/scala-2.12/pom.xml
+++ b/spark/scala-2.12/pom.xml
@@ -34,7 +34,7 @@
 
   <properties>
     <spark.version>2.4.4</spark.version>
-    <spark.scala.version>2.12.8</spark.scala.version>
+    <spark.scala.version>2.12.10</spark.scala.version>
     <spark.scala.binary.version>2.12</spark.scala.binary.version>
     <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
   </properties>
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index a0fe7f1..6d90026 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -56,7 +56,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     }
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
 
@@ -65,7 +66,9 @@ class SparkScala212Interpreter(override val conf: SparkConf,
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
     settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
-    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
+    this.userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
 
     val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
@@ -116,4 +119,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
     sparkILoop.interpret(code)
 
+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
+
 }
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 46d06fa..772d279 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,7 +19,7 @@ package org.apache.zeppelin.spark
 
 
 import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
 import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{ZeppelinContext, InterpreterContext, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
@@ -58,6 +58,10 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected var sparkSession: Object = _
 
+  protected var outputDir: File = _
+
+  protected var userJars: Seq[String] = _
+
   protected var sparkHttpServer: Object = _
 
   protected var sparkUrl: String = _
