Repository: spark
Updated Branches:
  refs/heads/branch-2.1 fb0351a3f -> ba505805d


[SPARK-20407][TESTS][BACKPORT-2.1] ParquetQuerySuite 'Enabling/disabling 
ignoreCorruptFiles' flaky test

## What changes were proposed in this pull request?

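`SharedSQLContext.afterEach` now calls `DebugFilesystem.assertNoOpenStreams` 
inside an `eventually` block, because files can be closed from other threads 
shortly after a test finishes.
`SQLTestUtils.withTempDir` now calls `waitForTasksToFinish` before deleting the 
directory.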

## How was this patch tested?
A new test was added, but it is marked as ignored because it takes 30s to run. 
It can be un-ignored for review.

Author: Bogdan Raducanu <[email protected]>

Closes #17720 from bogdanrdc/SPARK-20407-BACKPORT2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba505805
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba505805
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba505805

Branch: refs/heads/branch-2.1
Commit: ba505805dcf17d7964ec9df7e76489bfc162949a
Parents: fb0351a
Author: Bogdan Raducanu <[email protected]>
Authored: Sat Apr 22 09:58:07 2017 -0700
Committer: Xiao Li <[email protected]>
Committed: Sat Apr 22 09:58:07 2017 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetQuerySuite.scala | 35 +++++++++++++++++++-
 .../apache/spark/sql/test/SQLTestUtils.scala    | 19 +++++++++--
 .../spark/sql/test/SharedSQLContext.scala       | 11 ++++--
 3 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba505805/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 6132376..6033c66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -242,6 +242,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+   * to increase the chance of failure
+    */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (i <- 1 to 100) {
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath

http://git-wip-us.apache.org/repos/asf/spark/blob/ba505805/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d4afb9d..24ba0f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.test
 import java.io.File
 import java.util.UUID
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -48,7 +50,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -123,6 +125,15 @@ private[sql] trait SQLTestUtils
   }
 
   /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
+  /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
    *
@@ -130,7 +141,11 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ba505805/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 2239f10..243845d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -26,7 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
 
@@ -86,6 +89,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // files can be closed from other threads, so wait a bit
+    // normally this doesn't take more than 1s
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to