Repository: spark
Updated Branches:
  refs/heads/master 37c617e4f -> 03c7b7c4b


[SPARK-15315][SQL] Adding error check to the CSV datasource writer for
unsupported complex data types.

## What changes were proposed in this pull request?

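Adds error handling to the CSV writer for unsupported complex data types
(array, map, and struct columns): the writer now throws an
UnsupportedOperationException instead of producing output. Currently, garbage
gets written to the output CSV files if the DataFrame schema has complex data
types.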

## How was this patch tested?

Added a new unit test case to CSVSuite.

Author: sureshthalamati <[email protected]>

Closes #13105 from sureshthalamati/csv_complex_types_SPARK-15315.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03c7b7c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03c7b7c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03c7b7c4

Branch: refs/heads/master
Commit: 03c7b7c4b9374f0cb6a29aeaf495bd21c2563de4
Parents: 37c617e
Author: sureshthalamati <[email protected]>
Authored: Mon May 23 17:15:19 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Mon May 23 17:15:19 2016 -0700

----------------------------------------------------------------------
 .../datasources/csv/DefaultSource.scala         | 14 ++++++++++++-
 .../execution/datasources/csv/CSVSuite.scala    | 22 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03c7b7c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index f47ed76..057bde1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -86,6 +86,7 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
     val conf = job.getConfiguration
     val csvOptions = new CSVOptions(options)
     csvOptions.compressionCodec.foreach { codec =>
@@ -172,4 +173,15 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, 
pair._2.getLength, charset)))
     }
   }
+
+  private def verifySchema(schema: StructType): Unit = {
+    schema.foreach { field =>
+      field.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          throw new UnsupportedOperationException(
+            s"CSV data source does not support ${field.dataType.simpleString} 
data type.")
+        case _ =>
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/03c7b7c4/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 27d6dc9..bae2907 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.test.{SharedSQLContext, 
SQLTestUtils}
 import org.apache.spark.sql.types._
 
 class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+  import testImplicits._
+
   private val carsFile = "cars.csv"
   private val carsMalformedFile = "cars-malformed.csv"
   private val carsFile8859 = "cars_iso-8859-1.csv"
@@ -582,4 +584,24 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
 
     assert(numbers.count() == 8)
   }
+
+  test("error handling for unsupported data types.") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      var msg = intercept[UnsupportedOperationException] {
+        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, 
b)").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support 
struct<a:int,b:string> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support map<string,int> 
data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", 
"brands").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<string> data 
type"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to