Repository: spark
Updated Branches:
  refs/heads/master ef55e46c9 -> bde27b89a


[SPARK-15131][SQL] Shutdown StateStore management thread when SparkContext has 
been shutdown

## What changes were proposed in this pull request?

Whenever the StateStoreCoordinator cannot be contacted, assume that the 
SparkContext and RpcEnv on the driver have been shut down, and therefore 
stop the StateStore management thread and unload all loaded stores.

## How was this patch tested?

Updated unit tests.

Author: Tathagata Das <[email protected]>

Closes #12905 from tdas/SPARK-15131.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bde27b89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bde27b89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bde27b89

Branch: refs/heads/master
Commit: bde27b89a243247bd3069b24cb4bb1eee94edfd7
Parents: ef55e46
Author: Tathagata Das <[email protected]>
Authored: Wed May 4 21:19:53 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Wed May 4 21:19:53 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/state/StateStore.scala  | 46 +++++++++++---------
 .../streaming/state/StateStoreSuite.scala       | 15 ++++++-
 .../streaming/StreamingAggregationSuite.scala   | 10 ++++-
 3 files changed, 48 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bde27b89/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9521506..9948292 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -113,7 +113,7 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
  * the store is the active instance. Accordingly, it either keeps it loaded 
and performs
  * maintenance, or unloads the store.
  */
-private[state] object StateStore extends Logging {
+private[sql] object StateStore extends Logging {
 
   val MAINTENANCE_INTERVAL_CONFIG = 
"spark.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
@@ -155,6 +155,10 @@ private[state] object StateStore extends Logging {
     loadedProviders.contains(storeId)
   }
 
+  def isMaintenanceRunning: Boolean = loadedProviders.synchronized {
+    maintenanceTask != null
+  }
+
   /** Unload and stop all state store providers */
   def stop(): Unit = loadedProviders.synchronized {
     loadedProviders.clear()
@@ -187,44 +191,44 @@ private[state] object StateStore extends Logging {
    */
   private def doMaintenance(): Unit = {
     logDebug("Doing maintenance")
-    loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, 
provider) =>
-      try {
-        if (verifyIfStoreInstanceActive(id)) {
-          provider.doMaintenance()
-        } else {
-          unload(id)
-          logInfo(s"Unloaded $provider")
+    if (SparkEnv.get == null) {
+      stop()
+    } else {
+      loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case 
(id, provider) =>
+        try {
+          if (verifyIfStoreInstanceActive(id)) {
+            provider.doMaintenance()
+          } else {
+            unload(id)
+            logInfo(s"Unloaded $provider")
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error managing $provider, stopping management thread")
+            stop()
         }
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Error managing $provider")
       }
     }
   }
 
   private def reportActiveStoreInstance(storeId: StateStoreId): Unit = {
-    try {
+    if (SparkEnv.get != null) {
       val host = SparkEnv.get.blockManager.blockManagerId.host
       val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
       coordinatorRef.foreach(_.reportActiveInstance(storeId, host, executorId))
       logDebug(s"Reported that the loaded instance $storeId is active")
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Error reporting active instance of $storeId")
     }
   }
 
   private def verifyIfStoreInstanceActive(storeId: StateStoreId): Boolean = {
-    try {
+    if (SparkEnv.get != null) {
       val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
       val verified =
         coordinatorRef.map(_.verifyIfInstanceActive(storeId, 
executorId)).getOrElse(false)
-      logDebug(s"Verified whether the loaded instance $storeId is active: 
$verified" )
+      logDebug(s"Verified whether the loaded instance $storeId is active: 
$verified")
       verified
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Error verifying active instance of $storeId")
-        false
+    } else {
+      false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bde27b89/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dd23925..f8f8bc7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -47,8 +47,14 @@ class StateStoreSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateMeth
   private val keySchema = StructType(Seq(StructField("key", StringType, true)))
   private val valueSchema = StructType(Seq(StructField("value", IntegerType, 
true)))
 
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
   after {
     StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
   }
 
   test("get, put, remove, commit, and all data iterator") {
@@ -352,7 +358,7 @@ class StateStoreSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateMeth
     }
   }
 
-  ignore("maintenance") {
+  test("maintenance") {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
@@ -366,20 +372,26 @@ class StateStoreSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
 
+
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
+          require(!StateStore.isMaintenanceRunning, "StateStore is 
unexpectedly running")
+
           for (i <- 1 to 20) {
             val store = StateStore.get(
               storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
             put(store, "a", i)
             store.commit()
           }
+
           eventually(timeout(10 seconds)) {
             assert(coordinatorRef.getLocation(storeId).nonEmpty, "active 
instance was not reported")
           }
 
           // Background maintenance should clean up and generate snapshots
+          assert(StateStore.isMaintenanceRunning, "Maintenance task is not 
running")
+
           eventually(timeout(10 seconds)) {
             // Earliest delta file should get cleaned up
             assert(!fileExists(provider, 1, isSnapshot = false), "earliest 
file not deleted")
@@ -418,6 +430,7 @@ class StateStoreSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateMeth
       require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
         assert(!StateStore.isLoaded(storeId))
+        assert(!StateStore.isMaintenanceRunning)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bde27b89/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index bdf40f5..8da7742 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.streaming
 
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.catalyst.analysis.Update
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -29,7 +32,12 @@ object FailureSinglton {
   var firstTime = true
 }
 
-class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
+class StreamingAggregationSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfterAll {
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    StateStore.stop()
+  }
 
   import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to