This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new c13fb5b  [ZEPPELIN-4627]. Codegen fails for SparkInterpreter
c13fb5b is described below

commit c13fb5bea0017f4958ba7d1f26253ed29068e097
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Feb 19 16:37:15 2020 +0800

    [ZEPPELIN-4627]. Codegen fails for SparkInterpreter

    ### What is this PR for?
    The root cause is a classloader issue: Spark SQL codegen resolves classes through
    the thread context classloader, which does not see the classes compiled by the
    Scala shell. This PR uses the Scala shell's classloader as the context classloader
    in SparkSqlInterpreter when executing SQL. Scala 2.12 still doesn't work for now;
    I will leave it for the future, as Scala 2.12 is not yet widely used.

    ### What type of PR is it?
    [Improvement]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4627

    ### How should this be tested?
    * Manually tested

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3656 from zjffdu/ZEPPELIN-4627 and squashes the following commits:

    69e1722e8 [Jeff Zhang] [ZEPPELIN-4627]. Codegen fails for SparkInterpreter
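The heart of the change is a classic context-classloader swap: install the REPL's classloader as the thread context classloader before running SQL, so codegen can resolve REPL-compiled classes, and restore the original loader in a finally block. A minimal, self-contained sketch of that pattern follows; it is not the literal Zeppelin code, and the Callable stands in for the real SQL execution:

    import java.util.concurrent.Callable;

    // Minimal sketch of the context-classloader swap this commit applies in
    // SparkSqlInterpreter (see the diff below); not the literal Zeppelin code.
    public class ContextClassLoaderSwap {
      public static <T> T withClassLoader(ClassLoader replClassLoader, Callable<T> body)
          throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
          // Codegen resolves REPL-compiled classes through the context classloader.
          Thread.currentThread().setContextClassLoader(replClassLoader);
          return body.call();
        } finally {
          // Always restore, even if the body throws.
          Thread.currentThread().setContextClassLoader(original);
        }
      }
    }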
---
 .../zeppelin/spark/AbstractSparkScalaInterpreter.java  |  2 ++
 .../apache/zeppelin/spark/KotlinSparkInterpreter.java  |  4 +++-
 .../org/apache/zeppelin/spark/PySparkInterpreter.java  |  2 +-
 .../org/apache/zeppelin/spark/SparkInterpreter.java    | 17 ++++++++++++++++-
 .../org/apache/zeppelin/spark/SparkSqlInterpreter.java | 17 ++++++++++++++---
 .../org/apache/zeppelin/spark/ZeppelinRContext.java    |  6 +++---
 spark/pom.xml                                          |  2 +-
 .../zeppelin/spark/SparkScala210Interpreter.scala      | 11 +++++++++--
 .../zeppelin/spark/SparkScala211Interpreter.scala      | 12 ++++++++++--
 spark/scala-2.12/pom.xml                               |  2 +-
 .../zeppelin/spark/SparkScala212Interpreter.scala      | 11 +++++++++--
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala     |  8 ++++++--
 12 files changed, 75 insertions(+), 19 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
index b5cc393..bf3abd8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -68,4 +68,6 @@ public abstract class AbstractSparkScalaInterpreter {
   public abstract List<InterpreterCompletion> completion(String buf,
                                                          int cursor,
                                                          InterpreterContext interpreterContext);
+
+  public abstract ClassLoader getScalaShellClassLoader();
 }

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
index 2cbb322..32de4b4 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
@@ -21,6 +21,7 @@ import static org.apache.zeppelin.spark.Utils.buildJobDesc;
 import static org.apache.zeppelin.spark.Utils.buildJobGroupId;

 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,10 +79,11 @@ public class KotlinSparkInterpreter extends Interpreter {
     z = sparkInterpreter.getZeppelinContext();

+    // Cast Object to SQLContext explicitly; this means Kotlin Spark may not work with Spark 1.x.
     SparkKotlinReceiver ctx = new SparkKotlinReceiver(
         sparkInterpreter.getSparkSession(),
         jsc,
-        sparkInterpreter.getSQLContext(),
+        (SQLContext) sparkInterpreter.getSQLContext(),
         z);

     List<String> classpath = sparkClasspath();

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 01e7d32..5dbe410 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -198,7 +198,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
   }

-  public SQLContext getSQLContext() {
+  public Object getSQLContext() {
     if (sparkInterpreter == null) {
       return null;
     } else {

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 7c91a57..ebd30f1 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -205,7 +205,14 @@ public class SparkInterpreter extends AbstractInterpreter {
     return this.sc;
   }

-  public SQLContext getSQLContext() {
+  /**
+   * Must use Object, because its api signature in Spark 1.x is different from
+   * that of Spark 2.x.
+   * e.g. SQLContext.sql(sql) returns a different type.
+   *
+   * @return
+   */
+  public Object getSQLContext() {
     return sqlContext;
   }

@@ -235,6 +242,10 @@ public class SparkInterpreter extends AbstractInterpreter {
     }
   }

+  public boolean isScala212() throws InterpreterException {
+    return extractScalaVersion().contains("2.12");
+  }
+
   private List<String> getDependencyFiles() throws InterpreterException {
     List<String> depFiles = new ArrayList<>();
     // add jar from local repo
@@ -253,6 +264,10 @@ public class SparkInterpreter extends AbstractInterpreter {
     return depFiles;
   }

+  public ClassLoader getScalaShellClassLoader() {
+    return innerInterpreter.getScalaShellClassLoader();
+  }
+
   public boolean isUnsupportedSparkVersion() {
     return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion();
   }
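extractScalaVersion() itself is not part of this diff; a plausible implementation (an assumption, not the actual Zeppelin code) probes the scala-library jar on the classpath:

    // Hypothetical sketch of how extractScalaVersion() might be implemented;
    // the real Zeppelin method is not shown in this diff.
    public class ScalaVersionProbe {
      // scala.util.Properties reports the version of the scala-library jar on
      // the classpath, e.g. "2.11.12" or "2.12.10".
      public static String extractScalaVersion() {
        return scala.util.Properties.versionNumberString();
      }

      public static boolean isScala212() {
        return extractScalaVersion().contains("2.12");
      }
    }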
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 2577db6..9274612 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -33,6 +33,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;

@@ -81,8 +82,8 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     }
     Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
     sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
+    Object sqlContext = sparkInterpreter.getSQLContext();
+    SparkContext sc = sparkInterpreter.getSparkContext();

     StringBuilder builder = new StringBuilder();
     List<String> sqls = sqlSplitter.splitSql(st);
@@ -92,10 +93,17 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
     sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
     String curSql = null;
+    ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
     try {
+      if (!sparkInterpreter.isScala212()) {
+        // TODO(zjffdu) scala 2.12 still doesn't work for codegen (ZEPPELIN-4627)
+        Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
+      }
+      Method method = sqlContext.getClass().getMethod("sql", String.class);
       for (String sql : sqls) {
         curSql = sql;
-        String result = sparkInterpreter.getZeppelinContext().showData(sqlc.sql(sql), maxResult);
+        String result = sparkInterpreter.getZeppelinContext()
+            .showData(method.invoke(sqlContext, sql), maxResult);
         builder.append(result);
       }
     } catch (Exception e) {
@@ -111,6 +119,9 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
       return new InterpreterResult(Code.ERROR, builder.toString());
     } finally {
       sc.clearJobGroup();
+      if (!sparkInterpreter.isScala212()) {
+        Thread.currentThread().setContextClassLoader(originalClassLoader);
+      }
     }

     return new InterpreterResult(Code.SUCCESS, builder.toString());

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index a49b81d..becc869 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -27,7 +27,7 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
  */
 public class ZeppelinRContext {
   private static SparkContext sparkContext;
-  private static SQLContext sqlContext;
+  private static Object sqlContext;
   private static ZeppelinContext zeppelinContext;
   private static Object sparkSession;
   private static JavaSparkContext javaSparkContext;
@@ -40,7 +40,7 @@ public class ZeppelinRContext {
     ZeppelinRContext.zeppelinContext = zeppelinContext;
   }

-  public static void setSqlContext(SQLContext sqlContext) {
+  public static void setSqlContext(Object sqlContext) {
     ZeppelinRContext.sqlContext = sqlContext;
   }

@@ -52,7 +52,7 @@ public class ZeppelinRContext {
     return sparkContext;
   }

-  public static SQLContext getSqlContext() {
+  public static Object getSqlContext() {
     return sqlContext;
   }

diff --git a/spark/pom.xml b/spark/pom.xml
index 3d9d539..bb2eb48 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -43,7 +43,7 @@
     <!--plugin versions-->
     <plugin.scala.version>2.15.2</plugin.scala.version>
     <!-- spark versions -->
-    <spark.version>2.2.3</spark.version>
+    <spark.version>2.4.4</spark.version>
     <spark.scala.version>2.11.12</spark.scala.version>
     <spark.scala.binary.version>2.11</spark.scala.binary.version>
     <py4j.version>0.10.7</py4j.version>
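The reflective sql call above is what lets SparkSqlInterpreter compile against neither Spark version: in Spark 1.x SQLContext.sql(...) returns a DataFrame, while in Spark 2.x it returns a Dataset<Row>, so naming either return type at compile time would break the other. Stripped to its essentials, the pattern looks like this (a sketch; ReflectiveSql and runSql are hypothetical names, not from this commit):

    import java.lang.reflect.Method;

    // Sketch of the reflective dispatch used above; hypothetical names.
    public class ReflectiveSql {
      public static Object runSql(Object sqlContext, String sql) throws Exception {
        // Look up sql(String) on whatever SQLContext class is on the classpath.
        Method sqlMethod = sqlContext.getClass().getMethod("sql", String.class);
        // Keep the result as Object: DataFrame in Spark 1.x, Dataset<Row> in Spark 2.x.
        return sqlMethod.invoke(sqlContext, sql);
      }
    }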
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index eb0e297..0eac200 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -58,8 +58,9 @@ class SparkScala210Interpreter(override val conf: SparkConf,
       interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
     }
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
     outputDir.deleteOnExit()
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     startHttpServer(outputDir).foreach { case (server, uri) =>
@@ -70,7 +71,9 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     val settings = new Settings()
     settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
-    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
+    this.userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
     if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) {
       Console.setOut(interpreterOutput)
     }
@@ -107,4 +110,8 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     }
   }

+  override def getScalaShellClassLoader: ClassLoader = {
+    val sparkIMain = sparkILoop.interpreter
+    callMethod(sparkIMain, "classLoader").asInstanceOf[ClassLoader]
+  }
 }
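Note that the Scala 2.10 interpreter fetches the REPL classloader reflectively via callMethod, presumably because SparkIMain does not expose it publicly there; the 2.11 and 2.12 interpreters below read sparkILoop.classLoader directly. A generic helper of that shape might look like the following (an assumption about callMethod, not its actual Zeppelin implementation):

    import java.lang.reflect.Method;

    // Assumed shape of the callMethod(...) helper used above; the actual
    // Zeppelin implementation may differ.
    public class ReflectionUtil {
      public static Object callMethod(Object target, String name) throws Exception {
        Method m = target.getClass().getMethod(name);
        m.setAccessible(true); // tolerate non-public declaring classes
        return m.invoke(target);
      }
    }

    // Usage, mirroring getScalaShellClassLoader for Scala 2.10:
    //   ClassLoader cl = (ClassLoader) ReflectionUtil.callMethod(sparkIMain, "classLoader");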
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 7d99a0b..64c6502 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -58,7 +58,8 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     }
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
     startHttpServer(outputDir).foreach { case (server, uri) =>
@@ -71,7 +72,10 @@ class SparkScala211Interpreter(override val conf: SparkConf,
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
     settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
-    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
+
+    this.userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)

     val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
@@ -122,6 +126,9 @@ class SparkScala211Interpreter(override val conf: SparkConf,
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
     sparkILoop.interpret(code)

+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
 }

 private object SparkScala211Interpreter {
@@ -191,4 +198,5 @@ private object SparkScala211Interpreter {

     loopPostInit()
   }
+
 }

diff --git a/spark/scala-2.12/pom.xml b/spark/scala-2.12/pom.xml
index fd31af4..5cbc657 100644
--- a/spark/scala-2.12/pom.xml
+++ b/spark/scala-2.12/pom.xml
@@ -34,7 +34,7 @@

   <properties>
     <spark.version>2.4.4</spark.version>
-    <spark.scala.version>2.12.8</spark.scala.version>
+    <spark.scala.version>2.12.10</spark.scala.version>
    <spark.scala.binary.version>2.12</spark.scala.binary.version>
     <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
   </properties>
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index a0fe7f1..6d90026 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -56,7 +56,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     }
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)

@@ -65,7 +66,9 @@ class SparkScala212Interpreter(override val conf: SparkConf,
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
     settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
-    settings.classpath.value = getUserJars.mkString(File.pathSeparator)
+    this.userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)

     val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
@@ -116,4 +119,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
     sparkILoop.interpret(code)

+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
+
 }

diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 46d06fa..772d279 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,7 +19,7 @@ package org.apache.zeppelin.spark

 import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
 import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger

@@ -27,7 +27,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{ZeppelinContext, InterpreterContext, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext}
 import org.slf4j.{Logger, LoggerFactory}

 import scala.collection.JavaConverters._
@@ -58,6 +58,10 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,

   protected var sparkSession: Object = _

+  protected var outputDir: File = _
+
+  protected var userJars: Seq[String] = _
+
   protected var sparkHttpServer: Object = _

   protected var sparkUrl: String = _
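For reference, the REPL output-directory wiring that all three Scala interpreters now log is the same short sequence of calls; this Java rendering is a sketch for illustration, not code from the commit:

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.spark.SparkConf;

    // Java rendering of the repl output-dir setup from the Scala diffs above.
    public class ReplOutputDirSetup {
      public static File configureReplOutputDir(SparkConf conf) throws IOException {
        // Fall back to the JVM temp dir when spark.repl.classdir is unset.
        String rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"));
        File outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile();
        outputDir.deleteOnExit();
        // Spark serves REPL-generated classes to executors from this directory.
        conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
        return outputDir;
      }
    }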