Repository: spark
Updated Branches:
refs/heads/branch-2.0 ab9a304a7 -> 774014250
[SPARK-15910][SQL] Check schema consistency when using Kryo encoder to convert
DataFrame to Dataset
## What changes were proposed in this pull request?
This PR enforces a schema check when converting a DataFrame to a Dataset using
the Kryo encoder. For example:
**Before the change:**
The schema is NOT checked when converting a DataFrame to a Dataset using the Kryo encoder.
```
scala> case class B(b: Int)
scala> implicit val encoder = Encoders.kryo[B]
scala> val df = Seq((1)).toDF("b")
scala> val ds = df.as[B] // Schema compatibility is NOT checked
```
**After the change:**
An AnalysisException is reported because the schema is NOT compatible.
```
scala> val ds = Seq((1)).toDF("b").as[B]
org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`b` AS BINARY)'
due to data type mismatch: cannot cast IntegerType to BinaryType;
...
```
## How was this patch tested?
Unit test.
Author: Sean Zhong <[email protected]>
Closes #13632 from clockfly/spark-15910.
(cherry picked from commit 7b9071eeaa62fd9a51d9e94cfd479224b8341517)
Signed-off-by: Wenchen Fan <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77401425
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77401425
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77401425
Branch: refs/heads/branch-2.0
Commit: 774014250eb9e73f05387d73f48ee7c89e9a89c3
Parents: ab9a304
Author: Sean Zhong <[email protected]>
Authored: Mon Jun 13 17:43:55 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Mon Jun 13 17:44:04 2016 -0700
----------------------------------------------------------------------
.../src/main/scala/org/apache/spark/sql/Encoders.scala | 6 ++++--
.../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 9 +++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/77401425/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 673c587..e72f67c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -25,8 +25,8 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
import
org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer,
EncodeUsingSerializer}
-import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types._
/**
@@ -209,7 +209,9 @@ object Encoders {
BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true),
kryo = useKryo)),
deserializer =
DecodeUsingSerializer[T](
- GetColumnByOrdinal(0, BinaryType), classTag[T], kryo = useKryo),
+ Cast(GetColumnByOrdinal(0, BinaryType), BinaryType),
+ classTag[T],
+ kryo = useKryo),
clsTag = classTag[T]
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/77401425/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 96d85f1..f02a314 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -453,6 +453,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext
{
(KryoData(2), KryoData(2))))
}
+ test("Kryo encoder: check the schema mismatch when converting DataFrame to
Dataset") {
+ implicit val kryoEncoder = Encoders.kryo[KryoData]
+ val df = Seq((1)).toDF("a")
+ val e = intercept[AnalysisException] {
+ df.as[KryoData]
+ }.message
+ assert(e.contains("cannot cast IntegerType to BinaryType"))
+ }
+
test("Java encoder") {
implicit val kryoEncoder = Encoders.javaSerialization[JavaData]
val ds = Seq(JavaData(1), JavaData(2)).toDS()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]