This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new f5040688c0  [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
f5040688c0 is described below

commit f5040688c034e6c8350277c05fa7d1b4cc0f1b59
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Fri Apr 15 10:07:26 2022 +0800

    [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
---
 spark/README.md                                    |  28 ++
 .../spark/AbstractSparkScalaInterpreter.java       | 305 ++++++++++++++++--
 .../apache/zeppelin/spark/SparkInterpreter.java    |  26 +-
 .../apache/zeppelin/spark/SparkRInterpreter.java   |   5 -
 .../apache/zeppelin/spark/SparkSqlInterpreter.java |   9 +-
 .../main/java/org/apache/zeppelin/spark/Utils.java |  72 +----
 .../zeppelin/spark/SparkInterpreterTest.java       |  11 +-
 spark/scala-2.11/spark-scala-parent                |   1 -
 .../zeppelin/spark/SparkScala211Interpreter.scala  | 265 ++++++++++------
 .../zeppelin/spark/SparkZeppelinContext.scala      |   1 -
 spark/scala-2.12/spark-scala-parent                |   1 -
 .../zeppelin/spark/SparkScala212Interpreter.scala  | 234 +++++++++-----
 .../zeppelin/spark/SparkZeppelinContext.scala      |   1 -
 spark/scala-2.13/spark-scala-parent                |   1 -
 .../zeppelin/spark/SparkScala213Interpreter.scala  | 189 ++++++++----
 .../zeppelin/spark/SparkZeppelinContext.scala      |   1 -
 spark/spark-scala-parent/pom.xml                   |  59 ----
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 341 ---------------------
 .../apache/zeppelin/spark/JobProgressUtil.scala    |  49 ---
 .../java/org/apache/zeppelin/spark/SparkShims.java |   1 -
 .../integration/ZSessionIntegrationTest.java       |   4 +-
 21 files changed, 768 insertions(+), 836 deletions(-)

diff --git a/spark/README.md b/spark/README.md
new file mode 100644
index 0000000000..a9b039ec5e
--- /dev/null
+++ b/spark/README.md
@@ -0,0 +1,28 @@
+# Spark Interpreter
+
+The Spark interpreter is the first and most important interpreter in Zeppelin. It supports multiple versions of Spark and multiple versions of Scala.
+
+
+# Module structure of Spark interpreter
+
+* interpreter
+  - This module is the entry module of the Spark interpreter. All the interpreters are defined here. SparkInterpreter is the most important one:
+    the SparkContext/SparkSession is created here, and the other interpreters (PySparkInterpreter, IPySparkInterpreter, SparkRInterpreter, etc.) all depend on SparkInterpreter.
+    Due to incompatibility between Scala versions, there is one scala-x module per supported Scala version.
+    Due to incompatibility between Spark versions, there is one spark-shims module per supported Spark version.
+* spark-scala-parent
+  - Parent module for each Scala module
+* scala-2.11
+  - Scala module for Scala 2.11
+* scala-2.12
+  - Scala module for Scala 2.12
+* scala-2.13
+  - Scala module for Scala 2.13
+* spark-shims
+  - Parent module for each Spark module
+* spark2-shims
+  - Shims module for Spark2
+* spark3-shims
+  - Shims module for Spark3
+
+
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
index 71acd5e467..bc58ea7461 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -17,17 +17,32 @@ package org.apache.zeppelin.spark;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
 import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.spark.sql.SparkSession;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.kotlin.KotlinInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * This is bridge class which bridge the communication between java side and scala side.
@@ -35,44 +50,286 @@ import java.util.List; */ public abstract class AbstractSparkScalaInterpreter { - public abstract SparkContext getSparkContext(); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSparkScalaInterpreter.class); + private static final AtomicInteger SESSION_NUM = new AtomicInteger(0); - public abstract SQLContext getSqlContext(); + protected SparkConf conf; + protected SparkContext sc; + protected SparkSession sparkSession; + protected SQLContext sqlContext; + protected String sparkUrl; + protected ZeppelinContext z; - public abstract Object getSparkSession(); + protected Properties properties; + protected List<String> depFiles; - public abstract String getSparkUrl(); + public AbstractSparkScalaInterpreter(SparkConf conf, + Properties properties, + List<String> depFiles) { + this.conf = conf; + this.properties = properties; + this.depFiles = depFiles; + } - public abstract ZeppelinContext getZeppelinContext(); + public SparkContext getSparkContext() { + return this.sc; + } - public int getProgress(InterpreterContext context) throws InterpreterException { - return getProgress(Utils.buildJobGroupId(context), context); + public SQLContext getSqlContext() { + return this.sqlContext; } - public abstract int getProgress(String jobGroup, - InterpreterContext context) throws InterpreterException; + public SparkSession getSparkSession() { + return this.sparkSession; + } - public void cancel(InterpreterContext context) throws InterpreterException { - getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context)); + public String getSparkUrl() { + return this.sparkUrl; } - public Interpreter.FormType getFormType() throws InterpreterException { - return Interpreter.FormType.SIMPLE; + public ZeppelinContext getZeppelinContext() { + return this.z; } - public abstract void open(); + public AbstractSparkScalaInterpreter() { + } + + public void open() throws InterpreterException { + /* Required for scoped mode. + * In scoped mode multiple scala compiler (repl) generates class in the same directory. + * Class names is not randomly generated and look like '$line12.$read$$iw$$iw' + * Therefore it's possible to generated class conflict(overwrite) with other repl generated + * class. + * + * To prevent generated class name conflict, + * change prefix of generated class name from each scala compiler (repl) instance. + * + * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern + * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$ + * + * As hashCode() can return a negative integer value and the minus character '-' is invalid + * in a package name we change it to a numeric value '0' which still conforms to the regexp. 
+ * + */ + System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0')); + SESSION_NUM.incrementAndGet(); - public abstract void close(); + createSparkILoop(); + createSparkContext(); + createZeppelinContext(); + } - public abstract InterpreterResult interpret(String st, InterpreterContext context); + public void close() throws InterpreterException { + // delete stagingDir for yarn mode + if (getSparkMaster().startsWith("yarn")) { + YarnConfiguration hadoopConf = new YarnConfiguration(); + Path appStagingBaseDir = null; + if (conf.contains("spark.yarn.stagingDir")) { + appStagingBaseDir = new Path(conf.get("spark.yarn.stagingDir")); + } else { + try { + appStagingBaseDir = FileSystem.get(hadoopConf).getHomeDirectory(); + } catch (IOException e) { + LOGGER.error("Fail to get stagingBaseDir", e); + } + } + if (appStagingBaseDir != null) { + Path stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId()); + cleanupStagingDirInternal(stagingDirPath, hadoopConf); + } + } + + if (sc != null) { + sc.stop(); + sc = null; + } + if (sparkSession != null) { + sparkSession.stop(); + sparkSession = null; + } + sqlContext = null; + z = null; + } + + public abstract void createSparkILoop() throws InterpreterException; + + public abstract void createZeppelinContext() throws InterpreterException; + + public void cancel(InterpreterContext context) throws InterpreterException { + getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context)); + } + + public abstract InterpreterResult interpret(String st, + InterpreterContext context) throws InterpreterException; public abstract InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter, String st, - InterpreterContext context); + InterpreterContext context) throws InterpreterException; public abstract List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext); + InterpreterContext interpreterContext) throws InterpreterException; + + public abstract void bind(String name, + String tpe, + Object value, + List<String> modifier); + + // throw exception when fail to execute the code in scala shell, only used in initialization. + // not used t run user code. 
+ public abstract void scalaInterpretQuietly(String code) throws InterpreterException; public abstract ClassLoader getScalaShellClassLoader(); + + private List<String> getUserFiles() { + return depFiles.stream() + .filter(f -> f.endsWith(".jar")) + .collect(Collectors.toList()); + } + + private void createSparkContext() throws InterpreterException { + SparkSession.Builder builder = SparkSession.builder().config(conf); + if (conf.get("spark.sql.catalogImplementation", "in-memory").equalsIgnoreCase("hive") + || conf.get("zeppelin.spark.useHiveContext", "false").equalsIgnoreCase("true")) { + boolean hiveSiteExisted = + Thread.currentThread().getContextClassLoader().getResource("hive-site.xml") != null; + if (hiveSiteExisted && hiveClassesArePresent()) { + sparkSession = builder.enableHiveSupport().getOrCreate(); + LOGGER.info("Created Spark session (with Hive support)"); + } else { + if (!hiveClassesArePresent()) { + LOGGER.warn("Hive support can not be enabled because spark is not built with hive"); + } + if (!hiveSiteExisted) { + LOGGER.warn("Hive support can not be enabled because no hive-site.xml found"); + } + sparkSession = builder.getOrCreate(); + LOGGER.info("Created Spark session (without Hive support)"); + } + } else { + sparkSession = builder.getOrCreate(); + LOGGER.info("Created Spark session (without Hive support)"); + } + + sc = sparkSession.sparkContext(); + getUserFiles().forEach(file -> sc.addFile(file)); + if (sc.uiWebUrl().isDefined()) { + sparkUrl = sc.uiWebUrl().get(); + } + sqlContext = sparkSession.sqlContext(); + + initAndSendSparkWebUrl(); + + bind("spark", sparkSession.getClass().getCanonicalName(), sparkSession, Lists.newArrayList("@transient")); + bind("sc", "org.apache.spark.SparkContext", sc, Lists.newArrayList("@transient")); + bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, Lists.newArrayList("@transient")); + + scalaInterpretQuietly("import org.apache.spark.SparkContext._"); + scalaInterpretQuietly("import spark.implicits._"); + scalaInterpretQuietly("import sqlContext.implicits._"); + scalaInterpretQuietly("import spark.sql"); + scalaInterpretQuietly("import org.apache.spark.sql.functions._"); + // print empty string otherwise the last statement's output of this method + // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code + scalaInterpretQuietly("print(\"\")"); + } + + /** + * @return true if Hive classes can be loaded, otherwise false. 
+ */ + private boolean hiveClassesArePresent() { + try { + Class.forName("org.apache.spark.sql.hive.HiveSessionStateBuilder"); + Class.forName("org.apache.hadoop.hive.conf.HiveConf"); + return true; + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return false; + } + } + + private void initAndSendSparkWebUrl() { + String webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); + if (!StringUtils.isBlank(webUiUrl)) { + this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId()); + } else { + useYarnProxyURLIfNeeded(); + } + InterpreterContext.get().getIntpEventClient().sendWebUrlInfo(this.sparkUrl); + } + + private String getSparkMaster() { + if (conf == null) { + return ""; + } else { + return conf.get(SparkStringConstants.MASTER_PROP_NAME, + SparkStringConstants.DEFAULT_MASTER_VALUE); + } + } + + private void cleanupStagingDirInternal(Path stagingDirPath, Configuration hadoopConf) { + try { + FileSystem fs = stagingDirPath.getFileSystem(hadoopConf); + if (fs.delete(stagingDirPath, true)) { + LOGGER.info("Deleted staging directory " + stagingDirPath); + } + } catch (IOException e) { + LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, e); + } + } + + private void useYarnProxyURLIfNeeded() { + if (Boolean.parseBoolean(properties.getProperty("spark.webui.yarn.useProxy", "false"))) { + if (getSparkMaster().startsWith("yarn")) { + String appId = sc.applicationId(); + YarnClient yarnClient = YarnClient.createYarnClient(); + YarnConfiguration yarnConf = new YarnConfiguration(); + // disable timeline service as we only query yarn app here. + // Otherwise we may hit this kind of ERROR: + // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig + yarnConf.set("yarn.timeline-service.enabled", "false"); + yarnClient.init(yarnConf); + yarnClient.start(); + ApplicationReport appReport = null; + try { + appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId)); + this.sparkUrl = appReport.getTrackingUrl(); + } catch (YarnException | IOException e) { + LOGGER.error("Fail to get yarn app report", e); + } + } + } + } + + public int getProgress(InterpreterContext context) throws InterpreterException { + String jobGroup = Utils.buildJobGroupId(context); + // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. 
+ // So only look for the first job which match the jobGroup + Optional<SparkJobInfo> jobInfoOptional = Arrays.stream(sc.statusTracker().getJobIdsForGroup(jobGroup)) + .mapToObj(jobId -> sc.statusTracker().getJobInfo(jobId)) + .filter(jobInfo -> jobInfo.isDefined()) + .map(jobInfo -> jobInfo.get()) + .findFirst(); + if (jobInfoOptional.isPresent()) { + List<SparkStageInfo> stageInfoList = Arrays.stream(jobInfoOptional.get().stageIds()) + .mapToObj(stageId -> sc.statusTracker().getStageInfo(stageId)) + .filter(stageInfo -> stageInfo.isDefined()) + .map(stageInfo -> stageInfo.get()) + .collect(Collectors.toList()); + int taskCount = stageInfoList.stream() + .map(stageInfo -> stageInfo.numTasks()) + .collect(Collectors.summingInt(Integer::intValue)); + int completedTaskCount = stageInfoList.stream() + .map(stageInfo -> stageInfo.numCompletedTasks()) + .collect(Collectors.summingInt(Integer::intValue)); + LOGGER.debug("Total TaskCount: " + taskCount); + LOGGER.debug("Completed TaskCount: " + completedTaskCount); + if (taskCount == 0) { + return 0; + } else { + return 100 * completedTaskCount / taskCount; + } + } else { + return 0; + } + } } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 7701ebf52d..035924e603 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -22,6 +22,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.zeppelin.interpreter.AbstractInterpreter; import org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -73,7 +74,7 @@ public class SparkInterpreter extends AbstractInterpreter { private SparkContext sc; private JavaSparkContext jsc; private SQLContext sqlContext; - private Object sparkSession; + private SparkSession sparkSession; private SparkVersion sparkVersion; private String scalaVersion; @@ -187,14 +188,14 @@ public class SparkInterpreter extends AbstractInterpreter { .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir); } - @Override + @Override public void close() throws InterpreterException { LOGGER.info("Close SparkInterpreter"); if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) { innerInterpreter.close(); innerInterpreterClazz = null; } - innerInterpreter = null; + innerInterpreter = null; } @Override @@ -228,7 +229,7 @@ public class SparkInterpreter extends AbstractInterpreter { @Override public int getProgress(InterpreterContext context) throws InterpreterException { - return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context); + return innerInterpreter.getProgress(context); } public ZeppelinContext getZeppelinContext() { @@ -240,7 +241,7 @@ public class SparkInterpreter extends AbstractInterpreter { public InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter, String code, - InterpreterContext context) { + InterpreterContext context) throws InterpreterException{ return innerInterpreter.delegateInterpret(kotlinInterpreter, code, context); } @@ -248,14 +249,7 @@ public class SparkInterpreter extends AbstractInterpreter { return this.sc; 
} - /** - * Must use Object, because the its api signature in Spark 1.x is different from - * that of Spark 2.x. - * e.g. SqlContext.sql(sql) return different type. - * - * @return - */ - public Object getSQLContext() { + public SQLContext getSQLContext() { return sqlContext; } @@ -263,7 +257,7 @@ public class SparkInterpreter extends AbstractInterpreter { return this.jsc; } - public Object getSparkSession() { + public SparkSession getSparkSession() { return sparkSession; } @@ -297,11 +291,11 @@ public class SparkInterpreter extends AbstractInterpreter { } } - public boolean isScala211() throws InterpreterException { + public boolean isScala211() { return scalaVersion.equals("2.11"); } - public boolean isScala212() throws InterpreterException { + public boolean isScala212() { return scalaVersion.equals("2.12"); } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 4512ddbe6d..4335b6a107 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -148,9 +148,4 @@ public class SparkRInterpreter extends RInterpreter { return sparkInterpreter.getZeppelinContext(); } - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return new ArrayList<>(); - } } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index cb83bec24e..b2b9a69d32 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.spark; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.spark.SparkContext; import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.AbstractInterpreter; import org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -57,7 +58,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter { this.sqlSplitter = new SqlSplitter(); } - public boolean concurrentSQL() { + private boolean concurrentSQL() { return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL")); } @@ -83,7 +84,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter { } Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties); sparkInterpreter.getZeppelinContext().setInterpreterContext(context); - Object sqlContext = sparkInterpreter.getSQLContext(); + SQLContext sqlContext = sparkInterpreter.getSQLContext(); SparkContext sc = sparkInterpreter.getSparkContext(); List<String> sqls = sqlSplitter.splitSql(st); @@ -96,11 +97,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader()); - Method method = sqlContext.getClass().getMethod("sql", String.class); for (String sql : sqls) { curSql = sql; String result = sparkInterpreter.getZeppelinContext() - .showData(method.invoke(sqlContext, sql), maxResult); + .showData(sqlContext.sql(sql), maxResult); 
context.out.write(result); } context.out.flush(); @@ -156,7 +156,6 @@ public class SparkSqlInterpreter extends AbstractInterpreter { return FormType.SIMPLE; } - @Override public int getProgress(InterpreterContext context) throws InterpreterException { return sparkInterpreter.getProgress(context); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java index ea8fb8b4d0..578af8ed47 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -27,83 +27,17 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * Utility and helper functions for the Spark Interpreter */ class Utils { - public static Logger logger = LoggerFactory.getLogger(Utils.class); - public static String DEPRRECATED_MESSAGE = + private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); + private static String DEPRECATED_MESSAGE = "%html <font color=\"red\">Spark lower than 2.2 is deprecated, " + "if you don't want to see this message, please set " + "zeppelin.spark.deprecateMsg.show to false.</font>"; - static Object invokeMethod(Object o, String name) { - return invokeMethod(o, name, new Class[]{}, new Object[]{}); - } - - static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) { - try { - return o.getClass().getMethod(name, argTypes).invoke(o, params); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - logger.error(e.getMessage(), e); - } - return null; - } - - static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) { - try { - return c.getMethod(name, argTypes).invoke(null, params); - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - logger.error(e.getMessage(), e); - } - return null; - } - - static Object invokeStaticMethod(Class<?> c, String name) { - return invokeStaticMethod(c, name, new Class[]{}, new Object[]{}); - } - - static Class<?> findClass(String name) { - return findClass(name, false); - } - - static Class<?> findClass(String name, boolean silence) { - try { - return Class.forName(name); - } catch (ClassNotFoundException e) { - if (!silence) { - logger.error(e.getMessage(), e); - } - return null; - } - } - - static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) { - try { - Constructor<?> constructor = Utils.class.getClassLoader() - .loadClass(name).getConstructor(argTypes); - return constructor.newInstance(params); - } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | - InstantiationException | InvocationTargetException e) { - logger.error(e.getMessage(), e); - } - return null; - } - - // function works after intp is initialized - static boolean isScala2_10() { - try { - Class.forName("org.apache.spark.repl.SparkIMain"); - return true; - } catch (ClassNotFoundException e) { - return false; - } catch (IncompatibleClassChangeError e) { - return false; - } - } public static String buildJobGroupId(InterpreterContext context) { String uName = "anonymous"; @@ -136,7 +70,7 @@ class Utils { && Boolean.parseBoolean( properties.getProperty("zeppelin.spark.deprecatedMsg.show", "true"))) { try { - context.out.write(DEPRRECATED_MESSAGE); + 
context.out.write(DEPRECATED_MESSAGE); context.out.write("%text "); } catch (IOException e) { throw new InterpreterException(e); diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 045d91ab46..3e89730bd2 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -140,12 +140,6 @@ public class SparkInterpreterTest { result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - if (!interpreter.isScala213()) { - // $intp not available for scala-2.13 - result = interpreter.interpret("$intp", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } - // Companion object with case class result = interpreter.interpret("import scala.math._\n" + "object Circle {\n" + @@ -207,6 +201,7 @@ public class SparkInterpreterTest { result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + context = getInterpreterContext(); result = interpreter.interpret( "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" + "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" + @@ -216,8 +211,8 @@ public class SparkInterpreterTest { " s(3).replaceAll(\"\\\"\", \"\"),\n" + " s(5).replaceAll(\"\\\"\", \"\").toInt\n" + " )\n" + - ").toDF()", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + ").toDF()", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); // spark version result = interpreter.interpret("sc.version", getInterpreterContext()); diff --git a/spark/scala-2.11/spark-scala-parent b/spark/scala-2.11/spark-scala-parent deleted file mode 120000 index e5e899e58c..0000000000 --- a/spark/scala-2.11/spark-scala-parent +++ /dev/null @@ -1 +0,0 @@ -../spark-scala-parent \ No newline at end of file diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala index 013cab42cd..bb38c71a87 100644 --- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala @@ -17,92 +17,43 @@ package org.apache.zeppelin.spark -import java.io.{BufferedReader, File} -import java.net.URLClassLoader -import java.nio.file.{Files, Paths} -import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult} -import org.slf4j.LoggerFactory -import org.slf4j.Logger +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult} +import org.apache.zeppelin.kotlin.KotlinInterpreter +import org.slf4j.{Logger, LoggerFactory} +import java.io.{BufferedReader, File, PrintStream} +import java.net.URLClassLoader +import 
java.nio.file.Paths +import java.util.Properties +import scala.collection.JavaConverters._ import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ -/** - * SparkInterpreter for scala-2.11 - */ -class SparkScala211Interpreter(override val conf: SparkConf, - override val depFiles: java.util.List[String], - override val properties: Properties, - override val interpreterGroup: InterpreterGroup, - override val sparkInterpreterClassLoader: URLClassLoader, - val outputDir: File) - extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { - import SparkScala211Interpreter._ +/** + * SparkInterpreter for scala-2.11. + * It only works for Spark 2.x, as Spark 3.x doesn't support scala-2.11 + */ +class SparkScala211Interpreter(conf: SparkConf, + depFiles: java.util.List[String], + properties: Properties, + interpreterGroup: InterpreterGroup, + sparkInterpreterClassLoader: URLClassLoader, + outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) { - lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) + private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) private var sparkILoop: SparkILoop = _ - private var scalaCompletion: Completion = _ + private val interpreterOutput = new InterpreterOutputStream(LOGGER) + private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, + SparkStringConstants.DEFAULT_MASTER_VALUE) - override val interpreterOutput = new InterpreterOutputStream(LOGGER) - - override def open(): Unit = { - super.open() - if (sparkMaster == "yarn-client") { - System.setProperty("SPARK_YARN_MODE", "true") - } - - LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - val target = conf.get("spark.repl.target", "jvm-1.6") - val settings = new Settings() - settings.processArguments(List("-Yrepl-class-based", - "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) - settings.embeddedDefaults(sparkInterpreterClassLoader) - settings.usejavacp.value = true - settings.target.value = target - - this.userJars = getUserJars() - LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) - settings.classpath.value = userJars.mkString(File.pathSeparator) - - val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean - val replOut = if (printReplOutput) { - new JPrintWriter(interpreterOutput, true) - } else { - new JPrintWriter(Console.out, true) - } - sparkILoop = new SparkILoop(None, replOut) - sparkILoop.settings = settings - sparkILoop.createInterpreter() - - val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]] - val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true)) - - sparkILoop.in = reader - sparkILoop.initializeSynchronous() - loopPostInit(this) - this.scalaCompletion = reader.completion - - createSparkContext() - scalaInterpret("import org.apache.spark.SparkContext._") - scalaInterpret("import spark.implicits._") - scalaInterpret("import spark.sql") - scalaInterpret("import org.apache.spark.sql.functions._") - // print empty string otherwise the last statement's output of this method - // (aka. 
import org.apache.spark.sql.functions._) will mix with the output of user code - scalaInterpret("print(\"\")") - createZeppelinContext() - } - - def interpret(code: String, context: InterpreterContext): InterpreterResult = { + override def interpret(code: String, context: InterpreterContext): InterpreterResult = { val originalOut = System.out val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean @@ -153,20 +104,21 @@ class SparkScala211Interpreter(override val conf: SparkConf, } lastStatus match { - case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) + case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression") case _ => new InterpreterResult(lastStatus) } } - protected override def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates + override def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + scalaCompletion.completer().complete(buf.substring(0, cursor), cursor) + .candidates .map(e => new InterpreterCompletion(e, e, null)) - scala.collection.JavaConversions.seqAsJavaList(completions) + .asJava } - protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { + private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { sparkILoop.beQuietDuring { val result = sparkILoop.bind(name, tpe, value, modifier) if (result != IR.Success) { @@ -175,43 +127,155 @@ class SparkScala211Interpreter(override val conf: SparkConf, } } + override def bind(name: String, + tpe: String, + value: Object, + modifier: java.util.List[String]): Unit = + bind(name, tpe, value, modifier.asScala.toList) + + private def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = + sparkILoop.interpret(code) + + @throws[InterpreterException] + def scalaInterpretQuietly(code: String): Unit = { + scalaInterpret(code) match { + case scala.tools.nsc.interpreter.Results.Success => + // do nothing + case scala.tools.nsc.interpreter.Results.Error => + throw new InterpreterException("Fail to run code: " + code) + case scala.tools.nsc.interpreter.Results.Incomplete => + throw new InterpreterException("Incomplete code: " + code) + } + } + + override def getScalaShellClassLoader: ClassLoader = { + sparkILoop.classLoader + } + + // Used by KotlinSparkInterpreter + override def delegateInterpret(interpreter: KotlinInterpreter, + code: String, + context: InterpreterContext): InterpreterResult = { + val out = context.out + val newOut = if (out != null) new PrintStream(out) else null + Console.withOut(newOut) { + interpreter.interpret(code, context) + } + } + override def close(): Unit = { super.close() if (sparkILoop != null) { sparkILoop.closeInterpreter() - sparkILoop = null } } - def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = - sparkILoop.interpret(code) + override def createSparkILoop(): Unit = { + if (sparkMaster == "yarn-client") { + System.setProperty("SPARK_YARN_MODE", "true") + } - override def getScalaShellClassLoader: ClassLoader = { - sparkILoop.classLoader + LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) + conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) + val target = conf.get("spark.repl.target", "jvm-1.6") + val settings = 
new Settings() + settings.processArguments(List("-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) + settings.embeddedDefaults(sparkInterpreterClassLoader) + settings.usejavacp.value = true + settings.target.value = target + val userJars = getUserJars() + LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) + settings.classpath.value = userJars.mkString(File.pathSeparator) + + val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean + val replOut = if (printReplOutput) { + new JPrintWriter(interpreterOutput, true) + } else { + new JPrintWriter(Console.out, true) + } + sparkILoop = new SparkILoop(None, replOut) + sparkILoop.settings = settings + sparkILoop.createInterpreter() + + val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]] + val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true)) + + sparkILoop.in = reader + sparkILoop.initializeSynchronous() + SparkScala211Interpreter.loopPostInit(this) + this.scalaCompletion = reader.completion + } + + override def createZeppelinContext(): Unit = { + val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) + sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + z = new SparkZeppelinContext(sc, sparkShims, + interpreterGroup.getInterpreterHookRegistry, + properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) + bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) + } + + private def getField(obj: Object, name: String): Object = { + val field = obj.getClass.getField(name) + field.setAccessible(true) + field.get(obj) + } + + private def callMethod(obj: Object, name: String, + parameterTypes: Array[Class[_]], + parameters: Array[Object]): Object = { + val method = obj.getClass.getMethod(name, parameterTypes: _ *) + method.setAccessible(true) + method.invoke(obj, parameters: _ *) + } + + private def getUserJars(): Seq[String] = { + var classLoader = Thread.currentThread().getContextClassLoader + var extraJars = Seq.empty[String] + while (classLoader != null) { + if (classLoader.getClass.getCanonicalName == + "org.apache.spark.util.MutableURLClassLoader") { + extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() + // Check if the file exists. + .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } + // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. + .filterNot { + u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") + } + .map(url => url.toString).toSeq + classLoader = null + } else { + classLoader = classLoader.getParent + } + } + + extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath()) + LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) + extraJars } } private object SparkScala211Interpreter { /** - * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such - * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here, - * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at - * Scala 2.11.12 since here we do not have to load files. - * - * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and - * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12. 
- * - * Please see the codes below: - * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala - * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala - * - * See also ZEPPELIN-3810. - */ + * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such + * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here, + * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at + * Scala 2.11.12 since here we do not have to load files. + * + * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and + * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12. + * + * Please see the codes below: + * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala + * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala + * + * See also ZEPPELIN-3810. + */ private def loopPostInit(interpreter: SparkScala211Interpreter): Unit = { import StdReplTags._ - import scala.reflect.classTag - import scala.reflect.io + import scala.reflect.{classTag, io} val sparkILoop = interpreter.sparkILoop val intp = sparkILoop.intp @@ -258,5 +322,4 @@ private object SparkScala211Interpreter { loopPostInit() } - } diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala similarity index 99% copy from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala copy to spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala index fa4188c036..410ed4cf54 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -20,7 +20,6 @@ package org.apache.zeppelin.spark import java.util import org.apache.spark.SparkContext -import org.apache.spark.sql.DataFrame import org.apache.zeppelin.annotation.ZeppelinApi import org.apache.zeppelin.display.AngularObjectWatcher import org.apache.zeppelin.display.ui.OptionInput.ParamOption diff --git a/spark/scala-2.12/spark-scala-parent b/spark/scala-2.12/spark-scala-parent deleted file mode 120000 index e5e899e58c..0000000000 --- a/spark/scala-2.12/spark-scala-parent +++ /dev/null @@ -1 +0,0 @@ -../spark-scala-parent \ No newline at end of file diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala index f31293239a..764dda93b0 100644 --- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala @@ -17,88 +17,44 @@ package org.apache.zeppelin.spark -import java.io.{BufferedReader, File} -import java.net.URLClassLoader -import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.repl.SparkILoop import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult} -import org.slf4j.LoggerFactory -import org.slf4j.Logger 
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult} +import org.apache.zeppelin.kotlin.KotlinInterpreter +import org.slf4j.{Logger, LoggerFactory} +import java.io.{BufferedReader, File, PrintStream} +import java.net.URLClassLoader +import java.nio.file.Paths +import java.util.Properties +import scala.collection.JavaConverters._ import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.ILoop.loopToInterpreter import scala.tools.nsc.interpreter._ + /** - * SparkInterpreter for scala-2.12 - */ -class SparkScala212Interpreter(override val conf: SparkConf, - override val depFiles: java.util.List[String], - override val properties: Properties, - override val interpreterGroup: InterpreterGroup, - override val sparkInterpreterClassLoader: URLClassLoader, - val outputDir: File) - extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { - - lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) + * SparkInterpreter for scala-2.12. + * It is used by both Spark 2.x and 3.x + */ +class SparkScala212Interpreter(conf: SparkConf, + depFiles: java.util.List[String], + properties: Properties, + interpreterGroup: InterpreterGroup, + sparkInterpreterClassLoader: URLClassLoader, + outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) { - private var sparkILoop: SparkILoop = _ + private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) + private var sparkILoop: SparkILoop = _ private var scalaCompletion: Completion = _ + private val interpreterOutput = new InterpreterOutputStream(LOGGER) + private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, + SparkStringConstants.DEFAULT_MASTER_VALUE) - override val interpreterOutput = new InterpreterOutputStream(LOGGER) - - override def open(): Unit = { - super.open() - if (sparkMaster == "yarn-client") { - System.setProperty("SPARK_YARN_MODE", "true") - } - - LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - - val settings = new Settings() - settings.processArguments(List("-Yrepl-class-based", - "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) - settings.embeddedDefaults(sparkInterpreterClassLoader) - settings.usejavacp.value = true - this.userJars = getUserJars() - LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) - settings.classpath.value = userJars.mkString(File.pathSeparator) - - val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean - val replOut = if (printReplOutput) { - new JPrintWriter(interpreterOutput, true) - } else { - new JPrintWriter(Console.out, true) - } - sparkILoop = new SparkILoop(None, replOut) - sparkILoop.settings = settings - sparkILoop.createInterpreter() - val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]] - val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true)) - - sparkILoop.in = reader - sparkILoop.initializeSynchronous() - SparkScala212Interpreter.loopPostInit(this) - this.scalaCompletion = reader.completion - - createSparkContext() - - scalaInterpret("import org.apache.spark.SparkContext._") - scalaInterpret("import spark.implicits._") - scalaInterpret("import spark.sql") - scalaInterpret("import org.apache.spark.sql.functions._") - // print empty string otherwise the last statement's output of 
this method - // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code - scalaInterpret("print(\"\")") - - createZeppelinContext() - } - - def interpret(code: String, context: InterpreterContext): InterpreterResult = { + override def interpret(code: String, context: InterpreterContext): InterpreterResult = { val originalOut = System.out val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean @@ -149,20 +105,21 @@ class SparkScala212Interpreter(override val conf: SparkConf, } lastStatus match { - case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) + case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression") case _ => new InterpreterResult(lastStatus) } } - protected override def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates + override def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + scalaCompletion.complete(buf.substring(0, cursor), cursor) + .candidates .map(e => new InterpreterCompletion(e, e, null)) - scala.collection.JavaConversions.seqAsJavaList(completions) + .asJava } - protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { + private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { sparkILoop.beQuietDuring { val result = sparkILoop.bind(name, tpe, value, modifier) if (result != IR.Success) { @@ -171,22 +128,134 @@ class SparkScala212Interpreter(override val conf: SparkConf, } } + override def bind(name: String, + tpe: String, + value: Object, + modifier: java.util.List[String]): Unit = + bind(name, tpe, value, modifier.asScala.toList) + + def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = + sparkILoop.interpret(code) + + @throws[InterpreterException] + def scalaInterpretQuietly(code: String): Unit = { + scalaInterpret(code) match { + case scala.tools.nsc.interpreter.Results.Success => + // do nothing + case scala.tools.nsc.interpreter.Results.Error => + throw new InterpreterException("Fail to run code: " + code) + case scala.tools.nsc.interpreter.Results.Incomplete => + throw new InterpreterException("Incomplete code: " + code) + } + } + + override def getScalaShellClassLoader: ClassLoader = { + sparkILoop.classLoader + } + + // Used by KotlinSparkInterpreter + override def delegateInterpret(interpreter: KotlinInterpreter, + code: String, + context: InterpreterContext): InterpreterResult = { + val out = context.out + val newOut = if (out != null) new PrintStream(out) else null + Console.withOut(newOut) { + interpreter.interpret(code, context) + } + } + + def interpret(code: String): InterpreterResult = + interpret(code, InterpreterContext.get()) override def close(): Unit = { super.close() if (sparkILoop != null) { sparkILoop.closeInterpreter() - sparkILoop = null } } - def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result = - sparkILoop.interpret(code) + override def createSparkILoop(): Unit = { + if (sparkMaster == "yarn-client") { + System.setProperty("SPARK_YARN_MODE", "true") + } - override def getScalaShellClassLoader: ClassLoader = { - sparkILoop.classLoader + LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) + conf.set("spark.repl.class.outputDir", 
outputDir.getAbsolutePath) + + val settings = new Settings() + settings.processArguments(List("-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) + settings.embeddedDefaults(sparkInterpreterClassLoader) + settings.usejavacp.value = true + val userJars = getUserJars() + LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) + settings.classpath.value = userJars.mkString(File.pathSeparator) + + val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean + val replOut = if (printReplOutput) { + new JPrintWriter(interpreterOutput, true) + } else { + new JPrintWriter(Console.out, true) + } + sparkILoop = new SparkILoop(None, replOut) + sparkILoop.settings = settings + sparkILoop.createInterpreter() + val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]] + val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true)) + + sparkILoop.in = reader + sparkILoop.initializeSynchronous() + SparkScala212Interpreter.loopPostInit(this) + this.scalaCompletion = reader.completion + } + + override def createZeppelinContext(): Unit = { + val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) + sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + z = new SparkZeppelinContext(sc, sparkShims, + interpreterGroup.getInterpreterHookRegistry, + properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) + bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) + } + + private def getDeclareField(obj: Object, name: String): Object = { + val field = obj.getClass.getDeclaredField(name) + field.setAccessible(true) + field.get(obj) } + private def callMethod(obj: Object, name: String, + parameterTypes: Array[Class[_]], + parameters: Array[Object]): Object = { + val method = obj.getClass.getMethod(name, parameterTypes: _ *) + method.setAccessible(true) + method.invoke(obj, parameters: _ *) + } + + private def getUserJars(): Seq[String] = { + var classLoader = Thread.currentThread().getContextClassLoader + var extraJars = Seq.empty[String] + while (classLoader != null) { + if (classLoader.getClass.getCanonicalName == + "org.apache.spark.util.MutableURLClassLoader") { + extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() + // Check if the file exists. + .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } + // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. 
+ .filterNot { + u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") + } + .map(url => url.toString).toSeq + classLoader = null + } else { + classLoader = classLoader.getParent + } + } + + extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath()) + LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) + extraJars + } } private object SparkScala212Interpreter { @@ -207,8 +276,7 @@ private object SparkScala212Interpreter { */ private def loopPostInit(interpreter: SparkScala212Interpreter): Unit = { import StdReplTags._ - import scala.reflect.classTag - import scala.reflect.io + import scala.reflect.{classTag, io} val sparkILoop = interpreter.sparkILoop val intp = sparkILoop.intp diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala similarity index 99% copy from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala copy to spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala index fa4188c036..410ed4cf54 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -20,7 +20,6 @@ package org.apache.zeppelin.spark import java.util import org.apache.spark.SparkContext -import org.apache.spark.sql.DataFrame import org.apache.zeppelin.annotation.ZeppelinApi import org.apache.zeppelin.display.AngularObjectWatcher import org.apache.zeppelin.display.ui.OptionInput.ParamOption diff --git a/spark/scala-2.13/spark-scala-parent b/spark/scala-2.13/spark-scala-parent deleted file mode 120000 index e5e899e58c..0000000000 --- a/spark/scala-2.13/spark-scala-parent +++ /dev/null @@ -1 +0,0 @@ -../spark-scala-parent \ No newline at end of file diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala index f896101dda..91afc8545c 100644 --- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala @@ -21,78 +21,40 @@ package org.apache.zeppelin.spark import org.apache.spark.SparkConf import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult} +import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult} +import org.apache.zeppelin.kotlin.KotlinInterpreter import org.slf4j.{Logger, LoggerFactory} -import java.io.{File, PrintWriter} +import java.io.{File, PrintStream, PrintWriter} import java.net.URLClassLoader +import java.nio.file.Paths import java.util.Properties +import scala.jdk.CollectionConverters._ import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ import scala.tools.nsc.interpreter.shell.{Accumulator, Completion, ReplCompletion} /** - * SparkInterpreter for scala-2.13 - */ -class SparkScala213Interpreter(override val conf: SparkConf, - override val depFiles: java.util.List[String], - override val properties: Properties, - override val interpreterGroup: InterpreterGroup, - override val 
sparkInterpreterClassLoader: URLClassLoader, - val outputDir: File) - extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) { - - lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass) + * SparkInterpreter for scala-2.13. + * It only works for Spark 3.x because only Spark 3.x supports scala-2.13. + */ +class SparkScala213Interpreter(conf: SparkConf, + depFiles: java.util.List[String], + properties: Properties, + interpreterGroup: InterpreterGroup, + sparkInterpreterClassLoader: URLClassLoader, + outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) { - private var sparkILoop: SparkILoop = _ + private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) + private var sparkILoop: SparkILoop = _ private var scalaCompletion: Completion = _ + private val interpreterOutput = new InterpreterOutputStream(LOGGER) + private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, + SparkStringConstants.DEFAULT_MASTER_VALUE) - override val interpreterOutput = new InterpreterOutputStream(LOGGER) - - override def open(): Unit = { - super.open() - if (sparkMaster == "yarn-client") { - System.setProperty("SPARK_YARN_MODE", "true") - } - LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - - val settings = new Settings() - settings.processArguments(List("-Yrepl-class-based", - "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) - settings.embeddedDefaults(sparkInterpreterClassLoader) - settings.usejavacp.value = true - this.userJars = getUserJars() - LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) - settings.classpath.value = userJars.mkString(File.pathSeparator) - - val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean - val replOut = if (printReplOutput) { - new PrintWriter(interpreterOutput, true) - } else { - new PrintWriter(Console.out, true) - } - sparkILoop = new SparkILoop(null, replOut) - sparkILoop.run(settings) - this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator) - Thread.currentThread.setContextClassLoader(sparkILoop.classLoader) - - createSparkContext() - - scalaInterpret("import org.apache.spark.SparkContext._") - scalaInterpret("import spark.implicits._") - scalaInterpret("import spark.sql") - scalaInterpret("import org.apache.spark.sql.functions._") - // print empty string otherwise the last statement's output of this method - // (aka. 
import org.apache.spark.sql.functions._) will mix with the output of user code - scalaInterpret("print(\"\")") - - createZeppelinContext() - } def interpret(code: String, context: InterpreterContext): InterpreterResult = { - val originalOut = System.out val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean @@ -142,23 +104,36 @@ class SparkScala213Interpreter(override val conf: SparkConf, } lastStatus match { - case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" ) + case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression") case _ => new InterpreterResult(lastStatus) } } - def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result = + private def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result = sparkILoop.interpret(code) - protected override def completion(buf: String, - cursor: Int, - context: InterpreterContext): java.util.List[InterpreterCompletion] = { - val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates + @throws[InterpreterException] + def scalaInterpretQuietly(code: String): Unit = { + scalaInterpret(code) match { + case scala.tools.nsc.interpreter.Results.Success => + // do nothing + case scala.tools.nsc.interpreter.Results.Error => + throw new InterpreterException("Fail to run code: " + code) + case scala.tools.nsc.interpreter.Results.Incomplete => + throw new InterpreterException("Incomplete code: " + code) + } + } + + override def completion(buf: String, + cursor: Int, + context: InterpreterContext): java.util.List[InterpreterCompletion] = { + scalaCompletion.complete(buf.substring(0, cursor), cursor) + .candidates .map(e => new InterpreterCompletion(e.defString, e.defString, null)) - scala.collection.JavaConverters.asJava(completions) + .asJava } - protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { + private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = { sparkILoop.beQuietDuring { val result = sparkILoop.bind(name, tpe, value, modifier) if (result != Results.Success) { @@ -167,6 +142,27 @@ class SparkScala213Interpreter(override val conf: SparkConf, } } + override def bind(name: String, + tpe: String, + value: Object, + modifier: java.util.List[String]): Unit = + bind(name, tpe, value, modifier.asScala.toList) + + override def getScalaShellClassLoader: ClassLoader = { + sparkILoop.classLoader + } + + // Used by KotlinSparkInterpreter + def delegateInterpret(interpreter: KotlinInterpreter, + code: String, + context: InterpreterContext): InterpreterResult = { + val out = context.out + val newOut = if (out != null) new PrintStream(out) else null + Console.withOut(newOut) { + interpreter.interpret(code, context) + } + } + override def close(): Unit = { super.close() if (sparkILoop != null) { @@ -174,7 +170,66 @@ class SparkScala213Interpreter(override val conf: SparkConf, } } - override def getScalaShellClassLoader: ClassLoader = { - sparkILoop.classLoader + override def createSparkILoop(): Unit = { + if (sparkMaster == "yarn-client") { + System.setProperty("SPARK_YARN_MODE", "true") + } + LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath) + conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) + + // createSpark + val settings = new Settings() + settings.processArguments(List("-Yrepl-class-based", + "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) + 
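// A minimal sketch, as a standalone Scala script, of how the REPL settings above and the
// "spark.repl.class.outputDir" conf set earlier in this method cooperate: the REPL compiles
// every interpreted line into class files under the output dir, and Spark serves that same
// dir to executors so closures defined in a paragraph can be loaded on the cluster.
// The value name replClassDir is illustrative only.
import java.nio.file.Files
import org.apache.spark.SparkConf
import scala.tools.nsc.Settings

val replClassDir = Files.createTempDirectory("zeppelin-repl-classes").toFile
val sketchConf = new SparkConf().set("spark.repl.class.outputDir", replClassDir.getAbsolutePath)
val sketchSettings = new Settings()
sketchSettings.processArguments(
  List("-Yrepl-class-based", "-Yrepl-outdir", replClassDir.getAbsolutePath), true)
sketchSettings.usejavacp.value = true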
settings.embeddedDefaults(sparkInterpreterClassLoader) + settings.usejavacp.value = true + val userJars = getUserJars() + LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator)) + settings.classpath.value = userJars.mkString(File.pathSeparator) + + val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean + val replOut = if (printReplOutput) { + new PrintWriter(interpreterOutput, true) + } else { + new PrintWriter(Console.out, true) + } + sparkILoop = new SparkILoop(null, replOut) + sparkILoop.run(settings) + this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator) + Thread.currentThread.setContextClassLoader(sparkILoop.classLoader) + } + + override def createZeppelinContext(): Unit = { + val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) + sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) + z = new SparkZeppelinContext(sc, sparkShims, + interpreterGroup.getInterpreterHookRegistry, + properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) + bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) + } + + private def getUserJars(): Seq[String] = { + var classLoader = Thread.currentThread().getContextClassLoader + var extraJars = Seq.empty[String] + while (classLoader != null) { + if (classLoader.getClass.getCanonicalName == + "org.apache.spark.util.MutableURLClassLoader") { + extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() + // Check if the file exists. + .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } + // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. + .filterNot { + u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") + } + .map(url => url.toString).toSeq + classLoader = null + } else { + classLoader = classLoader.getParent + } + } + + extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath()) + LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) + extraJars } } diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala similarity index 99% rename from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala rename to spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala index fa4188c036..410ed4cf54 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala +++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala @@ -20,7 +20,6 @@ package org.apache.zeppelin.spark import java.util import org.apache.spark.SparkContext -import org.apache.spark.sql.DataFrame import org.apache.zeppelin.annotation.ZeppelinApi import org.apache.zeppelin.display.AngularObjectWatcher import org.apache.zeppelin.display.ui.OptionInput.ParamOption diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml index 1cc02a3a63..d00ed99bba 100644 --- a/spark/spark-scala-parent/pom.xml +++ b/spark/spark-scala-parent/pom.xml @@ -149,65 +149,6 @@ </executions> </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-scala-sources</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - 
<source>${project.basedir}/../spark-scala-parent/src/main/scala</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-scala-test-sources</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>${project.basedir}/../spark-scala-parent/src/test/scala</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-resource</id> - <phase>generate-resources</phase> - <goals> - <goal>add-resource</goal> - </goals> - <configuration> - <resources> - <resource> - <directory>${project.basedir}/../spark-scala-parent/src/main/resources</directory> - </resource> - </resources> - </configuration> - </execution> - <execution> - <id>add-test-resource</id> - <phase>generate-test-resources</phase> - <goals> - <goal>add-test-resource</goal> - </goals> - <configuration> - <resources> - <resource> - <directory>${project.basedir}/../spark-scala-parent/src/test/resources</directory> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala deleted file mode 100644 index b07de98bfe..0000000000 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - - -import java.io.{File, IOException, PrintStream} -import java.net.URLClassLoader -import java.nio.file.Paths -import java.util.concurrent.atomic.AtomicInteger -import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.yarn.client.api.YarnClient -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{SQLContext, SparkSession} -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext} -import org.apache.zeppelin.kotlin.KotlinInterpreter -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ - - -/** - * Base class for different scala versions of SparkInterpreter. It should be - * binary compatible between multiple scala versions. 
- * - * @param conf - * @param depFiles - * @param properties - * @param interpreterGroup - */ -abstract class BaseSparkScalaInterpreter(val conf: SparkConf, - val depFiles: java.util.List[String], - val properties: java.util.Properties, - val interpreterGroup: InterpreterGroup, - val sparkInterpreterClassLoader: URLClassLoader) - extends AbstractSparkScalaInterpreter() { - - protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - protected var sc: SparkContext = _ - - protected var sqlContext: SQLContext = _ - - protected var sparkSession: SparkSession = _ - - protected var userJars: Seq[String] = _ - - protected var sparkUrl: String = _ - - protected var z: SparkZeppelinContext = _ - - protected val interpreterOutput: InterpreterOutputStream - - protected val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME, - SparkStringConstants.DEFAULT_MASTER_VALUE) - - protected def open(): Unit = { - /* Required for scoped mode. - * In scoped mode multiple scala compiler (repl) generates class in the same directory. - * Class names is not randomly generated and look like '$line12.$read$$iw$$iw' - * Therefore it's possible to generated class conflict(overwrite) with other repl generated - * class. - * - * To prevent generated class name conflict, - * change prefix of generated class name from each scala compiler (repl) instance. - * - * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern - * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$ - * - * As hashCode() can return a negative integer value and the minus character '-' is invalid - * in a package name we change it to a numeric value '0' which still conforms to the regexp. - * - */ - System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0')) - - BaseSparkScalaInterpreter.sessionNum.incrementAndGet() - } - - // Used by KotlinSparkInterpreter - def delegateInterpret(interpreter: KotlinInterpreter, - code: String, - context: InterpreterContext): InterpreterResult = { - val out = context.out - val newOut = if (out != null) new PrintStream(out) else null - Console.withOut(newOut) { - interpreter.interpret(code, context) - } - } - - protected def interpret(code: String): InterpreterResult = - interpret(code, InterpreterContext.get()) - - protected def getProgress(jobGroup: String, context: InterpreterContext): Int = { - JobProgressUtil.progress(sc, jobGroup) - } - - override def getSparkContext: SparkContext = sc - - override def getSqlContext: SQLContext = sqlContext - - override def getSparkSession: AnyRef = sparkSession - - override def getSparkUrl: String = sparkUrl - - override def getZeppelinContext: ZeppelinContext = z - - protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit - - // for use in java side - protected def bind(name: String, - tpe: String, - value: Object, - modifier: java.util.List[String]): Unit = - bind(name, tpe, value, modifier.asScala.toList) - - protected def close(): Unit = { - // delete stagingDir for yarn mode - if (sparkMaster.startsWith("yarn")) { - val hadoopConf = new YarnConfiguration() - val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) { - new Path(conf.get("spark.yarn.stagingDir")) - } else { - FileSystem.get(hadoopConf).getHomeDirectory() - } - val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId) - cleanupStagingDirInternal(stagingDirPath, hadoopConf) - } - - if (sc != null) { - sc.stop() - sc = null - } - if (sparkSession != null) { - 
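// The reflective getMethod("stop") call just below is the late-bound spelling of a plain
// SparkSession.stop(); a direct equivalent, assuming SparkSession is available at compile
// time, would look like this (helper name stopSessionDirectly is illustrative only):
import org.apache.spark.sql.SparkSession

def stopSessionDirectly(session: SparkSession): Unit = {
  if (session != null) {
    session.stop() // same effect as session.getClass.getMethod("stop").invoke(session)
  }
}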
sparkSession.getClass.getMethod("stop").invoke(sparkSession) - sparkSession = null - } - sqlContext = null - z = null - } - - private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = { - try { - val fs = stagingDirPath.getFileSystem(hadoopConf) - if (fs.delete(stagingDirPath, true)) { - LOGGER.info(s"Deleted staging directory $stagingDirPath") - } - } catch { - case ioe: IOException => - LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, ioe) - } - } - - protected def createSparkContext(): Unit = { - spark2CreateContext() - } - - private def spark2CreateContext(): Unit = { - val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$") - val sparkObj = sparkClz.getField("MODULE$").get(null) - - val builderMethod = sparkClz.getMethod("builder") - val builder = builderMethod.invoke(sparkObj) - builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf) - - if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive" - || conf.get("zeppelin.spark.useHiveContext", "false").toLowerCase == "true") { - val hiveSiteExisted: Boolean = - Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null - val hiveClassesPresent = - sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean] - if (hiveSiteExisted && hiveClassesPresent) { - builder.getClass.getMethod("enableHiveSupport").invoke(builder) - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] - LOGGER.info("Created Spark session (with Hive support)"); - } else { - if (!hiveClassesPresent) { - LOGGER.warn("Hive support can not be enabled because spark is not built with hive") - } - if (!hiveSiteExisted) { - LOGGER.warn("Hive support can not be enabled because no hive-site.xml found") - } - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] - LOGGER.info("Created Spark session (without Hive support)"); - } - } else { - sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession] - LOGGER.info("Created Spark session (without Hive support)"); - } - - sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession) - .asInstanceOf[SparkContext] - getUserFiles().foreach(file => sc.addFile(file)) - sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession) - .asInstanceOf[SQLContext] - sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match { - case Some(url) => sparkUrl = url - case None => - } - - initAndSendSparkWebUrl() - - bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient""")) - bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) - bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient""")) - } - - protected def initAndSendSparkWebUrl(): Unit = { - val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); - if (!StringUtils.isBlank(webUiUrl)) { - this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId); - } else { - useYarnProxyURLIfNeeded() - } - InterpreterContext.get.getIntpEventClient.sendWebUrlInfo(this.sparkUrl) - } - - protected def createZeppelinContext(): Unit = { - - var sparkShims: SparkShims = null - if (isSparkSessionPresent()) { - sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession) - } else { - sparkShims = SparkShims.getInstance(sc.version, properties, sc) - } - - 
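// A rough sketch of the version dispatch behind the SparkShims.getInstance calls above, not
// the actual shims code; the concrete class names are assumptions derived from the
// spark2-shims and spark3-shims module names.
def pickShimsClassName(sparkVersion: String): String = {
  val major = sparkVersion.split("\\.")(0).toInt
  if (major >= 3) "org.apache.zeppelin.spark.Spark3Shims"
  else "org.apache.zeppelin.spark.Spark2Shims"
}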
sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) - z = new SparkZeppelinContext(sc, sparkShims, - interpreterGroup.getInterpreterHookRegistry, - properties.getProperty("zeppelin.spark.maxResult", "1000").toInt) - bind("z", z.getClass.getCanonicalName, z, List("""@transient""")) - } - - private def useYarnProxyURLIfNeeded() { - if (properties.getProperty("spark.webui.yarn.useProxy", "false").toBoolean) { - if (sparkMaster.startsWith("yarn")) { - val appId = sc.applicationId - val yarnClient = YarnClient.createYarnClient - val yarnConf = new YarnConfiguration() - // disable timeline service as we only query yarn app here. - // Otherwise we may hit this kind of ERROR: - // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig - yarnConf.set("yarn.timeline-service.enabled", "false") - yarnClient.init(yarnConf) - yarnClient.start() - val appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId)) - this.sparkUrl = appReport.getTrackingUrl - } - } - } - - private def isSparkSessionPresent(): Boolean = { - try { - Class.forName("org.apache.spark.sql.SparkSession") - true - } catch { - case _: ClassNotFoundException | _: NoClassDefFoundError => false - } - } - - protected def getField(obj: Object, name: String): Object = { - val field = obj.getClass.getField(name) - field.setAccessible(true) - field.get(obj) - } - - protected def getDeclareField(obj: Object, name: String): Object = { - val field = obj.getClass.getDeclaredField(name) - field.setAccessible(true) - field.get(obj) - } - - protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = { - val field = obj.getClass.getDeclaredField(name) - field.setAccessible(true) - field.set(obj, value) - } - - protected def callMethod(obj: Object, name: String): Object = { - callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object]) - } - - protected def callMethod(obj: Object, name: String, - parameterTypes: Array[Class[_]], - parameters: Array[Object]): Object = { - val method = obj.getClass.getMethod(name, parameterTypes: _ *) - method.setAccessible(true) - method.invoke(obj, parameters: _ *) - } - - protected def getUserJars(): Seq[String] = { - var classLoader = Thread.currentThread().getContextClassLoader - var extraJars = Seq.empty[String] - while (classLoader != null) { - if (classLoader.getClass.getCanonicalName == - "org.apache.spark.util.MutableURLClassLoader") { - extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() - // Check if the file exists. - .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } - // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. 
- .filterNot { - u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") - } - .map(url => url.toString).toSeq - classLoader = null - } else { - classLoader = classLoader.getParent - } - } - - extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath()) - LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) - extraJars - } - - protected def getUserFiles(): Seq[String] = { - depFiles.asScala.toSeq.filter(!_.endsWith(".jar")) - } -} - -object BaseSparkScalaInterpreter { - val sessionNum = new AtomicInteger(0) -} diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala deleted file mode 100644 index 79018c89a0..0000000000 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import org.apache.spark.SparkContext -import org.slf4j.{Logger, LoggerFactory} - -object JobProgressUtil { - - protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) - - def progress(sc: SparkContext, jobGroup : String):Int = { - // Each paragraph has one unique jobGroup, and one paragraph may run multiple times. 
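// A worked example of the percentage computed at the end of this method, using hypothetical
// stage sizes:
val numTasksPerStage   = Seq(10, 30)  // two stages of the matched job
val completedPerStage  = Seq(10, 20)
val taskCount          = numTasksPerStage.sum                                   // 40
val completedTaskCount = completedPerStage.sum                                  // 30
val progressPercent    = (100 * completedTaskCount.toDouble / taskCount).toInt  // 75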
- // So only look for the first job which match the jobGroup - val jobInfo = sc.statusTracker - .getJobIdsForGroup(jobGroup) - .headOption - .flatMap(jobId => sc.statusTracker.getJobInfo(jobId)) - val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo))) - stagesInfoOption match { - case None => 0 - case Some(stagesInfo) => - val taskCount = stagesInfo.map(_.numTasks).sum - val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum - LOGGER.debug("Total TaskCount: " + taskCount) - LOGGER.debug("Completed TaskCount: " + completedTaskCount) - if (taskCount == 0) { - 0 - } else { - (100 * completedTaskCount.toDouble / taskCount).toInt - } - } - } -} diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java index 709f484b20..4b4ffc8891 100644 --- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java +++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.spark; -import org.apache.zeppelin.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; import org.apache.zeppelin.interpreter.InterpreterContext; diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index 88f0f88ee7..2b7e3a622a 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -226,7 +226,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { assertEquals(Status.ERROR, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database")); assertEquals(0, result.getJobUrls().size()); } finally { @@ -299,7 +299,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi { assertEquals(Status.ERROR, result.getStatus()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found")); + assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database")); assertEquals(0, result.getJobUrls().size()); // cancel