This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new cd4a284  [SPARK-27460][FOLLOW-UP][TESTS] Fix flaky tests
cd4a284 is described below

commit cd4a284030e5142bdb405ff5b71735ac8cee2dde
Author: gatorsmile <gatorsm...@gmail.com>
AuthorDate: Wed Apr 24 17:36:29 2019 +0800

    [SPARK-27460][FOLLOW-UP][TESTS] Fix flaky tests

    ## What changes were proposed in this pull request?

    This patch fixes several sources of test flakiness.

    ## How was this patch tested?

    N/A

    Closes #24434 from gatorsmile/fixFlakyTest.

    Lead-authored-by: gatorsmile <gatorsm...@gmail.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/ExecutorAllocationManagerSuite.scala     | 18 +++++----
 .../org/apache/spark/SparkContextInfoSuite.scala   |  9 ++++-
 .../scala/org/apache/spark/SparkFunSuite.scala     | 46 +++++++++++++++++++++-
 .../org/apache/spark/StatusTrackerSuite.scala      |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 20 ++++++----
 .../scheduler/SparkListenerWithClusterSuite.scala  | 10 ++---
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  2 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  4 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  3 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 +-
 .../apache/spark/sql/streaming/StreamTest.scala    |  2 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala | 18 +++++----
 .../org/apache/spark/streaming/ReceiverSuite.scala |  2 +-
 13 files changed, 99 insertions(+), 39 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 12c8a9d..9c30124 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, never, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
@@ -38,20 +38,24 @@ import org.apache.spark.util.ManualClock
  */
 class ExecutorAllocationManagerSuite
   extends SparkFunSuite
-  with LocalSparkContext
-  with BeforeAndAfter {
+  with LocalSparkContext {
 
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
   private val contexts = new mutable.ListBuffer[SparkContext]()
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     contexts.clear()
   }
 
-  after {
-    contexts.foreach(_.stop())
+  override def afterEach(): Unit = {
+    try {
+      contexts.foreach(_.stop())
+    } finally {
+      super.afterEach()
+    }
   }
 
   private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -282,7 +286,7 @@ class ExecutorAllocationManagerSuite
     assert(totalRunningTasks(manager) === 0)
   }
 
-  test("cancel pending executors when no longer needed") {
+  testRetry("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
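The hunk above shows the setup/teardown convention this commit standardizes on: suites drop ScalaTest's `BeforeAndAfter` (`before { ... }` / `after { ... }`) in favor of overriding `beforeEach()`/`afterEach()` from `BeforeAndAfterEach`, with teardown wrapped in try/finally so `super.afterEach()` always runs. A minimal sketch of the pattern, where `MyFlakySuite` and its managed resource are hypothetical stand-ins:

    import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}

    class MyFlakySuite extends SparkFunSuite {
      private var sc: SparkContext = _

      override def beforeEach(): Unit = {
        super.beforeEach()          // let SparkFunSuite run its own setup first
        sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
      }

      override def afterEach(): Unit = {
        try {
          if (sc != null) sc.stop() // suite-specific teardown
        } finally {
          super.afterEach()         // always runs, even if stop() throws
        }
      }
    }

Routing setup and teardown through overridable `beforeEach`/`afterEach` methods is what allows the new retry machinery (in the SparkFunSuite hunk below) to reset suite state between attempts, which `BeforeAndAfter`'s registered blocks cannot do.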
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index a57afdf..536b4ae 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -17,7 +17,10 @@ package org.apache.spark
 
+import scala.concurrent.duration._
+
 import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.storage.StorageLevel
 
@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
   test("getRDDStorageInfo only reports on RDDs that actually persist data") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(sc.getRDDStorageInfo.size === 0)
+    assert(sc.getRDDStorageInfo.length === 0)
     rdd.collect()
     sc.listenerBus.waitUntilEmpty(10000)
-    assert(sc.getRDDStorageInfo.size === 1)
+    eventually(timeout(10.seconds), interval(100.milliseconds)) {
+      assert(sc.getRDDStorageInfo.length === 1)
+    }
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
     assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 34e3868..9dd1132 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
+import scala.annotation.tailrec
+
 import org.apache.log4j.{Appender, Level, Logger}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests.IS_TESTING
@@ -54,6 +56,7 @@ import org.apache.spark.util.{AccumulatorContext, Utils}
 abstract class SparkFunSuite
   extends FunSuite
   with BeforeAndAfterAll
+  with BeforeAndAfterEach
   with ThreadAudit
   with Logging {
 // scalastyle:on
@@ -90,6 +93,47 @@ abstract class SparkFunSuite
   }
 
   /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
+    test(s) {
+      retry(n) {
+        body
+      }
+    }
+  }
+
+  /**
+   * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+   * set up and tear down resources.
+   */
+  def retry[T](n: Int)(body: => T): T = {
+    if (this.isInstanceOf[BeforeAndAfter]) {
+      throw new UnsupportedOperationException(
+        s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
+          s"Please use ${classOf[BeforeAndAfterEach]} instead.")
+    }
+    retry0(n, n)(body)
+  }
+
+  @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
+    try body
+    catch { case e: Throwable =>
+      if (n > 0) {
+        logWarning(e.getMessage, e)
+        logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
+        // Reset state before re-attempting so that tests which use patterns like
+        // LocalSparkContext to clean up state can work correctly when retried.
+        afterEach()
+        beforeEach()
+        retry0(n - 1, n0)(body)
+      }
+      else throw e
+    }
+  }
+
+  /**
    * Log the suite name and the test name before and after each test.
    *
    * Subclasses should never override this method. If they wish to run
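The new `testRetry` helper wraps `test` and re-runs a flaky body up to `n` extra times (two by default), calling `afterEach()` and then `beforeEach()` between attempts to reset suite state. A usage sketch with a hypothetical suite that simulates one flaky failure:

    import org.apache.spark.SparkFunSuite

    class TimingSensitiveSuite extends SparkFunSuite {
      @volatile private var attempts = 0

      // Runs at most 3 times in total (1 try + the default 2 retries); between
      // attempts, retry() tears state down and rebuilds it via
      // afterEach()/beforeEach(). Here the first attempt fails, the second passes.
      testRetry("passes once the environment settles") {
        attempts += 1
        assert(attempts >= 2, s"simulated flaky failure on attempt $attempts")
      }
    }

Because `retry` invokes `afterEach()`/`beforeEach()` directly, it throws `UnsupportedOperationException` when the suite mixes in `BeforeAndAfter`, whose registered `before`/`after` blocks it has no way to re-run; that is why the suites in this commit migrate to `BeforeAndAfterEach`.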
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index c96db4e..f527bbe 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.JobExecutionStatus._
 
 class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext {
 
-  test("basic status API usage") {
+  testRetry("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
     val jobId: Int = eventually(timeout(10.seconds)) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 1a326a8..86575b1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -34,7 +34,6 @@ import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
-import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
@@ -52,16 +51,21 @@ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 import org.apache.spark.util.logging.DriverLogger
 
-class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
   private var testDir: File = null
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
   }
 
-  after {
-    Utils.deleteRecursively(testDir)
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(testDir)
+    } finally {
+      super.afterEach()
+    }
   }
 
   /** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -733,7 +737,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
         provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
@@ -741,7 +745,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("provider reports error after FS leaves safe mode") {
+  testRetry("provider reports error after FS leaves safe mode") {
     testDir.delete()
     val clock = new ManualClock()
     val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -751,7 +755,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
      clock.setTime(10000)
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
+      eventually(timeout(3.second), interval(10.milliseconds)) {
        verify(errorHandler).uncaughtException(any(), any())
      }
    } finally {
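Several hunks above (and below) replace one-shot assertions or too-tight deadlines with ScalaTest's `eventually`, which re-evaluates a block at `interval` until it stops throwing or `timeout` expires. A minimal standalone sketch of the pattern under the same imports the commit uses; `EventuallyDemo` and the polled flag are hypothetical:

    import scala.concurrent.duration._
    import org.scalatest.concurrent.Eventually._

    object EventuallyDemo {
      @volatile var ready = false

      def main(args: Array[String]): Unit = {
        // Simulate asynchronous work completing after a short delay.
        new Thread(() => { Thread.sleep(500); ready = true }).start()

        // Re-evaluates the block every 100 ms; fails only if it still throws
        // after 10 s, which absorbs scheduling jitter on loaded CI machines.
        eventually(timeout(10.seconds), interval(100.milliseconds)) {
          assert(ready, "background work has not completed yet")
        }
      }
    }

The commit's timeout bumps (1 s to 3 s, 30 s to 3 minutes, and so on) follow the same reasoning: a generous deadline costs nothing when the condition is met early, since `eventually` returns as soon as the block passes.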
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 123f7f4..a6576e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Unit tests for SparkListener that require a local cluster.
  */
-class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
   }
 
-  test("SparkListener sends executor added message") {
+  testRetry("SparkListener sends executor added message") {
     val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 384a5f4..c7a99bf 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -167,7 +167,7 @@ abstract class BaseYarnClusterSuite
 
     val handle = launcher.startApplication()
     try {
-      eventually(timeout(2.minutes), interval(1.second)) {
+      eventually(timeout(3.minutes), interval(1.second)) {
         assert(handle.getState().isFinal())
       }
     } finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b072202..c5142fa 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -205,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
@@ -213,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+      eventually(timeout(3.minutes), interval(100.milliseconds)) {
        handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
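The YARN hunks above stretch the deadlines around `SparkAppHandle` state polling. For readers unfamiliar with the launcher API these tests drive, a sketch of launching an application and waiting for a terminal state; the harness name, jar path, and main class are placeholders:

    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    object LaunchAndWait {
      def main(args: Array[String]): Unit = {
        val handle: SparkAppHandle = new SparkLauncher()
          .setAppResource("/path/to/app.jar")  // placeholder path
          .setMainClass("com.example.Main")    // placeholder class
          .setMaster("yarn")
          .startApplication()

        // Poll until YARN reports a terminal state. The suites above do the
        // same wait via eventually(...) with a 3-minute deadline, since
        // cluster startup time varies widely on shared CI hosts.
        while (!handle.getState().isFinal()) {
          Thread.sleep(1000)
        }
        println(s"final state: ${handle.getState()}")
      }
    }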
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index d845117..e3e5ddf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -502,7 +502,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     }
 
     // Wait for listener to finish computing the metrics for the execution.
-    while (statusStore.executionsList().last.metricValues == null) {
+    while (statusStore.executionsList().isEmpty ||
+        statusStore.executionsList().last.metricValues == null) {
       Thread.sleep(100)
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9235c6d..33b4c08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  override val streamingTimeout = 20.seconds
+  override val streamingTimeout = 80.seconds
 
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   private def createFileStreamSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 89ce636..52b8e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -89,7 +89,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   protected val defaultUseV2Sink = false
 
   /** How long to wait for an active stream to catch up when checking a result. */
-  val streamingTimeout = 10.seconds
+  val streamingTimeout = 60.seconds
 
   /** A trait for actions that can be performed while testing a streaming DataFrame. */
   trait StreamAction
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index b421809..b26d255 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Future
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
@@ -35,21 +34,26 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
-class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest {
 
   import AwaitTerminationTester._
   import testImplicits._
 
   override val streamingTimeout = 20.seconds
 
-  before {
+  override def beforeEach(): Unit = {
+    super.beforeEach()
     assert(spark.streams.active.isEmpty)
     spark.streams.resetTerminated()
   }
 
-  after {
-    assert(spark.streams.active.isEmpty)
-    spark.streams.resetTerminated()
+  override def afterEach(): Unit = {
+    try {
+      assert(spark.streams.active.isEmpty)
+      spark.streams.resetTerminated()
+    } finally {
+      super.afterEach()
+    }
   }
 
   testQuietly("listing") {
@@ -83,7 +87,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+  testRetry("awaitAnyTermination without timeout and resetTerminated") {
     val datasets = Seq.fill(5)(makeDataset._2)
     withQueriesOn(datasets: _*) { queries =>
       require(queries.size === datasets.size)
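The SQLAppStatusListenerSuite fix above illustrates a general defensive-polling point: calling `.last` on a collection that an asynchronous listener populates throws `NoSuchElementException` when the poll wins the race, so the emptiness check must come first and rely on `||` short-circuiting. A standalone sketch of the race and the guard; the buffer and harness name are stand-ins for the status store:

    import scala.collection.mutable.ArrayBuffer

    object GuardedPollDemo {
      def main(args: Array[String]): Unit = {
        // Stand-in for statusStore.executionsList(): another thread first
        // registers an execution with null metrics, then fills them in.
        val executions = ArrayBuffer.empty[String]
        new Thread(() => {
          Thread.sleep(200); executions.synchronized { executions += null }
          Thread.sleep(200); executions.synchronized { executions(0) = "metrics" }
        }).start()

        // isEmpty must be checked first: || short-circuits left-to-right, so
        // .last is never evaluated while the buffer is still empty. The
        // synchronized blocks are for cross-thread visibility in this sketch.
        while (executions.synchronized { executions.isEmpty || executions.last == null }) {
          Thread.sleep(100)
        }
        println("metrics available")
      }
    }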
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 48aa9e5..6b664b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -122,7 +122,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     }
 
     // Verify that stopping actually stops the thread
-    failAfter(100.milliseconds) {
+    failAfter(1.second) {
       receiver.stop("test")
       assert(receiver.isStopped)
       assert(!receiver.otherThread.isAlive)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org