Repository: spark
Updated Branches:
  refs/heads/master b5f8c36e3 -> 11fa8741c


[SQL][HOTFIX] Fix flakiness in StateStoreRDDSuite

## What changes were proposed in this pull request?
StateStoreCoordinator.reportActiveInstance is async, so subsequence state 
checks must be in eventually.
## How was this patch tested?
Jenkins tests

Author: Tathagata Das <[email protected]>

Closes #11924 from tdas/state-store-flaky-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11fa8741
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11fa8741
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11fa8741

Branch: refs/heads/master
Commit: 11fa8741ca5b550e4f373c5d6e520c64f5118d0c
Parents: b5f8c36
Author: Tathagata Das <[email protected]>
Authored: Fri Mar 25 12:04:47 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Fri Mar 25 12:04:47 2016 -0700

----------------------------------------------------------------------
 .../streaming/state/StateStoreCoordinatorSuite.scala     |  1 -
 .../execution/streaming/state/StateStoreRDDSuite.scala   | 11 ++++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11fa8741/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index c99c2f5..a7e3262 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -71,7 +71,6 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
         assert(coordinatorRef.verifyIfInstanceActive(id1, exec) === true)
         assert(coordinatorRef.verifyIfInstanceActive(id2, exec) === true)
         assert(coordinatorRef.verifyIfInstanceActive(id3, exec) === true)
-
       }
 
       coordinatorRef.deactivateInstances("x")

http://git-wip-us.apache.org/repos/asf/spark/blob/11fa8741/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index 24cec30..df50cbd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -23,6 +23,8 @@ import java.nio.file.Files
 import scala.util.Random
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.LocalSparkContext._
@@ -121,9 +123,12 @@ class StateStoreRDDSuite extends SparkFunSuite with 
BeforeAndAfter with BeforeAn
         val coordinatorRef = sqlContext.streams.stateStoreCoordinator
         coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), 
"host1", "exec1")
         coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), 
"host2", "exec2")
-        assert(
-          coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
-            Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
+
+        eventually(timeout(10 seconds)) {
+          assert(
+            coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
+              Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
+        }
 
         val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
           increment, path, opId, storeVersion = 0, keySchema, valueSchema)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to