This is an automated email from the ASF dual-hosted git repository.
lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new cee2c79d LIVY-889 initial commit (#344)
cee2c79d is described below
commit cee2c79db8607f87b1a92f8fde6c1aaa84c3a60c
Author: Praneet Sharma <[email protected]>
AuthorDate: Mon Dec 26 12:38:12 2022 -0800
LIVY-889 initial commit (#344)
* addJar, addFile called during a running livy session will take care of
adding this jar to spark driver classpath and appropriate interpreter.
---
.../org/apache/livy/repl/SparkInterpreter.scala | 6 +++-
.../org/apache/livy/repl/SparkInterpreter.scala | 6 +++-
.../livy/repl/AbstractSparkInterpreter.scala | 2 ++
.../org/apache/livy/repl/PythonInterpreter.scala | 3 +-
.../scala/org/apache/livy/repl/ReplDriver.scala | 35 ++++++++++++++++++----
.../java/org/apache/livy/rsc/driver/RSCDriver.java | 3 +-
6 files changed, 46 insertions(+), 9 deletions(-)
diff --git
a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
index 98c478f3..48dca17b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -18,7 +18,7 @@
package org.apache.livy.repl
import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths}
import scala.tools.nsc.Settings
@@ -95,6 +95,10 @@ class SparkInterpreter(protected override val conf:
SparkConf) extends AbstractS
}
}
+ override def addJar(jar: String): Unit = {
+ sparkILoop.addUrlsToClassPath(new URL(jar))
+ }
+
override protected def isStarted(): Boolean = {
sparkILoop != null
}
diff --git
a/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
b/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
index bb8f7e59..c3756a50 100644
--- a/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ b/repl/scala-2.12/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
@@ -18,7 +18,7 @@
package org.apache.livy.repl
import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths}
import scala.tools.nsc.Settings
@@ -95,6 +95,10 @@ class SparkInterpreter(protected override val conf:
SparkConf) extends AbstractS
}
}
+ override def addJar(jar: String): Unit = {
+ sparkILoop.addUrlsToClassPath(new URL(jar))
+ }
+
override protected def isStarted(): Boolean = {
sparkILoop != null
}
diff --git
a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
index d2ac04f3..0decf095 100644
--- a/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/AbstractSparkInterpreter.scala
@@ -61,6 +61,8 @@ abstract class AbstractSparkInterpreter extends Interpreter
with Logging {
protected def conf: SparkConf
+ protected def addJar(jar: String): Unit
+
protected def postStart(): Unit = {
entries = new SparkEntries(conf)
diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index bab2bfa7..36c09deb 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -290,13 +290,14 @@ private class PythonInterpreter(
pysparkJobProcessor.addFile(path)
}
- def addPyFile(driver: ReplDriver, conf: SparkConf, path: String): Unit = {
+ def addPyFile(driver: ReplDriver, conf: SparkConf, path: String): String = {
val localCopyDir = new File(pysparkJobProcessor.getLocalTmpDirPath)
val localCopyFile = driver.copyFileToLocal(localCopyDir, path,
SparkContext.getOrCreate(conf))
pysparkJobProcessor.addPyFile(localCopyFile.getPath)
if (path.endsWith(".jar")) {
driver.addLocalFileToClassLoader(localCopyFile)
}
+ localCopyFile.getPath
}
private def updatePythonGatewayPort(port: Int): Unit = {
diff --git a/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
b/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
index b805a4db..9ac82e82 100644
--- a/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
@@ -37,6 +37,8 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
private[repl] var session: Session = _
+ private val kind = Kind(livyConf.get(RSCConf.Entry.SESSION_KIND))
+
override protected def initializeSparkEntries(): SparkEntries = {
session = new Session(livyConf = livyConf,
sparkConf = conf,
@@ -103,17 +105,40 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
override protected def addFile(path: String): Unit = {
if (!ClientConf.TEST_MODE) {
- session.interpreter(PySpark).foreach {
_.asInstanceOf[PythonInterpreter].addFile(path) }
+ require(kind != null)
+ session.interpreter(kind) match {
+ case Some(interpreter) => {
+ interpreter match {
+ case pi: PythonInterpreter => pi.addFile(path)
+ case _ => super.addFile(path)
+ }
+ }
+ case None => super.addFile(path)
+ }
}
super.addFile(path)
}
- override protected def addJarOrPyFile(path: String): Unit = {
+ override protected def addJarOrPyFile(path: String): String = {
if (!ClientConf.TEST_MODE) {
- session.interpreter(PySpark)
- .foreach { _.asInstanceOf[PythonInterpreter].addPyFile(this, conf,
path) }
+ require(kind != null)
+ session.interpreter(kind) match {
+ case Some(interpreter) => {
+ interpreter match {
+ case pi: PythonInterpreter => pi.addPyFile(this, conf, path)
+ case si: SparkInterpreter => {
+ val localCopy = s"file://${super.addJarOrPyFile(path)}"
+ si.addJar(localCopy)
+ localCopy
+ }
+ case _ => super.addJarOrPyFile(path)
+ }
+ }
+ case None => super.addJarOrPyFile(path)
+ }
+ } else {
+ super.addJarOrPyFile(path)
}
- super.addJarOrPyFile(path)
}
override protected def onClientAuthenticated(client: Rpc): Unit = {
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index a8f31f79..52af6d11 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -482,11 +482,12 @@ public class RSCDriver extends BaseProtocol {
jc.sc().addFile(path);
}
- protected void addJarOrPyFile(String path) throws Exception {
+ protected String addJarOrPyFile(String path) throws Exception {
File localCopyDir = new File(jc.getLocalTmpDir(), "__livy__");
File localCopy = copyFileToLocal(localCopyDir, path, jc.sc().sc());
addLocalFileToClassLoader(localCopy);
jc.sc().addJar(path);
+ return localCopy.getPath();
}
public void addLocalFileToClassLoader(File localCopy) throws
MalformedURLException {