This is an automated email from the ASF dual-hosted git repository.

mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new bf30e3bd [LIVY-987] NPE when waiting for thrift session to start 
timeout.
bf30e3bd is described below

commit bf30e3bd3105f72a6a7fd2a276a16dabe8bdf104
Author: jianzhen.wu <[email protected]>
AuthorDate: Tue Sep 5 11:57:36 2023 +0200

    [LIVY-987] NPE when waiting for thrift session to start timeout.
    
    ## What changes were proposed in this pull request?
    Fix NPE when waiting for thrift session to start timeout.
    https://issues.apache.org/jira/browse/LIVY-987
    
    ## How was this patch tested?
    manual tests by beeline and set timeout to 10s.
    
    0: jdbc:hive2://username:passwordthrift-server> select 123;
    
    RSC client is executing SQL query: select 123, statementId = 
681bc017-8f37-4665-a575-da355db77254, session = SessionHandle 
[c17f1729-6ee1-4260-b82b-aebec3b08e14]
    
    Livy session has not yet started. Please wait for it to be ready...
    
    Error: java.util.concurrent.TimeoutException: Futures timed out after 
[10000 milliseconds] (state=,code=0)
    
    0: jdbc:hive2://username:passwordthrift-server> Closing: 0: 
jdbc:hive2://username:passwordthrift-server
    
    Please review https://livy.incubator.apache.org/community/ before opening a 
pull request.
    
    Author: jianzhen.wu <[email protected]>
    Author: jianzhenwu <[email protected]>
    
    Closes #416 from jianzhenwu/LIVY-987.
---
 .../thriftserver/LivyThriftSessionManager.scala    |  6 +--
 .../TestLivyThriftSessionManager.scala             | 47 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 11294db8..54208a65 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -50,7 +50,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, 
val livyConf: LivyC
   extends ThriftService(classOf[LivyThriftSessionManager].getName) with 
Logging {
 
   private[thriftserver] val operationManager = new LivyOperationManager(this)
-  private val sessionHandleToLivySession =
+  private[thriftserver] val sessionHandleToLivySession =
     new ConcurrentHashMap[SessionHandle, Future[InteractiveSession]]()
   // A map which returns how many incoming connections are open for a Livy 
session.
   // This map tracks only the sessions created by the Livy thriftserver and 
not those which have
@@ -95,12 +95,12 @@ class LivyThriftSessionManager(val server: 
LivyThriftServer, val livyConf: LivyC
     if (!future.isCompleted) {
       Try(Await.result(future, maxSessionWait)) match {
         case Success(session) => session
-        case Failure(e) => throw e.getCause
+        case Failure(e) => throw Option(e.getCause).getOrElse(e)
       }
     } else {
       future.value match {
         case Some(Success(session)) => session
-        case Some(Failure(e)) => throw e.getCause
+        case Some(Failure(e)) => throw Option(e.getCause).getOrElse(e)
         case None => throw new RuntimeException("Future cannot be None when it 
is completed")
       }
     }
diff --git 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
index fee801e3..11eea31f 100644
--- 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
+++ 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
@@ -17,15 +17,24 @@
 
 package org.apache.livy.thriftserver
 
-import org.apache.hive.service.cli.HiveSQLException
+import java.util.concurrent.TimeoutException
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+
+import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Mockito.mock
 
 import org.apache.livy.LivyConf
 import org.apache.livy.server.AccessManager
+import org.apache.livy.server.interactive.InteractiveSession
 import org.apache.livy.server.recovery.{SessionStore, StateStore}
 import org.apache.livy.sessions.InteractiveSessionManager
+import org.apache.livy.utils.Clock.sleep
 
 object ConnectionLimitType extends Enumeration {
   type ConnectionLimitType = Value
@@ -49,6 +58,18 @@ class TestLivyThriftSessionManager {
       }
       conf.set(entry, limit)
     }
+    this.createThriftSessionManager(conf)
+  }
+
+  private def createThriftSessionManager(
+      maxSessionWait: Option[String]): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    maxSessionWait.foreach(conf.set(LivyConf.THRIFT_SESSION_CREATION_TIMEOUT, 
_))
+    this.createThriftSessionManager(conf)
+  }
+
+  private def createThriftSessionManager(conf: LivyConf): 
LivyThriftSessionManager = {
     val server = new LivyThriftServer(
       conf,
       mock(classOf[InteractiveSessionManager]),
@@ -142,4 +163,28 @@ class TestLivyThriftSessionManager {
     val msg = s"Connection limit per user reached (user: $user limit: 3)"
     testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
   }
+
+  @Test(expected = classOf[TimeoutException])
+  def testGetLivySessionWaitForTimeout(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(Some("10ms"))
+    val sessionHandle = mock(classOf[SessionHandle])
+    val future = Future[InteractiveSession] {
+      sleep(100)
+      mock(classOf[InteractiveSession])
+    }
+    thriftSessionMgr.sessionHandleToLivySession.put(sessionHandle, future)
+    thriftSessionMgr.getLivySession(sessionHandle)
+  }
+
+  @Test(expected = classOf[TimeoutException])
+  def testGetLivySessionWithTimeoutException(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(None)
+    val sessionHandle = mock(classOf[SessionHandle])
+    val future = Future[InteractiveSession] {
+      throw new TimeoutException("Actively throw TimeoutException in Future.")
+    }
+    thriftSessionMgr.sessionHandleToLivySession.put(sessionHandle, future)
+    Await.ready(future, Duration(30, TimeUnit.SECONDS))
+    thriftSessionMgr.getLivySession(sessionHandle)
+  }
 }

Reply via email to