This is an automated email from the ASF dual-hosted git repository.

ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dcb2942 [LIVY-1007] Livy should not spawn one thread per job to track the job on Kubernetes
6dcb2942 is described below

commit 6dcb29422f5a43e396a6a2006819284e491de169
Author: Asif Khatri <[email protected]>
AuthorDate: Mon Aug 12 16:56:29 2024 +0530

    [LIVY-1007] Livy should not spawn one thread per job to track the job on Kubernetes
    
    ## What changes were proposed in this pull request?
    
    Instead of spawning a monitor thread for every session, use a centralised thread pool
    backed by a shared queue to monitor all Kubernetes apps (a simplified sketch of the
    pattern follows the JIRA link below).
    
    JIRA link: https://issues.apache.org/jira/browse/LIVY-1007
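
    The core idea, reduced to a minimal, self-contained sketch (hypothetical names; the real
    implementation is in `SparkKubernetesApp.scala` in the diff below): apps register in a
    shared queue, and a fixed-size pool of workers repeatedly polls the queue, monitors a
    bounded batch of apps per poll interval, and re-enqueues the apps that are still running.

    ```scala
    import java.util.concurrent.{ConcurrentLinkedQueue, Executors}

    object CentralisedMonitorSketch {
      // Stand-in for SparkKubernetesApp; monitorOnce() simulates one monitoring pass.
      final class App(val tag: String) {
        @volatile var running = true
        def monitorOnce(): Unit = println(s"checked $tag")
      }

      private val appQueue = new ConcurrentLinkedQueue[App]()

      def register(app: App): Unit = appQueue.add(app)

      def start(poolSize: Int, pollIntervalMs: Long): Unit = {
        val pool = Executors.newFixedThreadPool(poolSize)
        val worker = new Runnable {
          override def run(): Unit =
            try {
              while (true) {
                // Each worker handles a bounded slice of the queue per poll interval.
                val batch = math.max(1, math.min(20, appQueue.size() / poolSize))
                for (_ <- 0 until batch) {
                  val app = appQueue.poll()
                  if (app != null) {
                    app.monitorOnce()
                    if (app.running) appQueue.add(app) // still running: check again next round
                  }
                }
                Thread.sleep(pollIntervalMs)
              }
            } catch {
              case _: InterruptedException => () // shutdownNow() on the pool stops the worker
            }
        }
        (0 until poolSize).foreach(_ => pool.execute(worker))
      }
    }
    ```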
    
    ## How was this patch tested?
    
    **Unit Tests:** The patch has been verified through comprehensive unit tests.
    **Manual Testing:** Conducted manual testing using Kubernetes on Docker Desktop.
    
    Environment: Helm charts. For detailed instructions on testing using Helm charts,
    please refer to the documentation available at [livycluster](https://github.com/askhatri/livycluster).
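
    For reference, an illustrative `livy.conf` excerpt combining the new knobs introduced by
    this patch with the existing lookup settings (values shown are the defaults from the
    template and `LivyConf` changes below):

    ```
    # Existing settings (defaults)
    livy.server.kubernetes.poll-interval = 15s
    livy.server.kubernetes.app-lookup-timeout = 600s
    # New in this patch: give up after this many failed lookups of a Kubernetes app
    livy.server.kubernetes.app-lookup.max-failed.times = 120
    # New in this patch: size of the shared thread pool that monitors all Kubernetes apps
    livy.server.kubernetes.app-lookup.thread-pool.size = 4
    ```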
    
    Co-authored-by: Asif Khatri <[email protected]>
---
 conf/livy.conf.template                            |   4 +
 .../src/main/scala/org/apache/livy/LivyConf.scala  |   7 +
 .../org/apache/livy/utils/SparkKubernetesApp.scala | 219 +++++++++++++++++----
 .../apache/livy/utils/SparkKubernetesAppSpec.scala |  22 ++-
 4 files changed, 215 insertions(+), 37 deletions(-)

diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index a5e47a9d..ddf69ee2 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -216,6 +216,10 @@
 
 # If Livy can't find the Kubernetes app within this time, consider it lost.
 # livy.server.kubernetes.app-lookup-timeout = 600s
+# If Livy fails to find the Kubernetes app after this many lookup attempts, consider it lost.
+# livy.server.kubernetes.app-lookup.max-failed.times = 120
+# The size of the thread pool used to monitor all Kubernetes apps.
+# livy.server.kubernetes.app-lookup.thread-pool.size = 4
 # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
 # cause session leakage, so we need to check session leakage.
 # How long to check livy session leakage
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 6bef0974..25e6ea80 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -271,6 +271,13 @@ object LivyConf {
 
   // If Livy can't find the Kubernetes app within this time, consider it lost.
   val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
+  // If Livy fails to find the Kubernetes app after this many lookup attempts, consider it lost.
+  val KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES =
+    Entry("livy.server.kubernetes.app-lookup.max-failed.times", 120)
+  // The size of the thread pool used to monitor all Kubernetes apps.
+  val KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE =
+    Entry("livy.server.kubernetes.app-lookup.thread-pool.size", 4)
+
   // How often Livy polls Kubernetes to refresh Kubernetes app state.
   val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")
 
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
index b5160aaf..c4574dee 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -18,7 +18,7 @@ package org.apache.livy.utils
 
 import java.net.URLEncoder
 import java.util.Collections
-import java.util.concurrent.TimeoutException
+import java.util.concurrent._
 
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
@@ -33,12 +33,16 @@ import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
 import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
 import org.apache.commons.lang.StringUtils
 
-import org.apache.livy.{LivyConf, Logging, Utils}
+import org.apache.livy.{LivyConf, Logging}
 
 object SparkKubernetesApp extends Logging {
 
   private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
 
+  private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()
+
+  private val appQueue = new ConcurrentLinkedQueue[SparkKubernetesApp]()
+
   private val leakedAppsGCThread = new Thread() {
     override def run(): Unit = {
       import KubernetesExtensions._
@@ -97,6 +101,33 @@ object SparkKubernetesApp extends Logging {
     }
   }
 
+  private val checkMonitorAppTimeoutThread = new Thread() {
+    override def run(): Unit = {
+      while (true) {
+        try {
+          val iter = monitorAppThreadMap.entrySet().iterator()
+          val now = System.currentTimeMillis()
+
+          while (iter.hasNext) {
+            val entry = iter.next()
+            val thread = entry.getKey
+            val updatedTime = entry.getValue
+
+            val remaining: Long = now - updatedTime - pollInterval.toMillis
+            if (remaining > appLookupTimeout.toMillis) {
+              thread.interrupt()
+            }
+          }
+
+          Thread.sleep(pollInterval.toMillis)
+        } catch {
+          case e: InterruptedException =>
+            error("Apps timeout monitoring thread was interrupted.", e)
+        }
+      }
+    }
+  }
+
   private var livyConf: LivyConf = _
 
   private var cacheLogSize: Int = _
@@ -108,7 +139,10 @@ object SparkKubernetesApp extends Logging {
 
   var kubernetesClient: DefaultKubernetesClient = _
 
-  def init(livyConf: LivyConf): Unit = {
+  private var appLookupThreadPoolSize: Long = _
+  private var appLookupMaxFailedTimes: Long = _
+
+  def init(livyConf: LivyConf, client: Option[KubernetesClient] = None): Unit = {
     this.livyConf = livyConf
 
     // KubernetesClient is thread safe. Create once, share it across threads.
@@ -119,6 +153,9 @@ object SparkKubernetesApp extends Logging {
     appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
     pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
 
+    appLookupThreadPoolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
+    appLookupMaxFailedTimes = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES)
+
     sessionLeakageCheckInterval =
       livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
     sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
@@ -131,6 +168,12 @@ object SparkKubernetesApp extends Logging {
       setName("RefreshServiceAccountTokenThread")
     RefreshServiceAccountTokenThread.setDaemon(true)
     RefreshServiceAccountTokenThread.start()
+
+    checkMonitorAppTimeoutThread.setDaemon(true)
+    checkMonitorAppTimeoutThread.setName("CheckMonitorAppTimeoutThread")
+    checkMonitorAppTimeoutThread.start()
+
+    initKubernetesAppMonitorThreadPool(livyConf)
   }
 
   // Returning T, throwing the exception on failure
@@ -147,6 +190,53 @@ object SparkKubernetesApp extends Logging {
     }
   }
 
+  class KubernetesAppMonitorRunnable extends Runnable {
+    override def run(): Unit = {
+      while (true) {
+        try {
+          val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
+          var numberOfAppsToProcess = appQueue.size() / poolSize
+          if (numberOfAppsToProcess < 1) {
+            numberOfAppsToProcess = 1
+          } else if (numberOfAppsToProcess > 20) {
+            numberOfAppsToProcess = 20
+          }
+          for (_ <- 0 until numberOfAppsToProcess) {
+            // Record the current time before monitoring each app so that
+            // checkMonitorAppTimeoutThread can detect a thread stuck in monitoring.
+            monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
+            val app = appQueue.poll()
+            if (app != null) {
+              app.monitorSparkKubernetesApp()
+              if (app.isRunning) {
+                appQueue.add(app)
+              }
+            }
+          }
+          Thread.sleep(pollInterval.toMillis)
+        } catch {
+          case e: InterruptedException =>
+            error(s"Kubernetes app monitoring was interrupted.", e)
+        }
+      }
+    }
+  }
+
+  private def initKubernetesAppMonitorThreadPool(livyConf: LivyConf): Unit = {
+    val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
+    val KubernetesAppMonitorThreadPool: ExecutorService =
+      Executors.newFixedThreadPool(poolSize)
+
+    val runnable = new KubernetesAppMonitorRunnable()
+
+    for (_ <- 0 until poolSize) {
+      KubernetesAppMonitorThreadPool.execute(runnable)
+    }
+  }
+
+  def getAppSize: Int = appQueue.size()
+
+  def clearApps(): Unit = appQueue.clear()
 }
 
 class SparkKubernetesApp private[utils] (
@@ -162,26 +252,59 @@ class SparkKubernetesApp private[utils] (
   import KubernetesExtensions._
   import SparkKubernetesApp._
 
+  appQueue.add(this)
+  private var killed = false
   private val appPromise: Promise[KubernetesApplication] = Promise()
   private[utils] var state: SparkApp.State = SparkApp.State.STARTING
   private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
   private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
 
-  // Exposed for unit test.
-  // TODO Instead of spawning a thread for every session, create a centralized thread and
-  // batch Kubernetes queries.
-  private[utils] val kubernetesAppMonitorThread = Utils
-    .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+  private var kubernetesTagToAppIdFailedTimes: Int = _
+  private var kubernetesAppMonitorFailedTimes: Int = _
+
+  private def failToMonitor(): Unit = {
+    changeState(SparkApp.State.FAILED)
+    process.foreach(_.destroy())
+    leakedAppTags.put(appTag, System.currentTimeMillis())
+  }
+
+  private def failToGetAppId(): Unit = {
+    kubernetesTagToAppIdFailedTimes += 1
+    if (kubernetesTagToAppIdFailedTimes > appLookupMaxFailedTimes) {
+      val msg = "No KUBERNETES application is found with tag " +
+        s"${appTag.toLowerCase}. This may be because " +
+        "1) spark-submit failed to submit the application to KUBERNETES; " +
+        "or 2) the KUBERNETES cluster doesn't have enough resources to start the application " +
+        "in time. Please check the Livy log and KUBERNETES log for details."
+
+      error(s"Failed monitoring the app $appTag: $msg")
+      kubernetesDiagnostics = ArrayBuffer(msg)
+      failToMonitor()
+    }
+  }
+
+  private def monitorSparkKubernetesApp(): Unit = {
     try {
+      if (killed) {
+        changeState(SparkApp.State.KILLED)
+      } else if (isProcessErrExit) {
+        changeState(SparkApp.State.FAILED)
+      }
       // Get KubernetesApplication by appTag.
-      val app: KubernetesApplication = try {
+      val appOption: Option[KubernetesApplication] = try {
         getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
       } catch {
         case e: Exception =>
+          failToGetAppId()
           appPromise.failure(e)
-          throw e
+          return
+      }
+      if (appOption.isEmpty) {
+        failToGetAppId()
+        return
       }
-      appPromise.success(app)
+      val app: KubernetesApplication = appOption.get
+      appPromise.trySuccess(app)
       val appId = app.getApplicationId
 
       Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
@@ -192,7 +315,9 @@ class SparkKubernetesApp private[utils] (
       }
 
       var appInfo = AppInfo()
-      while (isRunning) {
+
+      // Use an "if" instead of a while loop so that another pool thread can
+      // resume monitoring this app on the next poll.
+      if (isRunning) {
         try {
           Clock.sleep(pollInterval.toMillis)
 
@@ -222,14 +347,22 @@ class SparkKubernetesApp private[utils] (
         } catch {
           // TODO analyse available exceptions
           case e: Throwable =>
-            throw e
+            error(s"Failed to refresh application state for $appTag.", e)
         }
       }
+
+      kubernetesTagToAppIdFailedTimes = 0
+      kubernetesAppMonitorFailedTimes = 0
       debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
+      Thread.currentThread().setName(s"appMonitorCommonThreadPool")
     } catch {
-      case _: InterruptedException =>
-        kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
-        changeState(SparkApp.State.KILLED)
+      case e: InterruptedException =>
+        kubernetesAppMonitorFailedTimes += 1
+        if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
+          error(s"Monitoring of the app $appTag was interrupted.", e)
+          kubernetesDiagnostics = ArrayBuffer(e.getMessage)
+          failToMonitor()
+        }
       case NonFatal(e) =>
         error(s"Error while refreshing Kubernetes state", e)
         kubernetesDiagnostics = ArrayBuffer(e.getMessage)
@@ -250,18 +383,38 @@ class SparkKubernetesApp private[utils] (
       ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
 
   override def kill(): Unit = synchronized {
-    try {
-      withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)))
-    } catch {
-      // We cannot kill the Kubernetes app without the appTag.
-      // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
-      // We don't want a stuck session that can't be deleted. Emit a warning and move on.
-      case _: TimeoutException | _: InterruptedException =>
-        warn("Deleting a session while its Kubernetes application is not found.")
-        kubernetesAppMonitorThread.interrupt()
-    } finally {
-      process.foreach(_.destroy())
+    killed = true
+
+    if (!isRunning) {
+      return
+    }
+
+    process.foreach(_.destroy())
+
+    def applicationDetails: Option[Try[KubernetesApplication]] = appPromise.future.value
+    if (applicationDetails.isEmpty) {
+      leakedAppTags.put(appTag, System.currentTimeMillis())
+      return
     }
+    def kubernetesApplication: KubernetesApplication = applicationDetails.get.get
+    if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) {
+      try {
+        withRetry(kubernetesClient.killApplication(
+          Await.result(appPromise.future, appLookupTimeout)))
+      } catch {
+        // We cannot kill the Kubernetes app without the appTag.
+        // There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
+        // We don't want a stuck session that can't be deleted. Emit a warning and move on.
+        case _: TimeoutException | _: InterruptedException =>
+          warn("Deleting a session while its Kubernetes application is not found.")
+      }
+    } else {
+      leakedAppTags.put(appTag, System.currentTimeMillis())
+    }
+  }
+
+  private def isProcessErrExit: Boolean = {
+    process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
   }
 
   private def isRunning: Boolean = {
@@ -282,18 +435,17 @@ class SparkKubernetesApp private[utils] (
     *
     * @param appTag The application tag tagged on the target application.
     *               If the tag is not unique, it returns the first application it found.
-    * @return KubernetesApplication or the failure.
+    * @return Some(KubernetesApplication) if found, None if not found yet, or the failure.
     */
-  @tailrec
   private def getAppFromTag(
     appTag: String,
     pollInterval: duration.Duration,
-    deadline: Deadline): KubernetesApplication = {
+    deadline: Deadline): Option[KubernetesApplication] = {
     import KubernetesExtensions._
 
     withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
     match {
-      case Some(app) => app
+      case Some(app) => Some(app)
       case None =>
         if (deadline.isOverdue) {
           process.foreach(_.destroy())
@@ -307,10 +459,7 @@ class SparkKubernetesApp private[utils] (
-          throw new IllegalStateException(s"Failed to submit Kubernetes application with tag" +
             s" $appTag. 'spark-submit' exited with non-zero status. " +
             s"Please check Livy log and Kubernetes log to know the details.")
-        } else {
-          Clock.sleep(pollInterval.toMillis)
-          getAppFromTag(appTag, pollInterval, deadline)
-        }
+        } else None
     }
   }
 
diff --git a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
index 00257acd..24e66e0d 100644
--- a/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
+++ b/server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -20,14 +20,32 @@ import java.util.Objects._
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
+import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.Mockito.when
-import org.scalatest.FunSpec
+import org.scalatest.{BeforeAndAfterAll, FunSpec}
 import org.scalatestplus.mockito.MockitoSugar._
 
 import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
 import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL
 
-class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
+class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val livyConf = new LivyConf()
+    livyConf.set(LivyConf.KUBERNETES_POLL_INTERVAL, "500ms")
+    livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
+    livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")
+
+    val client = mock[KubernetesClient]
+    SparkKubernetesApp.init(livyConf, Some(client))
+    SparkKubernetesApp.clearApps()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    assert(SparkKubernetesApp.getAppSize === 0)
+  }
 
   describe("KubernetesAppReport") {
     import scala.collection.JavaConverters._
