This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cd4a284  [SPARK-27460][FOLLOW-UP][TESTS] Fix flaky tests
cd4a284 is described below

commit cd4a284030e5142bdb405ff5b71735ac8cee2dde
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Wed Apr 24 17:36:29 2019 +0800

    [SPARK-27460][FOLLOW-UP][TESTS] Fix flaky tests
    
    ## What changes were proposed in this pull request?
    
    This patch makes several test flakiness fixes.
    
    ## How was this patch tested?
    N/A
    
    Closes #24434 from gatorsmile/fixFlakyTest.
    
    Lead-authored-by: gatorsmile <gatorsm...@gmail.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/ExecutorAllocationManagerSuite.scala     | 18 +++++----
 .../org/apache/spark/SparkContextInfoSuite.scala   |  9 ++++-
 .../scala/org/apache/spark/SparkFunSuite.scala     | 46 +++++++++++++++++++++-
 .../org/apache/spark/StatusTrackerSuite.scala      |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 20 ++++++----
 .../scheduler/SparkListenerWithClusterSuite.scala  | 10 ++---
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  2 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  4 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  3 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 +-
 .../apache/spark/sql/streaming/StreamTest.scala    |  2 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala | 18 +++++----
 .../org/apache/spark/streaming/ReceiverSuite.scala |  2 +-
 13 files changed, 99 insertions(+), 39 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 12c8a9d..9c30124 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, never, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
@@ -38,20 +38,24 @@ import org.apache.spark.util.ManualClock
  */
 class ExecutorAllocationManagerSuite
   extends SparkFunSuite
-  with LocalSparkContext
-  with BeforeAndAfter {
+  with LocalSparkContext {
 
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
   private val contexts = new mutable.ListBuffer[SparkContext]()
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     contexts.clear()
   }
 
-  after {
-    contexts.foreach(_.stop())
+  override def afterEach(): Unit = {
+    try {
+      contexts.foreach(_.stop())
+    } finally {
+      super.afterEach()
+    }
   }
 
   private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -282,7 +286,7 @@ class ExecutorAllocationManagerSuite
     assert(totalRunningTasks(manager) === 0)
   }
 
-  test("cancel pending executors when no longer needed") {
+  testRetry("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index a57afdf..536b4ae 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark
 
+import scala.concurrent.duration._
+
 import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.storage.StorageLevel
 
@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with 
LocalSparkContext {
   test("getRDDStorageInfo only reports on RDDs that actually persist data") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(sc.getRDDStorageInfo.size === 0)
+    assert(sc.getRDDStorageInfo.length === 0)
     rdd.collect()
     sc.listenerBus.waitUntilEmpty(10000)
-    assert(sc.getRDDStorageInfo.size === 1)
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      assert(sc.getRDDStorageInfo.length === 1)
+    }
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
     assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 34e3868..9dd1132 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
+import scala.annotation.tailrec
+
 import org.apache.log4j.{Appender, Level, Logger}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, 
FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests.IS_TESTING
@@ -54,6 +56,7 @@ import org.apache.spark.util.{AccumulatorContext, Utils}
 abstract class SparkFunSuite
   extends FunSuite
   with BeforeAndAfterAll
+  with BeforeAndAfterEach
   with ThreadAudit
   with Logging {
 // scalastyle:on
@@ -90,6 +93,47 @@ abstract class SparkFunSuite
   }
 
   /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use 
`BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
+    test(s) {
+      retry(n) {
+        body
+      }
+    }
+  }
+
+  /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use 
`BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def retry[T](n: Int)(body: => T): T = {
+    if (this.isInstanceOf[BeforeAndAfter]) {
+      throw new UnsupportedOperationException(
+        s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
+          s"Please use ${classOf[BeforeAndAfterEach]} instead.")
+    }
+    retry0(n, n)(body)
+  }
+
+  @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
+    try body
+    catch { case e: Throwable =>
+      if (n > 0) {
+        logWarning(e.getMessage, e)
+        logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
+        // Reset state before re-attempting in order so that tests which use 
patterns like
+        // LocalSparkContext to clean up state can work correctly when retried.
+        afterEach()
+        beforeEach()
+        retry0(n-1, n0)(body)
+      }
+      else throw e
+    }
+  }
+
+  /**
    * Log the suite name and the test name before and after each test.
    *
    * Subclasses should never override this method. If they wish to run
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index c96db4e..f527bbe 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.JobExecutionStatus._
 
 class StatusTrackerSuite extends SparkFunSuite with Matchers with 
LocalSparkContext {
 
-  test("basic status API usage") {
+  testRetry("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 
2).map(identity).groupBy(identity).collectAsync()
     val jobId: Int = eventually(timeout(10.seconds)) {
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 1a326a8..86575b1 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -34,7 +34,6 @@ import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
-import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
@@ -52,16 +51,21 @@ import 
org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 import org.apache.spark.util.logging.DriverLogger
 
-class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers with Logging {
+class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
   private var testDir: File = null
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
   }
 
-  after {
-    Utils.deleteRecursively(testDir)
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(testDir)
+    } finally {
+      super.afterEach()
+    }
   }
 
   /** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -733,7 +737,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
@@ -741,7 +745,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     }
   }
 
-  test("provider reports error after FS leaves safe mode") {
+  testRetry("provider reports error after FS leaves safe mode") {
     testDir.delete()
     val clock = new ManualClock()
     val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -751,7 +755,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         verify(errorHandler).uncaughtException(any(), any())
       }
     } finally {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 123f7f4..a6576e0 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, 
TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
  */
-class SparkListenerWithClusterSuite extends SparkFunSuite with 
LocalSparkContext
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerWithClusterSuite extends SparkFunSuite with 
LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
   }
 
-  test("SparkListener sends executor added message") {
+  testRetry("SparkListener sends executor added message") {
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 384a5f4..c7a99bf 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -167,7 +167,7 @@ abstract class BaseYarnClusterSuite
 
     val handle = launcher.startApplication()
     try {
-      eventually(timeout(2.minutes), interval(1.second)) {
+      eventually(timeout(3.minutes), interval(1.second)) {
         assert(handle.getState().isFinal())
       }
     } finally {
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b072202..c5142fa 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -205,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
@@ -213,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index d845117..e3e5ddf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -502,7 +502,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with 
SharedSQLContext with
     }
 
     // Wait for listener to finish computing the metrics for the execution.
-    while (statusStore.executionsList().last.metricValues == null) {
+    while (statusStore.executionsList().isEmpty ||
+        statusStore.executionsList().last.metricValues == null) {
       Thread.sleep(100)
     }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9235c6d..33b4c08 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  override val streamingTimeout = 20.seconds
+  override val streamingTimeout = 80.seconds
 
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   private def createFileStreamSource(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 89ce636..52b8e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -89,7 +89,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with 
TimeLimits with Be
   protected val defaultUseV2Sink = false
 
   /** How long to wait for an active stream to catch up when checking a 
result. */
-  val streamingTimeout = 10.seconds
+  val streamingTimeout = 60.seconds
 
   /** A trait for actions that can be performed while testing a streaming 
DataFrame. */
   trait StreamAction
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index b421809..b26d255 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Future
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
@@ -35,21 +34,26 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
-class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest {
 
   import AwaitTerminationTester._
   import testImplicits._
 
   override val streamingTimeout = 20.seconds
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }
 
-  after {
-    assert(spark.streams.active.isEmpty)
-    spark.streams.resetTerminated()
+  override def afterEach(): Unit = {
+    try {
+      assert(spark.streams.active.isEmpty)
+      spark.streams.resetTerminated()
+    } finally {
+      super.afterEach()
+    }
   }
 
   testQuietly("listing") {
@@ -83,7 +87,7 @@ class StreamingQueryManagerSuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
-  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+  testRetry("awaitAnyTermination without timeout and resetTerminated") {
     val datasets = Seq.fill(5)(makeDataset._2)
     withQueriesOn(datasets: _*) { queries =>
       require(queries.size === datasets.size)
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 48aa9e5..6b664b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -122,7 +122,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits 
with Serializable {
     }
 
     // Verify that stopping actually stops the thread
-    failAfter(100.milliseconds) {
+    failAfter(1.second) {
       receiver.stop("test")
       assert(receiver.isStopped)
       assert(!receiver.otherThread.isAlive)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to