This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 38e2b679e8 [ZEPPELIN-4627] Fix class loader inconsistency between driver and executor when running on Spark 3.x (#4347)
38e2b679e8 is described below

commit 38e2b679e80f0424b01365bb0d0df811a4556f2d
Author: Kristin Cowalcijk <moris...@yeah.net>
AuthorDate: Tue Apr 12 10:39:03 2022 +0800

    [ZEPPELIN-4627] Fix class loader inconsistency between driver and executor when running on Spark 3.x (#4347)
    
    Signed-off-by: Kristin Cowalcijk <moris...@yeah.net>
---
 .../main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java | 9 ++-------
 .../java/org/apache/zeppelin/spark/SparkInterpreterTest.java     | 8 +++++++-
 .../org/apache/zeppelin/spark/SparkScala212Interpreter.scala     | 3 +--
 .../org/apache/zeppelin/spark/SparkScala213Interpreter.scala     | 1 +
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 824b10dc2e..cb83bec24e 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -95,10 +95,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter 
{
     String curSql = null;
     ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
     try {
-      if (sparkInterpreter.isScala211()) {
-        // TODO(zjffdu) scala 2.12,2.13 still doesn't work for codegen 
(ZEPPELIN-4627)
-        
Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
-      }
+      
Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
       Method method = sqlContext.getClass().getMethod("sql", String.class);
       for (String sql : sqls) {
         curSql = sql;
@@ -142,9 +139,7 @@ public class SparkSqlInterpreter extends 
AbstractInterpreter {
       }
     } finally {
       sc.clearJobGroup();
-      if (sparkInterpreter.isScala211()) {
-        Thread.currentThread().setContextClassLoader(originalClassLoader);
-      }
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
     }
 
     return new InterpreterResult(Code.SUCCESS);
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index d3a9622f3d..045d91ab46 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -197,7 +197,13 @@ public class SparkInterpreterTest {
     assertTrue(((String) 
onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/"
             + interpreter.getJavaSparkContext().sc().applicationId()));
 
-    // case class
+    // RDD of case class objects
+    result = interpreter.interpret(
+        "case class A(a: Integer, b: Integer)\n" +
+        "sc.parallelize(Seq(A(10, 20), A(30, 40))).collect()", 
getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // Dataset of case class objects
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", 
getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
diff --git 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
 
b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index 80c66c2ec2..f31293239a 100644
--- 
a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ 
b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -19,7 +19,6 @@ package org.apache.zeppelin.spark
 
 import java.io.{BufferedReader, File}
 import java.net.URLClassLoader
-import java.nio.file.{Files, Paths}
 import java.util.Properties
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
@@ -226,7 +225,7 @@ private object SparkScala212Interpreter {
         foreach (intp quietRun _)
         )
       // classloader and power mode setup
-      intp.setContextClassLoader()
+      Thread.currentThread.setContextClassLoader(intp.classLoader)
       if (isReplPower) {
         replProps.power setValue true
         unleashAndSetPhase()
diff --git 
a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
 
b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
index 1753193da1..f896101dda 100644
--- 
a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
+++ 
b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
@@ -76,6 +76,7 @@ class SparkScala213Interpreter(override val conf: SparkConf,
     sparkILoop = new SparkILoop(null, replOut)
     sparkILoop.run(settings)
     this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator)
+    Thread.currentThread.setContextClassLoader(sparkILoop.classLoader)
 
     createSparkContext()
 

Reply via email to