This is an automated email from the ASF dual-hosted git repository.
ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 86fc8238 [LIVY-989] Livy core support for interactive session
idleTimeout (#426)
86fc8238 is described below
commit 86fc823893ee96d4effaa4f2b8ef6603cea9d77a
Author: Asif Khatri <[email protected]>
AuthorDate: Mon Oct 30 19:28:53 2023 +0530
[LIVY-989] Livy core support for interactive session idleTimeout (#426)
## What changes were proposed in this pull request?
Currently, a Livy interactive session has a field called ttl, which kills
the session once it has been idle for the given amount of time. The expected
behavior, however, is:
* ttl: kills the session once the given duration has elapsed since the
session started, irrespective of idleness.
* idleTimeout: kills the session if it has been idle for the given
duration. (The current TTL behaves in this manner.)
JIRA: https://issues.apache.org/jira/browse/LIVY-989
## How was this patch tested?
Tested manually by creating interactive sessions with idleTimeout and ttl set.
---
.../apache/livy/client/common/HttpMessages.java | 6 ++++--
.../apache/livy/client/http/HttpClientSpec.scala | 1 +
docs/rest-api.md | 15 ++++++++++++++
scalastyle.xml | 2 +-
.../interactive/CreateInteractiveRequest.scala | 4 +++-
.../server/interactive/InteractiveSession.scala | 11 +++++++++--
.../interactive/InteractiveSessionServlet.scala | 12 +++++++----
.../scala/org/apache/livy/sessions/Session.scala | 5 ++++-
.../org/apache/livy/sessions/SessionManager.scala | 23 ++++++++++++++++++----
.../apache/livy/server/SessionServletSpec.scala | 7 ++++---
.../InteractiveSessionServletSpec.scala | 2 ++
.../interactive/InteractiveSessionSpec.scala | 8 ++++----
.../org/apache/livy/sessions/MockSession.scala | 8 +++++---
.../apache/livy/sessions/SessionManagerSpec.scala | 13 +++++++++++-
.../thriftserver/LivyThriftSessionManager.scala | 1 +
15 files changed, 92 insertions(+), 26 deletions(-)
diff --git
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index e0621a39..7edb81b7 100644
---
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -62,6 +62,7 @@ public class HttpMessages {
public final Map<String, String> appInfo;
public final List<String> log;
public final String ttl;
+ public final String idleTimeout;
public final String driverMemory;
public final int driverCores;
public final String executorMemory;
@@ -77,7 +78,7 @@ public class HttpMessages {
public SessionInfo(int id, String name, String appId, String owner, String
state,
String kind, Map<String, String> appInfo, List<String> log,
- String ttl, String driverMemory,
+ String ttl, String idleTimeout, String driverMemory,
int driverCores, String executorMemory, int executorCores,
Map<String, String> conf,
List<String> archives, List<String> files, int
heartbeatTimeoutInSecond, List<String> jars,
int numExecutors, String proxyUser, List<String> pyFiles, String
queue) {
@@ -91,6 +92,7 @@ public class HttpMessages {
this.appInfo = appInfo;
this.log = log;
this.ttl = ttl;
+ this.idleTimeout = idleTimeout;
this.driverMemory = driverMemory;
this.driverCores = driverCores;
this.executorMemory = executorMemory;
@@ -106,7 +108,7 @@ public class HttpMessages {
}
private SessionInfo() {
- this(-1, null, null, null, null, null, null, null, null, null, 0, null,
0, null, null,
+ this(-1, null, null, null, null, null, null, null, null, null, null, 0,
null, 0, null, null,
null, 0, null, 0, null, null, null);
}
diff --git
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index d24ec92a..b6adc235 100644
---
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -293,6 +293,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
when(session.pyFiles).thenReturn(List())
when(session.stop()).thenReturn(Future.successful(()))
when(session.ttl).thenReturn(None)
+ when(session.idleTimeout).thenReturn(None)
require(HttpClientSpec.session == null, "Session already created?")
HttpClientSpec.session = session
session
diff --git a/docs/rest-api.md b/docs/rest-api.md
index ec43e0f8..a1095433 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -153,6 +153,11 @@ Creates a new interactive Scala, Python, or R shell in the
cluster.
</tr>
<tr>
<td>ttl</td>
+ <td>The timeout for this active session, example: 10m (10 minutes)</td>
+ <td>string</td>
+ </tr>
+ <tr>
+ <td>idleTimeout</td>
<td>The timeout for this inactive session, example: 10m (10 minutes)</td>
<td>string</td>
</tr>
@@ -636,6 +641,16 @@ A session represents an interactive shell.
<td>The detailed application info</td>
<td>Map of key=val</td>
</tr>
+ <tr>
+ <td>ttl</td>
+ <td>The timeout for this active session, example: 10m (10 minutes)</td>
+ <td>string</td>
+ </tr>
+ <tr>
+ <td>idleTimeout</td>
+ <td>The timeout for this inactive session, example: 10m (10 minutes)</td>
+ <td>string</td>
+ </tr>
<tr>
<td>jars</td>
<td>jars to be used in this session</td>
diff --git a/scalastyle.xml b/scalastyle.xml
index 3abfe183..a08f13c2 100644
--- a/scalastyle.xml
+++ b/scalastyle.xml
@@ -57,7 +57,7 @@
<check level="error"
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
- <parameter name="maxParameters"><![CDATA[11]]></parameter>
+ <parameter name="maxParameters"><![CDATA[12]]></parameter>
</parameters>
</check>
diff --git
a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
index c685e29c..af199684 100644
---
a/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
+++
b/server/src/main/scala/org/apache/livy/server/interactive/CreateInteractiveRequest.scala
@@ -36,6 +36,7 @@ class CreateInteractiveRequest {
var conf: Map[String, String] = Map()
var heartbeatTimeoutInSecond: Int = 0
var ttl: Option[String] = None
+ var idleTimeout: Option[String] = None
override def toString: String = {
s"[kind: $kind, proxyUser: $proxyUser, " +
@@ -52,6 +53,7 @@ class CreateInteractiveRequest {
(if (name.isDefined) s"name: ${name.get}, " else "") +
(if (conf.nonEmpty) s"conf: ${conf.mkString(",")}, " else "") +
s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond, " +
- (if (ttl.isDefined) s"ttl: ${ttl.get}]" else "]")
+ (if (ttl.isDefined) s"ttl: ${ttl.get}, " else "") +
+ (if (idleTimeout.isDefined) s"idleTimeout: ${idleTimeout.get}]" else "]")
}
}
diff --git
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index bf2db0c0..34499d34 100644
---
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -55,6 +55,7 @@ case class InteractiveRecoveryMetadata(
heartbeatTimeoutS: Int,
owner: String,
ttl: Option[String],
+ idleTimeout: Option[String],
driverMemory: Option[String],
driverCores: Option[Int],
executorMemory: Option[String],
@@ -87,6 +88,7 @@ object InteractiveSession extends Logging {
request: CreateInteractiveRequest,
sessionStore: SessionStore,
ttl: Option[String],
+ idleTimeout: Option[String],
mockApp: Option[SparkApp] = None,
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag =
s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
@@ -138,6 +140,7 @@ object InteractiveSession extends Logging {
owner,
impersonatedUser,
ttl,
+ idleTimeout,
sessionStore,
request.driverMemory,
request.driverCores,
@@ -177,6 +180,7 @@ object InteractiveSession extends Logging {
metadata.owner,
metadata.proxyUser,
metadata.ttl,
+ metadata.idleTimeout,
sessionStore,
metadata.driverMemory,
metadata.driverCores,
@@ -416,6 +420,7 @@ class InteractiveSession(
owner: String,
override val proxyUser: Option[String],
ttl: Option[String],
+ idleTimeout: Option[String],
sessionStore: SessionStore,
val driverMemory: Option[String],
val driverCores: Option[Int],
@@ -429,7 +434,7 @@ class InteractiveSession(
val pyFiles: List[String],
val queue: Option[String],
mockApp: Option[SparkApp]) // For unit test.
- extends Session(id, name, owner, ttl, livyConf)
+ extends Session(id, name, owner, ttl, idleTimeout, livyConf)
with SessionHeartbeat
with SparkAppListener {
@@ -514,13 +519,15 @@ class InteractiveSession(
}
})
}
+ startedOn = Some(System.nanoTime())
+ info(s"Started $this")
}
override def logLines(): IndexedSeq[String] =
app.map(_.log()).getOrElse(sessionLog)
override def recoveryMetadata: RecoveryMetadata =
InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
- heartbeatTimeout.toSeconds.toInt, owner, None,
+ heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
driverMemory, driverCores, executorMemory, executorCores, conf,
archives, files, jars, numExecutors, pyFiles, queue,
proxyUser, rscDriverUri)
diff --git
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index a30f2a41..310504e7 100644
---
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -55,10 +55,13 @@ class InteractiveSessionServlet(
val createRequest = bodyAs[CreateInteractiveRequest](req)
val sessionId = sessionManager.nextId();
- // Calling getTimeAsMs just to validate the ttl value
+ // Calling getTimeAsMs just to validate the ttl and idleTimeout values
if (createRequest.ttl.isDefined) {
ClientConf.getTimeAsMs(createRequest.ttl.get);
}
+ if (createRequest.idleTimeout.isDefined) {
+ ClientConf.getTimeAsMs(createRequest.idleTimeout.get);
+ }
InteractiveSession.create(
sessionId,
@@ -69,7 +72,8 @@ class InteractiveSessionServlet(
accessManager,
createRequest,
sessionStore,
- createRequest.ttl)
+ createRequest.ttl,
+ createRequest.idleTimeout)
}
override protected[interactive] def clientSessionView(
@@ -114,8 +118,8 @@ class InteractiveSessionServlet(
new SessionInfo(session.id, session.name.orNull, session.appId.orNull,
session.owner, session.state.toString, session.kind.toString,
- session.appInfo.asJavaMap, logs.asJava,
- session.proxyUser.orNull, session.driverMemory.orNull,
+ session.appInfo.asJavaMap, logs.asJava, session.ttl.orNull,
+ session.idleTimeout.orNull, session.driverMemory.orNull,
session.driverCores.getOrElse(0), session.executorMemory.orNull,
session.executorCores.getOrElse(0), conf, archives,
files, session.heartbeatTimeoutS, jars,
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala
b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index 6b01c4f1..b82c2504 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -146,6 +146,7 @@ abstract class Session(
val name: Option[String],
val owner: String,
val ttl: Option[String],
+ val idleTimeout: Option[String],
val livyConf: LivyConf)
extends Logging {
@@ -153,7 +154,7 @@ abstract class Session(
name: Option[String],
owner: String,
livyConf: LivyConf) {
- this(id, name, owner, None, livyConf)
+ this(id, name, owner, None, None, livyConf)
}
import Session._
@@ -171,6 +172,8 @@ abstract class Session(
private var _lastActivity = System.nanoTime()
+ var startedOn : Option[Long] = None
+
// Directory where the session's staging files are created. The directory is
only accessible
// to the session's effective user.
private var stagingDir: Path = null
diff --git
a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index 25bf593d..16425964 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -171,17 +171,32 @@ class SessionManager[S <: Session, R <: RecoveryMetadata
: ClassTag](
} else {
val currentTime = System.nanoTime()
var calculatedTimeout = sessionTimeout;
- if (session.ttl.isDefined) {
- calculatedTimeout = ClientConf.getTimeAsMs(session.ttl.get)
+ if (session.idleTimeout.isDefined) {
+ calculatedTimeout =
ClientConf.getTimeAsMs(session.idleTimeout.get)
}
calculatedTimeout =
TimeUnit.MILLISECONDS.toNanos(calculatedTimeout)
- currentTime - session.lastActivity > calculatedTimeout
+ if (currentTime - session.lastActivity > calculatedTimeout) {
+ return true
+ }
+ if (session.ttl.isDefined && session.startedOn.isDefined) {
+ calculatedTimeout = TimeUnit.MILLISECONDS.toNanos(
+ ClientConf.getTimeAsMs(session.ttl.get))
+ if (currentTime - session.startedOn.get > calculatedTimeout) {
+ return true
+ }
+ }
+ false
}
}
}
Future.sequence(all().filter(expired).map { s =>
- info(s"Deleting $s because it was inactive for more than
${sessionTimeout / 1e6} ms.")
+ s.state match {
+ case st: FinishedSessionState =>
+ info(s"Deleting $s because it finished before
${sessionStateRetainedInSec / 1e9} secs.")
+ case _ =>
+ info(s"Deleting $s because it was inactive or the time to leave the
period is over.")
+ }
delete(s)
})
}
diff --git
a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
index 466c4a26..bebb3710 100644
--- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
@@ -31,8 +31,9 @@ object SessionServletSpec {
val PROXY_USER = "proxyUser"
- class MockSession(id: Int, owner: String, val proxyUser: Option[String],
livyConf: LivyConf)
- extends Session(id, None, owner, livyConf) {
+ class MockSession(id: Int, owner: String, ttl: Option[String], idleTimeout:
Option[String],
+ val proxyUser: Option[String], livyConf: LivyConf)
+ extends Session(id, None, owner, ttl, idleTimeout, livyConf) {
case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata()
@@ -64,7 +65,7 @@ object SessionServletSpec {
val owner = remoteUser(req)
val impersonatedUser = accessManager.checkImpersonation(
proxyUser(req, params.get(PROXY_USER)), owner)
- new MockSession(sessionManager.nextId(), owner, impersonatedUser, conf)
+ new MockSession(sessionManager.nextId(), owner, None, None,
impersonatedUser, conf)
}
override protected def clientSessionView(
diff --git
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index f7c7ad3a..50360b05 100644
---
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -70,6 +70,7 @@ class InteractiveSessionServletSpec extends
BaseInteractiveServletSpec {
when(session.proxyUser).thenReturn(None)
when(session.heartbeatExpired).thenReturn(false)
when(session.ttl).thenReturn(None)
+ when(session.idleTimeout).thenReturn(None)
when(session.driverMemory).thenReturn(None)
when(session.driverCores).thenReturn(None)
when(session.executorMemory).thenReturn(None)
@@ -195,6 +196,7 @@ class InteractiveSessionServletSpec extends
BaseInteractiveServletSpec {
when(session.logLines()).thenReturn(log)
when(session.heartbeatExpired).thenReturn(false)
when(session.ttl).thenReturn(None)
+ when(session.idleTimeout).thenReturn(None)
when(session.driverMemory).thenReturn(None)
when(session.driverCores).thenReturn(None)
when(session.executorMemory).thenReturn(None)
diff --git
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index af66ba95..e7d651f8 100644
---
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -71,7 +71,7 @@ class InteractiveSessionSpec extends FunSpec
RSCConf.Entry.LIVY_JARS.key() -> ""
)
InteractiveSession.create(0, None, null, None, livyConf, accessManager,
req,
- sessionStore, None, mockApp)
+ sessionStore, None, None, mockApp)
}
private def executeStatement(code: String, codeType: Option[String] = None):
JValue = {
@@ -277,7 +277,7 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
- 78, Some("Test session"), None, "appTag", Spark, 0, null, None, None,
+ 78, Some("Test session"), None, "appTag", Spark, 0, null, None,
None, None,
None, None, None, Map.empty[String, String], List.empty[String],
List.empty[String],
List.empty[String], None, List.empty[String], None, None,
Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None,
Some(mockClient))
@@ -296,7 +296,7 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
- 78, None, None, "appTag", Spark, 0, null, None, None,
+ 78, None, None, "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String],
List.empty[String],
List.empty[String], None, List.empty[String], None, None,
Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None,
Some(mockClient))
@@ -313,7 +313,7 @@ class InteractiveSessionSpec extends FunSpec
val conf = new LivyConf()
val sessionStore = mock[SessionStore]
val m = InteractiveRecoveryMetadata(
- 78, None, Some("appId"), "appTag", Spark, 0, null, None, None,
+ 78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String],
List.empty[String],
List.empty[String], None, List.empty[String], None, None, None)
val s = InteractiveSession.recover(m, conf, sessionStore, None)
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
index e28d566b..b02332bf 100644
--- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -20,13 +20,15 @@ package org.apache.livy.sessions
import org.apache.livy.LivyConf
class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String]
= None,
- ttl: Option[String] = None)
- extends Session(id, name, owner, ttl, conf) {
+ ttl: Option[String] = None, idleTimeout: Option[String] =
None)
+ extends Session(id, name, owner, ttl, idleTimeout, conf) {
case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
override val proxyUser = None
- override def start(): Unit = ()
+ override def start(): Unit = {
+ startedOn = Some(System.nanoTime())
+ }
var stopped = false
override protected def stopSession(): Unit = {
diff --git
a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index c32eebaf..363b01f8 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -62,7 +62,18 @@ class SessionManagerSpec extends FunSpec with Matchers with
LivyBaseUnitTestSuit
it("should garbage collect old sessions with ttl") {
val (livyConf, manager) = createSessionManager()
val session = manager.register(new MockSession(manager.nextId(), null,
livyConf,
- None, Some("4s")))
+ None, Some("4s"), None))
+ manager.get(session.id).isDefined should be(true)
+ eventually(timeout(5 seconds), interval(100 millis)) {
+ Await.result(manager.collectGarbage(), Duration.Inf)
+ manager.get(session.id) should be(None)
+ }
+ }
+
+ it("should garbage collect old sessions with idleTimeout") {
+ val (livyConf, manager) = createSessionManager()
+ val session = manager.register(new MockSession(manager.nextId(), null,
livyConf,
+ None, None, Some("4s")))
manager.get(session.id).isDefined should be(true)
eventually(timeout(5 seconds), interval(100 millis)) {
Await.result(manager.collectGarbage(), Duration.Inf)
diff --git
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 54208a65..cd8d2f50 100644
---
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -236,6 +236,7 @@ class LivyThriftSessionManager(val server:
LivyThriftServer, val livyConf: LivyC
server.accessManager,
createInteractiveRequest,
server.sessionStore,
+ None,
None)
onLivySessionOpened(newSession)
newSession