Repository: spark
Updated Branches:
  refs/heads/master 35e8ab939 -> 00a2911c5


[SPARK-10540] Fixes flaky all-data-type test

This PR breaks the original test case into multiple ones (one test case for 
each data type). In this way, test failure output can be much more readable.

Within each test case, we build a table with two columns: one holds the
data type under test, and the other is an "index" column used to sort the
DataFrame and work around [SPARK-10591] [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591

Author: Cheng Lian <[email protected]>

Closes #8768 from liancheng/spark-10540/test-all-data-types.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00a2911c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00a2911c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00a2911c

Branch: refs/heads/master
Commit: 00a2911c5bea67a1a4796fb1d6fd5d0a8ee79001
Parents: 35e8ab9
Author: Cheng Lian <[email protected]>
Authored: Fri Sep 18 12:19:08 2015 -0700
Committer: Yin Huai <[email protected]>
Committed: Fri Sep 18 12:19:08 2015 -0700

----------------------------------------------------------------------
 .../sql/sources/hadoopFsRelationSuites.scala    | 109 ++++++++-----------
 1 file changed, 43 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00a2911c/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8ffcef8..d750493 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest 
with SQLTestUtils with Tes
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-            StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data 
first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema 
$schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is 
used to sort the
+        // DataFrame.  This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => 
Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, 
schema).orderBy("index").coalesce(1)
 
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) 
=> name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just 
used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
 
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is 
just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
 
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to