This is an automated email from the ASF dual-hosted git repository.
ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 6dcb2942 [LIVY-1007] Livy should not spawn one thread per job to track the job on Kubernetes
6dcb2942 is described below
commit 6dcb29422f5a43e396a6a2006819284e491de169
Author: Asif Khatri <[email protected]>
AuthorDate: Mon Aug 12 16:56:29 2024 +0530
[LIVY-1007] Livy should not spawn one thread per job to track the job on Kubernetes
## What changes were proposed in this pull request?
Instead of spawning a monitor thread for every session, create a
centralised thread pool that monitors all Kubernetes apps.
JIRA link: https://issues.apache.org/jira/browse/LIVY-1007
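At a high level, the commit replaces the per-session `kubernetesAppMonitorThread` with a shared `appQueue` drained by a fixed-size pool of workers. The sketch below illustrates that pattern in isolation, assuming a simplified `MonitoredApp` placeholder; the names and constants are illustrative, not Livy's actual classes or defaults.
```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}

object CentralizedMonitorSketch {

  // Hypothetical stand-in for a session's Kubernetes app.
  final case class MonitoredApp(tag: String) {
    // Returns true while the app should keep being monitored.
    def refreshState(): Boolean = { println(s"refreshing state of $tag"); true }
  }

  private val appQueue = new ConcurrentLinkedQueue[MonitoredApp]()
  private val poolSize = 4            // analogous to app-lookup.thread-pool.size
  private val pollIntervalMs = 1000L  // analogous to livy.server.kubernetes.poll-interval

  private val pool = Executors.newFixedThreadPool(poolSize)

  private val worker = new Runnable {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        // Each worker handles a slice of the shared queue, so the number of
        // monitoring threads stays fixed no matter how many sessions exist.
        val batch = math.min(20, math.max(1, appQueue.size() / poolSize))
        for (_ <- 0 until batch) {
          val app = appQueue.poll()
          if (app != null && app.refreshState()) {
            appQueue.add(app)  // still running: re-enqueue for the next round
          }
        }
        Thread.sleep(pollIntervalMs)
      }
    }
  }

  def start(): Unit = (0 until poolSize).foreach(_ => pool.execute(worker))

  def register(app: MonitoredApp): Unit = appQueue.add(app)
}
```
Because still-running apps are re-enqueued after each poll, the number of monitoring threads stays bounded by the pool size regardless of how many sessions are active, which is the behaviour the new `appQueue` and `KubernetesAppMonitorRunnable` in this commit provide.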
## How was this patch tested?
**Unit Tests:** The patch has been verified through comprehensive unit
tests.
**Manual Testing:** Conducted manual testing using Kubernetes on Docker
Desktop.
Environment: Helm charts. For detailed instructions on testing using Helm
charts, please refer to the documentation available at
[livycluster](https://github.com/askhatri/livycluster).
Co-authored-by: Asif Khatri <[email protected]>
---
conf/livy.conf.template | 4 +
.../src/main/scala/org/apache/livy/LivyConf.scala | 7 +
.../org/apache/livy/utils/SparkKubernetesApp.scala | 219 +++++++++++++++++----
.../apache/livy/utils/SparkKubernetesAppSpec.scala | 22 ++-
4 files changed, 215 insertions(+), 37 deletions(-)
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index a5e47a9d..ddf69ee2 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -216,6 +216,10 @@
# If Livy can't find the Kubernetes app within this time, consider it lost.
# livy.server.kubernetes.app-lookup-timeout = 600s
+# If Livy fails to find the Kubernetes app more than this many times, consider it lost.
+# livy.server.kubernetes.app-lookup.max-failed.times = 120
+# The size of the thread pool that monitors all Kubernetes apps.
+# livy.server.kubernetes.app-lookup.thread-pool.size = 4
# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
# cause session leakage, so we need to check session leakage.
# How long to check livy session leakage
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 6bef0974..25e6ea80 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -271,6 +271,13 @@ object LivyConf {
// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT =
Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
+ // If Livy fails to find the Kubernetes app more than this many times, consider it lost.
+ val KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES =
+ Entry("livy.server.kubernetes.app-lookup.max-failed.times", 120)
+ // The size of the thread pool that monitors all Kubernetes apps.
+ val KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE =
+ Entry("livy.server.kubernetes.app-lookup.thread-pool.size", 4)
+
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
index b5160aaf..c4574dee 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -18,7 +18,7 @@ package org.apache.livy.utils
import java.net.URLEncoder
import java.util.Collections
-import java.util.concurrent.TimeoutException
+import java.util.concurrent._
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
@@ -33,12 +33,16 @@ import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
import org.apache.commons.lang.StringUtils
-import org.apache.livy.{LivyConf, Logging, Utils}
+import org.apache.livy.{LivyConf, Logging}
object SparkKubernetesApp extends Logging {
private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+ private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()
+
+ private val appQueue = new ConcurrentLinkedQueue[SparkKubernetesApp]()
+
private val leakedAppsGCThread = new Thread() {
override def run(): Unit = {
import KubernetesExtensions._
@@ -97,6 +101,33 @@ object SparkKubernetesApp extends Logging {
}
}
+ private val checkMonitorAppTimeoutThread = new Thread() {
+ override def run(): Unit = {
+ while (true) {
+ try {
+ val iter = monitorAppThreadMap.entrySet().iterator()
+ val now = System.currentTimeMillis()
+
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val thread = entry.getKey
+ val updatedTime = entry.getValue
+
+ val remaining: Long = now - updatedTime - pollInterval.toMillis
+ if (remaining > appLookupTimeout.toMillis) {
+ thread.interrupt()
+ }
+ }
+
+ Thread.sleep(pollInterval.toMillis)
+ } catch {
+ case e: InterruptedException =>
+ error("Apps timeout monitoring thread was interrupted.", e)
+ }
+ }
+ }
+ }
+
private var livyConf: LivyConf = _
private var cacheLogSize: Int = _
@@ -108,7 +139,10 @@ object SparkKubernetesApp extends Logging {
var kubernetesClient: DefaultKubernetesClient = _
- def init(livyConf: LivyConf): Unit = {
+ private var appLookupThreadPoolSize: Long = _
+ private var appLookupMaxFailedTimes: Long = _
+
+ def init(livyConf: LivyConf, client: Option[KubernetesClient] = None): Unit = {
this.livyConf = livyConf
// KubernetesClient is thread safe. Create once, share it across threads.
@@ -119,6 +153,9 @@ object SparkKubernetesApp extends Logging {
appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
+ appLookupThreadPoolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
+ appLookupMaxFailedTimes = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES)
+
sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
@@ -131,6 +168,12 @@ object SparkKubernetesApp extends Logging {
setName("RefreshServiceAccountTokenThread")
RefreshServiceAccountTokenThread.setDaemon(true)
RefreshServiceAccountTokenThread.start()
+
+ checkMonitorAppTimeoutThread.setDaemon(true)
+ checkMonitorAppTimeoutThread.setName("CheckMonitorAppTimeoutThread")
+ checkMonitorAppTimeoutThread.start()
+
+ initKubernetesAppMonitorThreadPool(livyConf)
}
// Returning T, throwing the exception on failure
@@ -147,6 +190,53 @@ object SparkKubernetesApp extends Logging {
}
}
+ class KubernetesAppMonitorRunnable extends Runnable {
+ override def run(): Unit = {
+ while (true) {
+ try {
+ val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
+ var numberOfAppsToProcess = appQueue.size() / poolSize
+ if (numberOfAppsToProcess < 1) {
+ numberOfAppsToProcess = 1
+ } else if (numberOfAppsToProcess > 20) {
+ numberOfAppsToProcess = 20
+ }
+ for (_ <- 0 until numberOfAppsToProcess) {
+ // Record the current time before monitoring each app so that
+ // checkMonitorAppTimeoutThread can detect whether this thread is blocked on monitoring.
+ monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
+ val app = appQueue.poll()
+ if (app != null) {
+ app.monitorSparkKubernetesApp()
+ if (app.isRunning) {
+ appQueue.add(app)
+ }
+ }
+ }
+ Thread.sleep(pollInterval.toMillis)
+ } catch {
+ case e: InterruptedException =>
+ error(s"Kubernetes app monitoring was interrupted.", e)
+ }
+ }
+ }
+ }
+
+ private def initKubernetesAppMonitorThreadPool(livyConf: LivyConf): Unit = {
+ val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
+ val KubernetesAppMonitorThreadPool: ExecutorService =
+ Executors.newFixedThreadPool(poolSize)
+
+ val runnable = new KubernetesAppMonitorRunnable()
+
+ for (_ <- 0 until poolSize) {
+ KubernetesAppMonitorThreadPool.execute(runnable)
+ }
+ }
+
+ def getAppSize: Int = appQueue.size()
+
+ def clearApps(): Unit = appQueue.clear()
}
class SparkKubernetesApp private[utils] (
@@ -162,26 +252,59 @@ class SparkKubernetesApp private[utils] (
import KubernetesExtensions._
import SparkKubernetesApp._
+ appQueue.add(this)
+ private var killed = false
private val appPromise: Promise[KubernetesApplication] = Promise()
private[utils] var state: SparkApp.State = SparkApp.State.STARTING
private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
- // Exposed for unit test.
- // TODO Instead of spawning a thread for every session, create a centralized thread and
- // batch Kubernetes queries.
- private[utils] val kubernetesAppMonitorThread = Utils
- .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+ private var kubernetesTagToAppIdFailedTimes: Int = _
+ private var kubernetesAppMonitorFailedTimes: Int = _
+
+ private def failToMonitor(): Unit = {
+ changeState(SparkApp.State.FAILED)
+ process.foreach(_.destroy())
+ leakedAppTags.put(appTag, System.currentTimeMillis())
+ }
+
+ private def failToGetAppId(): Unit = {
+ kubernetesTagToAppIdFailedTimes += 1
+ if (kubernetesTagToAppIdFailedTimes > appLookupMaxFailedTimes) {
+ val msg = "No KUBERNETES application is found with tag " +
+ s"${appTag.toLowerCase}. This may be because " +
+ "1) spark-submit fail to submit application to KUBERNETES; " +
+ "or 2) KUBERNETES cluster doesn't have enough resource to start the
application in time. " +
+ "Please check Livy log and KUBERNETES log to know the details."
+
+ error(s"Failed monitoring the app $appTag: $msg")
+ kubernetesDiagnostics = ArrayBuffer(msg)
+ failToMonitor()
+ }
+ }
+
+ private def monitorSparkKubernetesApp(): Unit = {
try {
+ if (killed) {
+ changeState(SparkApp.State.KILLED)
+ } else if (isProcessErrExit) {
+ changeState(SparkApp.State.FAILED)
+ }
// Get KubernetesApplication by appTag.
- val app: KubernetesApplication = try {
+ val appOption: Option[KubernetesApplication] = try {
getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
} catch {
case e: Exception =>
+ failToGetAppId()
appPromise.failure(e)
- throw e
+ return
+ }
+ if (appOption.isEmpty) {
+ failToGetAppId()
+ return
}
- appPromise.success(app)
+ val app: KubernetesApplication = appOption.get
+ appPromise.trySuccess(app)
val appId = app.getApplicationId
Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
@@ -192,7 +315,9 @@ class SparkKubernetesApp private[utils] (
}
var appInfo = AppInfo()
- while (isRunning) {
+
+ // The while loop is replaced with an "if" condition so that another pool thread can process this app and continue.
+ if (isRunning) {
try {
Clock.sleep(pollInterval.toMillis)
@@ -222,14 +347,22 @@ class SparkKubernetesApp private[utils] (
} catch {
// TODO analyse available exceptions
case e: Throwable =>
- throw e
+ error(s"Failed to refresh application state for $appTag.", e)
}
}
+
+ kubernetesTagToAppIdFailedTimes = 0
+ kubernetesAppMonitorFailedTimes = 0
debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
+ Thread.currentThread().setName(s"appMonitorCommonThreadPool")
} catch {
- case _: InterruptedException =>
- kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
- changeState(SparkApp.State.KILLED)
+ case e: InterruptedException =>
+ kubernetesAppMonitorFailedTimes += 1
+ if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
+ error(s"Monitoring of the app $appTag was interrupted.", e)
+ kubernetesDiagnostics = ArrayBuffer(e.getMessage)
+ failToMonitor()
+ }
case NonFatal(e) =>
error(s"Error while refreshing Kubernetes state", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
@@ -250,18 +383,38 @@ class SparkKubernetesApp private[utils] (
("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
override def kill(): Unit = synchronized {
- try {
- withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)))
- } catch {
- // We cannot kill the Kubernetes app without the appTag.
- // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
- // We don't want a stuck session that can't be deleted. Emit a warning and move on.
- case _: TimeoutException | _: InterruptedException =>
- warn("Deleting a session while its Kubernetes application is not found.")
- kubernetesAppMonitorThread.interrupt()
- } finally {
- process.foreach(_.destroy())
+ killed = true
+
+ if (!isRunning) {
+ return
+ }
+
+ process.foreach(_.destroy())
+
+ def applicationDetails: Option[Try[KubernetesApplication]] = appPromise.future.value
+ if (applicationDetails.isEmpty) {
+ leakedAppTags.put(appTag, System.currentTimeMillis())
+ return
}
+ def kubernetesApplication: KubernetesApplication = applicationDetails.get.get
+ if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) {
+ try {
+ withRetry(kubernetesClient.killApplication(
+ Await.result(appPromise.future, appLookupTimeout)))
+ } catch {
+ // We cannot kill the Kubernetes app without the appTag.
+ // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
+ // We don't want a stuck session that can't be deleted. Emit a warning and move on.
+ case _: TimeoutException | _: InterruptedException =>
+ warn("Deleting a session while its Kubernetes application is not found.")
+ }
+ } else {
+ leakedAppTags.put(appTag, System.currentTimeMillis())
+ }
+ }
+
+ private def isProcessErrExit: Boolean = {
+ process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
}
private def isRunning: Boolean = {
@@ -282,18 +435,17 @@ class SparkKubernetesApp private[utils] (
*
* @param appTag The application tag tagged on the target application.
* If the tag is not unique, it returns the first application it found.
- * @return KubernetesApplication or the failure.
+ * @return Option[KubernetesApplication] or the failure.
*/
- @tailrec
private def getAppFromTag(
appTag: String,
pollInterval: duration.Duration,
- deadline: Deadline): KubernetesApplication = {
+ deadline: Deadline): Option[KubernetesApplication] = {
import KubernetesExtensions._
withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))) match {
- case Some(app) => app
+ case Some(app) => Some(app)
case None =>
if (deadline.isOverdue) {
process.foreach(_.destroy())
@@ -307,10 +459,7 @@ class SparkKubernetesApp private[utils] (
throw new IllegalStateException(s"Failed to submit Kubernetes application with tag" +
s" $appTag. 'spark-submit' exited with non-zero status. " +
s"Please check Livy log and Kubernetes log to know the details.")
- } else {
- Clock.sleep(pollInterval.toMillis)
- getAppFromTag(appTag, pollInterval, deadline)
- }
+ } else None
}
}
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
index 00257acd..24e66e0d 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -20,14 +20,32 @@ import java.util.Objects._
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
+import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mockito.when
-import org.scalatest.FunSpec
+import org.scalatest.{BeforeAndAfterAll, FunSpec}
import org.scalatestplus.mockito.MockitoSugar._
import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL
-class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val livyConf = new LivyConf()
+ livyConf.set(LivyConf.KUBERNETES_POLL_INTERVAL, "500ms")
+ livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
+ livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")
+
+ val client = mock[KubernetesClient]
+ SparkKubernetesApp.init(livyConf, Some(client))
+ SparkKubernetesApp.clearApps
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ assert(SparkKubernetesApp.getAppSize === 0)
+ }
describe("KubernetesAppReport") {
import scala.collection.JavaConverters._