This is an automated email from the ASF dual-hosted git repository.

ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c6303b [LIVY-967][SERVER] Return session information with livy 
sessions APIs (#383)
80c6303b is described below

commit 80c6303b5bc7ac5d07281b0366dc2c4a2d7f082a
Author: Asif Khatri <[email protected]>
AuthorDate: Tue Apr 18 17:45:55 2023 +0530

    [LIVY-967][SERVER] Return session information with livy sessions APIs (#383)
    
    ## What changes were proposed in this pull request?
    
    Currently livy GET /Sessions doesn't return fields like driver-executor 
memory, spark configuration, etc. Ideally a session response should return all 
the values set in session request api call(POST /sessions).
    
    JIRA: https://issues.apache.org/jira/browse/LIVY-967
    
    ## How was this patch tested?
    
    Verified manually by creating interactive session via REST API call in a 
local Yarn cluster. Also, we have updated the unit tests.
---
 .../apache/livy/client/common/HttpMessages.java    | 37 +++++++++++++--
 .../apache/livy/client/http/HttpClientSpec.scala   | 12 +++++
 docs/rest-api.md                                   | 55 ++++++++++++++++++++++
 .../server/interactive/InteractiveSession.scala    | 52 +++++++++++++++++++-
 .../interactive/InteractiveSessionServlet.scala    | 12 +++--
 .../InteractiveSessionServletSpec.scala            | 22 +++++++++
 .../interactive/InteractiveSessionSpec.scala       | 11 +++--
 7 files changed, 189 insertions(+), 12 deletions(-)

diff --git 
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java 
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index a88ed8ca..e0621a39 100644
--- 
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++ 
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -62,9 +62,25 @@ public class HttpMessages {
     public final Map<String, String> appInfo;
     public final List<String> log;
     public final String ttl;
-
-    public SessionInfo(int id, String name, String appId, String owner, String 
proxyUser,
-        String state, String kind, Map<String, String> appInfo, List<String> 
log, String ttl) {
+    public final String driverMemory;
+    public final int driverCores;
+    public final String executorMemory;
+    public final int executorCores;
+    public final Map<String, String> conf;
+    public final List<String> archives;
+    public final List<String> files;
+    public final int heartbeatTimeoutInSecond;
+    public final List<String> jars;
+    public final int numExecutors;
+    public final List<String> pyFiles;
+    public final String queue;
+
+    public SessionInfo(int id, String name, String appId, String owner, String 
state,
+        String kind, Map<String, String> appInfo, List<String> log,
+        String ttl, String driverMemory,
+        int driverCores, String executorMemory,  int executorCores, 
Map<String, String> conf,
+        List<String> archives, List<String> files, int 
heartbeatTimeoutInSecond, List<String> jars,
+        int numExecutors, String proxyUser, List<String> pyFiles, String 
queue) {
       this.id = id;
       this.name = name;
       this.appId = appId;
@@ -75,10 +91,23 @@ public class HttpMessages {
       this.appInfo = appInfo;
       this.log = log;
       this.ttl = ttl;
+      this.driverMemory = driverMemory;
+      this.driverCores = driverCores;
+      this.executorMemory = executorMemory;
+      this.executorCores = executorCores;
+      this.conf = conf;
+      this.archives = archives;
+      this.files = files;
+      this.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond;
+      this.jars = jars;
+      this.numExecutors = numExecutors;
+      this.pyFiles = pyFiles;
+      this.queue = queue;
     }
 
     private SessionInfo() {
-      this(-1, null, null, null, null, null, null, null, null, null);
+      this(-1, null, null, null, null, null, null, null, null, null, 0, null, 
0, null, null,
+              null, 0, null, 0, null, null, null);
     }
 
   }
diff --git 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index 336ff8c8..d24ec92a 100644
--- 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++ 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -279,6 +279,18 @@ private class HttpClientTestBootstrap extends LifeCycle {
         when(session.state).thenReturn(SessionState.Idle)
         when(session.proxyUser).thenReturn(None)
         when(session.kind).thenReturn(Spark)
+        when(session.driverMemory).thenReturn(None)
+        when(session.driverCores).thenReturn(None)
+        when(session.executorMemory).thenReturn(None)
+        when(session.executorCores).thenReturn(None)
+        when(session.numExecutors).thenReturn(None)
+        when(session.proxyUser).thenReturn(None)
+        when(session.queue).thenReturn(None)
+        when(session.conf).thenReturn(Map("" -> ""))
+        when(session.archives).thenReturn(List())
+        when(session.files).thenReturn(List())
+        when(session.jars).thenReturn(List())
+        when(session.pyFiles).thenReturn(List())
         when(session.stop()).thenReturn(Future.successful(()))
         when(session.ttl).thenReturn(None)
         require(HttpClientSpec.session == null, "Session already created?")
diff --git a/docs/rest-api.md b/docs/rest-api.md
index 903bb5b2..ec43e0f8 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -636,6 +636,61 @@ A session represents an interactive shell.
     <td>The detailed application info</td>
     <td>Map of key=val</td>
   </tr>
+  <tr>
+    <td>jars</td>
+    <td>jars to be used in this session</td>
+    <td>list of strings</td>
+  </tr>
+  <tr>
+    <td>pyFiles</td>
+    <td>Python files to be used in this session</td>
+    <td>list of strings</td>
+  </tr>
+  <tr>
+    <td>files</td>
+    <td>files to be used in this session</td>
+    <td>list of strings</td>
+  </tr>
+  <tr>
+    <td>driverMemory</td>
+    <td>Amount of memory to use for the driver process</td>
+    <td>string</td>
+  </tr>
+  <tr>
+    <td>driverCores</td>
+    <td>Number of cores to use for the driver process</td>
+    <td>int</td>
+  </tr>
+  <tr>
+    <td>executorMemory</td>
+    <td>Amount of memory to use per executor process</td>
+    <td>string</td>
+  </tr>
+  <tr>
+    <td>executorCores</td>
+    <td>Number of cores to use for each executor</td>
+    <td>int</td>
+  </tr>
+  <tr>
+    <td>numExecutors</td>
+    <td>Number of executors to launch for this session</td>
+    <td>int</td>
+  </tr>
+  <tr>
+    <td>archives</td>
+    <td>Archives to be used in this session</td>
+    <td>List of string</td>
+  </tr>
+  <tr>
+    <td>queue</td>
+    <td>The name of the YARN queue to which submitted</td>
+    <td>string</td>
+  </tr>
+  <tr>
+    <td>conf</td>
+    <td>Spark configuration properties</td>
+    <td>Map of key=val</td>
+  </tr>
 </table>
 
 
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 6a1dba0f..59a593f6 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -55,6 +55,18 @@ case class InteractiveRecoveryMetadata(
     heartbeatTimeoutS: Int,
     owner: String,
     ttl: Option[String],
+    driverMemory: Option[String],
+    driverCores: Option[Int],
+    executorMemory: Option[String],
+    executorCores: Option[Int],
+    conf: Map[String, String],
+    archives: List[String],
+    files: List[String],
+    jars: List[String],
+    numExecutors: Option[Int],
+    pyFiles: List[String],
+    queue: Option[String],
+    // proxyUser is deprecated. It is available here only for backward 
compatibility
     proxyUser: Option[String],
     rscDriverUri: Option[URI],
     version: Int = 1)
@@ -127,6 +139,17 @@ object InteractiveSession extends Logging {
       impersonatedUser,
       ttl,
       sessionStore,
+      request.driverMemory,
+      request.driverCores,
+      request.executorMemory,
+      request.executorCores,
+      request.conf,
+      request.archives,
+      request.files,
+      request.jars,
+      request.numExecutors,
+      request.pyFiles,
+      request.queue,
       mockApp)
   }
 
@@ -155,6 +178,17 @@ object InteractiveSession extends Logging {
       metadata.proxyUser,
       metadata.ttl,
       sessionStore,
+      metadata.driverMemory,
+      metadata.driverCores,
+      metadata.executorMemory,
+      metadata.executorCores,
+      metadata.conf,
+      metadata.archives,
+      metadata.files,
+      metadata.jars,
+      metadata.numExecutors,
+      metadata.pyFiles,
+      metadata.queue,
       mockApp)
   }
 
@@ -377,12 +411,23 @@ class InteractiveSession(
     val client: Option[RSCClient],
     initialState: SessionState,
     val kind: Kind,
-    heartbeatTimeoutS: Int,
+    val heartbeatTimeoutS: Int,
     livyConf: LivyConf,
     owner: String,
     override val proxyUser: Option[String],
     ttl: Option[String],
     sessionStore: SessionStore,
+    val driverMemory: Option[String],
+    val driverCores: Option[Int],
+    val executorMemory: Option[String],
+    val executorCores: Option[Int],
+    val conf: Map[String, String],
+    val archives: List[String],
+    val files: List[String],
+    val jars: List[String],
+    val numExecutors: Option[Int],
+    val pyFiles: List[String],
+    val queue: Option[String],
     mockApp: Option[SparkApp]) // For unit test.
   extends Session(id, name, owner, ttl, livyConf)
   with SessionHeartbeat
@@ -475,7 +520,10 @@ class InteractiveSession(
 
   override def recoveryMetadata: RecoveryMetadata =
     InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
-      heartbeatTimeout.toSeconds.toInt, owner, None, proxyUser, rscDriverUri)
+      heartbeatTimeout.toSeconds.toInt, owner, None,
+      driverMemory, driverCores, executorMemory, executorCores, conf,
+      archives, files, jars, numExecutors, pyFiles, queue,
+      proxyUser, rscDriverUri)
 
   override def state: SessionState = {
     if (serverSideState == SessionState.Running) {
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 85407b04..239936f3 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -92,9 +92,15 @@ class InteractiveSessionServlet(
         Nil
       }
 
-    new SessionInfo(session.id, session.name.orNull, session.appId.orNull, 
session.owner,
-      session.proxyUser.orNull, session.state.toString, session.kind.toString,
-      session.appInfo.asJavaMap, logs.asJava, session.ttl.orNull)
+    new SessionInfo(session.id, session.name.orNull, session.appId.orNull,
+      session.owner, session.state.toString, session.kind.toString,
+      session.appInfo.asJavaMap, logs.asJava,
+      session.proxyUser.orNull, session.driverMemory.orNull,
+      session.driverCores.getOrElse(0), session.executorMemory.orNull,
+      session.executorCores.getOrElse(0), session.conf.asJava, 
session.archives.asJava,
+      session.files.asJava, session.heartbeatTimeoutS, session.jars.asJava,
+      session.numExecutors.getOrElse(0), session.proxyUser.orNull, 
session.pyFiles.asJava,
+      session.queue.orNull)
   }
 
   post("/:id/stop") {
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index 0f1cdc7f..f7c7ad3a 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -70,6 +70,17 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
       when(session.proxyUser).thenReturn(None)
       when(session.heartbeatExpired).thenReturn(false)
       when(session.ttl).thenReturn(None)
+      when(session.driverMemory).thenReturn(None)
+      when(session.driverCores).thenReturn(None)
+      when(session.executorMemory).thenReturn(None)
+      when(session.executorCores).thenReturn(None)
+      when(session.numExecutors).thenReturn(None)
+      when(session.queue).thenReturn(None)
+      when(session.conf).thenReturn(Map.empty[String, String])
+      when(session.archives).thenReturn(List())
+      when(session.files).thenReturn(List())
+      when(session.jars).thenReturn(List())
+      when(session.pyFiles).thenReturn(List())
       when(session.statements).thenAnswer(
         new Answer[IndexedSeq[Statement]]() {
           override def answer(args: InvocationOnMock): IndexedSeq[Statement] = 
statements
@@ -184,6 +195,17 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
     when(session.logLines()).thenReturn(log)
     when(session.heartbeatExpired).thenReturn(false)
     when(session.ttl).thenReturn(None)
+    when(session.driverMemory).thenReturn(None)
+    when(session.driverCores).thenReturn(None)
+    when(session.executorMemory).thenReturn(None)
+    when(session.executorCores).thenReturn(None)
+    when(session.numExecutors).thenReturn(None)
+    when(session.queue).thenReturn(None)
+    when(session.conf).thenReturn(Map.empty[String, String])
+    when(session.archives).thenReturn(List())
+    when(session.files).thenReturn(List())
+    when(session.jars).thenReturn(List())
+    when(session.pyFiles).thenReturn(List())
 
     val req = mock[HttpServletRequest]
 
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 02aca27a..af66ba95 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -278,7 +278,8 @@ class InteractiveSessionSpec extends FunSpec
       
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
       val m = InteractiveRecoveryMetadata(
           78, Some("Test session"), None, "appTag", Spark, 0, null, None, None,
-          Some(URI.create("")))
+          None, None, None, Map.empty[String, String], List.empty[String], 
List.empty[String],
+          List.empty[String], None, List.empty[String], None, None, 
Some(URI.create("")))
       val s = InteractiveSession.recover(m, conf, sessionStore, None, 
Some(mockClient))
       s.start()
 
@@ -295,7 +296,9 @@ class InteractiveSessionSpec extends FunSpec
       val mockClient = mock[RSCClient]
       
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
       val m = InteractiveRecoveryMetadata(
-          78, None, None, "appTag", Spark, 0, null, None, None, 
Some(URI.create("")))
+          78, None, None, "appTag", Spark, 0, null, None, None,
+          None, None, None, Map.empty[String, String], List.empty[String], 
List.empty[String],
+          List.empty[String], None, List.empty[String], None, None, 
Some(URI.create("")))
       val s = InteractiveSession.recover(m, conf, sessionStore, None, 
Some(mockClient))
       s.start()
 
@@ -310,7 +313,9 @@ class InteractiveSessionSpec extends FunSpec
       val conf = new LivyConf()
       val sessionStore = mock[SessionStore]
       val m = InteractiveRecoveryMetadata(
-        78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None)
+        78, None, Some("appId"), "appTag", Spark, 0, null, None, None,
+          None, None, None, Map.empty[String, String], List.empty[String], 
List.empty[String],
+          List.empty[String], None, List.empty[String], None, None, None)
       val s = InteractiveSession.recover(m, conf, sessionStore, None)
       s.start()
       s.state shouldBe a[SessionState.Dead]

Reply via email to