Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ab9a304a7 -> 774014250


[SPARK-15910][SQL] Check schema consistency when using Kryo encoder to convert DataFrame to Dataset

## What changes were proposed in this pull request?

This PR enforces a schema check when converting a DataFrame to a Dataset using a
Kryo encoder. For example:

**Before the change:**

The schema is NOT checked when converting a DataFrame to a Dataset using a Kryo encoder.
```
scala> case class B(b: Int)
scala> implicit val encoder = Encoders.kryo[B]
scala> val df = Seq((1)).toDF("b")
scala> val ds = df.as[B] // Schema compatibility is NOT checked
```

**After the change:**
An AnalysisException is reported because the schema is NOT compatible.
```
scala> val ds = Seq((1)).toDF("b").as[B]
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`b` AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType;
...
```

## How was this patch tested?

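Added a unit test to DatasetSuite that verifies converting an incompatible DataFrame to a Dataset with a Kryo encoder throws an AnalysisException ("cannot cast IntegerType to BinaryType").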

Author: Sean Zhong <[email protected]>

Closes #13632 from clockfly/spark-15910.

(cherry picked from commit 7b9071eeaa62fd9a51d9e94cfd479224b8341517)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77401425
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77401425
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77401425

Branch: refs/heads/branch-2.0
Commit: 774014250eb9e73f05387d73f48ee7c89e9a89c3
Parents: ab9a304
Author: Sean Zhong <[email protected]>
Authored: Mon Jun 13 17:43:55 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Mon Jun 13 17:44:04 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/Encoders.scala      | 6 ++++--
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala  | 9 +++++++++
 2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77401425/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 673c587..e72f67c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -25,8 +25,8 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
 import 
org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, 
EncodeUsingSerializer}
-import org.apache.spark.sql.catalyst.expressions.BoundReference
 import org.apache.spark.sql.types._
 
 /**
@@ -209,7 +209,9 @@ object Encoders {
           BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), 
kryo = useKryo)),
       deserializer =
         DecodeUsingSerializer[T](
-          GetColumnByOrdinal(0, BinaryType), classTag[T], kryo = useKryo),
+          Cast(GetColumnByOrdinal(0, BinaryType), BinaryType),
+          classTag[T],
+          kryo = useKryo),
       clsTag = classTag[T]
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/77401425/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 96d85f1..f02a314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -453,6 +453,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
         (KryoData(2), KryoData(2))))
   }
 
+  test("Kryo encoder: check the schema mismatch when converting DataFrame to 
Dataset") {
+    implicit val kryoEncoder = Encoders.kryo[KryoData]
+    val df = Seq((1)).toDF("a")
+    val e = intercept[AnalysisException] {
+      df.as[KryoData]
+    }.message
+    assert(e.contains("cannot cast IntegerType to BinaryType"))
+  }
+
   test("Java encoder") {
     implicit val kryoEncoder = Encoders.javaSerialization[JavaData]
     val ds = Seq(JavaData(1), JavaData(2)).toDS()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to