This is an automated email from the ASF dual-hosted git repository.

lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new cee2c79d LIVY-889 initial commit (#344)
cee2c79d is described below

commit cee2c79db8607f87b1a92f8fde6c1aaa84c3a60c
Author: Praneet Sharma <[email protected]>
AuthorDate: Mon Dec 26 12:38:12 2022 -0800

    LIVY-889 initial commit (#344)
    
    * addJar and addFile invoked on a running Livy session now take care of adding the jar or file to the Spark driver classpath and to the appropriate interpreter.
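
    For context, one way this code path is exercised is from the client side. A minimal
    sketch, assuming a Livy server at http://localhost:8998 and placeholder jar/file URIs
    reachable from the cluster (endpoint and paths are assumptions, not part of this commit):

    import java.net.URI

    import org.apache.livy.LivyClientBuilder

    object AddJarToRunningSession {
      def main(args: Array[String]): Unit = {
        // Placeholder endpoint; point this at an existing Livy server.
        val client = new LivyClientBuilder()
          .setURI(new URI("http://localhost:8998"))
          .build()
        try {
          // With this change, the jar is not only shipped to the running session but
          // also added to the driver classpath and to the active interpreter.
          client.addJar(new URI("hdfs:///tmp/my-udfs.jar")).get()
          // Plain files go through the analogous addFile path.
          client.addFile(new URI("hdfs:///tmp/lookup.csv")).get()
        } finally {
          client.stop(true)
        }
      }
    }
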
---
 .../org/apache/livy/repl/SparkInterpreter.scala    |  6 +++-
 .../org/apache/livy/repl/SparkInterpreter.scala    |  6 +++-
 .../livy/repl/AbstractSparkInterpreter.scala       |  2 ++
 .../org/apache/livy/repl/PythonInterpreter.scala   |  3 +-
 .../scala/org/apache/livy/repl/ReplDriver.scala    | 35 ++++++++++++++++++----
 .../java/org/apache/livy/rsc/driver/RSCDriver.java |  3 +-
 6 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
index 98c478f3..48dca17b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -18,7 +18,7 @@
 package org.apache.livy.repl
 
 import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
 import java.nio.file.{Files, Paths}
 
 import scala.tools.nsc.Settings
@@ -95,6 +95,10 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS
     }
   }
 
+  override def addJar(jar: String): Unit = {
+    sparkILoop.addUrlsToClassPath(new URL(jar))
+  }
+
   override protected def isStarted(): Boolean = {
     sparkILoop != null
   }
diff --git a/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
index bb8f7e59..c3756a50 100644
--- a/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -18,7 +18,7 @@
 package org.apache.livy.repl
 
 import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
 import java.nio.file.{Files, Paths}
 
 import scala.tools.nsc.Settings
@@ -95,6 +95,10 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS
     }
   }
 
+  override def addJar(jar: String): Unit = {
+    sparkILoop.addUrlsToClassPath(new URL(jar))
+  }
+
   override protected def isStarted(): Boolean = {
     sparkILoop != null
   }
diff --git a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
index d2ac04f3..0decf095 100644
--- a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
@@ -61,6 +61,8 @@ abstract class AbstractSparkInterpreter extends Interpreter with Logging {
 
   protected def conf: SparkConf
 
+  protected def addJar(jar: String): Unit
+
   protected def postStart(): Unit = {
     entries = new SparkEntries(conf)
 
diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index bab2bfa7..36c09deb 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -290,13 +290,14 @@ private class PythonInterpreter(
     pysparkJobProcessor.addFile(path)
   }
 
-  def addPyFile(driver: ReplDriver, conf: SparkConf, path: String): Unit = {
+  def addPyFile(driver: ReplDriver, conf: SparkConf, path: String): String = {
     val localCopyDir = new File(pysparkJobProcessor.getLocalTmpDirPath)
     val localCopyFile = driver.copyFileToLocal(localCopyDir, path, SparkContext.getOrCreate(conf))
     pysparkJobProcessor.addPyFile(localCopyFile.getPath)
     if (path.endsWith(".jar")) {
       driver.addLocalFileToClassLoader(localCopyFile)
     }
+    localCopyFile.getPath
   }
 
   private def updatePythonGatewayPort(port: Int): Unit = {
diff --git a/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala b/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
index b805a4db..9ac82e82 100644
--- a/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
@@ -37,6 +37,8 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
 
   private[repl] var session: Session = _
 
+  private val kind = Kind(livyConf.get(RSCConf.Entry.SESSION_KIND))
+
   override protected def initializeSparkEntries(): SparkEntries = {
     session = new Session(livyConf = livyConf,
       sparkConf = conf,
@@ -103,17 +105,40 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
 
   override protected def addFile(path: String): Unit = {
     if (!ClientConf.TEST_MODE) {
-      session.interpreter(PySpark).foreach { _.asInstanceOf[PythonInterpreter].addFile(path) }
+      require(kind != null)
+      session.interpreter(kind) match {
+        case Some(interpreter) => {
+          interpreter match {
+            case pi: PythonInterpreter => pi.addFile(path)
+            case _ => super.addFile(path)
+          }
+        }
+        case None => super.addFile(path)
+      }
     }
     super.addFile(path)
   }
 
-  override protected def addJarOrPyFile(path: String): Unit = {
+  override protected def addJarOrPyFile(path: String): String = {
     if (!ClientConf.TEST_MODE) {
-      session.interpreter(PySpark)
-        .foreach { _.asInstanceOf[PythonInterpreter].addPyFile(this, conf, path) }
+      require(kind != null)
+      session.interpreter(kind) match {
+        case Some(interpreter) => {
+          interpreter match {
+            case pi: PythonInterpreter => pi.addPyFile(this, conf, path)
+            case si: SparkInterpreter => {
+              val localCopy = s"file://${super.addJarOrPyFile(path)}"
+              si.addJar(localCopy)
+              localCopy
+            }
+            case _ => super.addJarOrPyFile(path)
+          }
+        }
+        case None => super.addJarOrPyFile(path)
+      }
+    } else {
+      super.addJarOrPyFile(path)
     }
-    super.addJarOrPyFile(path)
   }
 
   override protected def onClientAuthenticated(client: Rpc): Unit = {
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index a8f31f79..52af6d11 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -482,11 +482,12 @@ public class RSCDriver extends BaseProtocol {
     jc.sc().addFile(path);
   }
 
-  protected void addJarOrPyFile(String path) throws Exception {
+  protected String addJarOrPyFile(String path) throws Exception {
     File localCopyDir = new File(jc.getLocalTmpDir(), "__livy__");
     File localCopy = copyFileToLocal(localCopyDir, path, jc.sc().sc());
     addLocalFileToClassLoader(localCopy);
     jc.sc().addJar(path);
+    return localCopy.getPath();
   }
 
   public void addLocalFileToClassLoader(File localCopy) throws MalformedURLException {
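
Once the jar is on the interpreter classpath, a subsequent statement in the same
interactive Scala session can reference classes from it directly. A hypothetical
follow-up (com.example.udfs.Normalizer is an illustration only, not part of this change):

// Next statement in the same Scala session, after the jar has been added;
// the import is now resolvable via sparkILoop.addUrlsToClassPath.
import com.example.udfs.Normalizer

println(Normalizer.normalize("  Hello  Livy  "))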
