Repository: zeppelin Updated Branches: refs/heads/branch-0.7 c7847c119 -> 6d72db34f
ZEPPELIN-2199: Fix lapply issue in sparkR ### What is this PR for? Function createRDDFromArray used for creating R RDD expects a JavaSparkContext object instead of spark context. This PR address that concern. ### What type of PR is it? Bug Fix ### Todos * [ ] - Task ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-2199 ### How should this be tested? Build Zeppelin and Run %r families <- c("gaussian", "poisson") df <- createDataFrame(iris) train <- function(family) { model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family) summary(model) } model.summaries <- spark.lapply(families, train) print(model.summaries) It fails in current master but will pass in this branch. ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? Not completely sure about this. * Does this needs documentation? No. Author: Vipul Modi <vipulmodi@Vipuls-MacBook-Air.local> Author: Vipul Modi <vip...@qubole.com> Closes #2090 from vipul1409/ZEPPELIN-2199 and squashes the following commits: 8fccad4 [Vipul Modi] Trigger build 2 f351a7a [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199 c89ed1e [Vipul Modi] Trigger build 2 509faf7 [Vipul Modi] Trigger build b83121e [Vipul Modi] Nullify jsc on close and remove file:/ changes 1d5bd5b [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199 cebf970 [Vipul Modi] Removing dummy file.txt 39e8144 [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199 8a0651d [Vipul Modi] Dummy commit 70b19c1 [Vipul Modi] ZEPPELIN-2199: Fix lapply issue in sparkR (cherry picked from commit 0e1964877654c56c72473ad07dac1de6f9646816) Signed-off-by: Felix Cheung <felixche...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/6d72db34 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/6d72db34 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/6d72db34 Branch: refs/heads/branch-0.7 Commit: 6d72db34fa67504bb7d528c799fba58aa48f4a12 Parents: c7847c1 Author: Vipul Modi <vipulmodi@Vipuls-MacBook-Air.local> Authored: Tue Mar 7 09:10:30 2017 +0530 Committer: Felix Cheung <felixche...@apache.org> Committed: Tue Mar 7 23:26:18 2017 -0800 ---------------------------------------------------------------------- .../org/apache/zeppelin/spark/SparkInterpreter.java | 12 ++++++++++++ .../org/apache/zeppelin/spark/SparkRInterpreter.java | 4 ++++ .../org/apache/zeppelin/spark/ZeppelinRContext.java | 6 ++++++ spark/src/main/resources/R/zeppelin_sparkr.R | 2 +- 4 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 1aecec4..47f8080 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -38,6 +38,7 @@ import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; import org.apache.spark.SecurityManager; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.repl.SparkILoop; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; @@ -123,6 +124,7 @@ public class SparkInterpreter extends Interpreter { private SparkVersion sparkVersion; private static File outputDir; // class outputdir for scala 2.11 private Object classServer; // classserver for scala 2.11 + private JavaSparkContext jsc; public SparkInterpreter(Properties property) { @@ -149,6 +151,15 @@ public class SparkInterpreter extends Interpreter { } } + public JavaSparkContext getJavaSparkContext() { + synchronized (sharedInterpreterLock) { + if (jsc == null) { + jsc = JavaSparkContext.fromSparkContext(sc); + } + return jsc; + } + } + public boolean isSparkContextInitialized() { synchronized (sharedInterpreterLock) { return sc != null; @@ -1390,6 +1401,7 @@ public class SparkInterpreter extends Interpreter { } sparkSession = null; sc = null; + jsc = null; if (classServer != null) { Utils.invokeMethod(classServer, "stop"); classServer = null; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 8f3e93c..e2d01fe 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.spark.SparkContext; import org.apache.spark.SparkRBackend; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; @@ -45,6 +46,7 @@ public class SparkRInterpreter extends Interpreter { private SparkInterpreter sparkInterpreter; private ZeppelinR zeppelinR; private SparkContext sc; + private JavaSparkContext jsc; public SparkRInterpreter(Properties property) { super(property); @@ -73,8 +75,10 @@ public class SparkRInterpreter extends Interpreter { this.sparkInterpreter = getSparkInterpreter(); this.sc = sparkInterpreter.getSparkContext(); + this.jsc = sparkInterpreter.getJavaSparkContext(); SparkVersion sparkVersion = new SparkVersion(sc.version()); ZeppelinRContext.setSparkContext(sc); + ZeppelinRContext.setJavaSparkContext(jsc); if (Utils.isSpark2()) { ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java index 935410b..a2fc412 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.spark; import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; /** @@ -28,6 +29,7 @@ public class ZeppelinRContext { private static SQLContext sqlContext; private static ZeppelinContext zeppelinContext; private static Object sparkSession; + private static JavaSparkContext javaSparkContext; public static void setSparkContext(SparkContext sparkContext) { ZeppelinRContext.sparkContext = sparkContext; @@ -60,4 +62,8 @@ public class ZeppelinRContext { public static Object getSparkSession() { return sparkSession; } + + public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; } + + public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/resources/R/zeppelin_sparkr.R ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R index e95513f..525c6c5 100644 --- a/spark/src/main/resources/R/zeppelin_sparkr.R +++ b/spark/src/main/resources/R/zeppelin_sparkr.R @@ -45,7 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) if (version >= 20000) { assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) - assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv) + assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv) } assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)