This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

The following commit(s) were added to refs/heads/master by this push:
     new 38e2b679e8 [ZEPPELIN-4627] Fix class loader inconsistency between driver and executor when (#4347)
38e2b679e8 is described below

commit 38e2b679e80f0424b01365bb0d0df811a4556f2d
Author: Kristin Cowalcijk <moris...@yeah.net>
AuthorDate: Tue Apr 12 10:39:03 2022 +0800

    [ZEPPELIN-4627] Fix class loader inconsistency between driver and executor when (#4347)

    running on Spark 3.x

    Signed-off-by: Kristin Cowalcijk <moris...@yeah.net>
---
 .../main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java | 9 ++-------
 .../java/org/apache/zeppelin/spark/SparkInterpreterTest.java | 8 +++++++-
 .../org/apache/zeppelin/spark/SparkScala212Interpreter.scala | 3 +--
 .../org/apache/zeppelin/spark/SparkScala213Interpreter.scala | 1 +
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 824b10dc2e..cb83bec24e 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -95,10 +95,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     String curSql = null;
     ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
     try {
-      if (sparkInterpreter.isScala211()) {
-        // TODO(zjffdu) scala 2.12,2.13 still doesn't work for codegen (ZEPPELIN-4627)
-        Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
-      }
+      Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
       Method method = sqlContext.getClass().getMethod("sql", String.class);
       for (String sql : sqls) {
         curSql = sql;
@@ -142,9 +139,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
       }
     } finally {
       sc.clearJobGroup();
-      if (sparkInterpreter.isScala211()) {
-        Thread.currentThread().setContextClassLoader(originalClassLoader);
-      }
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
     }

     return new InterpreterResult(Code.SUCCESS);
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index d3a9622f3d..045d91ab46 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -197,7 +197,13 @@ public class SparkInterpreterTest {
     assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/"
             + interpreter.getJavaSparkContext().sc().applicationId()));

-    // case class
+    // RDD of case class objects
+    result = interpreter.interpret(
+        "case class A(a: Integer, b: Integer)\n" +
+        "sc.parallelize(Seq(A(10, 20), A(30, 40))).collect()", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // Dataset of case class objects
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());

diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index 80c66c2ec2..f31293239a 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -19,7 +19,6 @@ package org.apache.zeppelin.spark

 import java.io.{BufferedReader, File}
 import java.net.URLClassLoader
-import java.nio.file.{Files, Paths}
 import java.util.Properties
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
@@ -226,7 +225,7 @@ private object SparkScala212Interpreter {
         foreach (intp quietRun _)
         )
       // classloader and power mode setup
-      intp.setContextClassLoader()
+      Thread.currentThread.setContextClassLoader(intp.classLoader)
       if (isReplPower) {
         replProps.power setValue true
         unleashAndSetPhase()
diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
index 1753193da1..f896101dda 100644
--- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
+++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
@@ -76,6 +76,7 @@ class SparkScala213Interpreter(override val conf: SparkConf,
     sparkILoop = new SparkILoop(null, replOut)
     sparkILoop.run(settings)
     this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator)
+    Thread.currentThread.setContextClassLoader(sparkILoop.classLoader)

     createSparkContext()