Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d023c6c45 -> 5b9eb4212


[SPARK-17693][SQL][BACKPORT-2.0] Fixed Insert Failure To Data Source Tables 
when the Schema has the Comment Field

### What changes were proposed in this pull request?
**This PR backports the fix https://github.com/apache/spark/pull/15615 to 
branch-2.0.**

``` SQL
CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet
INSERT INTO TABLE tab1 SELECT 1, 2
```

The insert fails if the target table has a column with a comment. 
The resulting error message is confusing to users:

```
assertion failed: No plan for InsertIntoTable Relation[col1#15,col2#16] 
parquet, false, false
+- Project [1 AS col1#19, 2 AS col2#20]
   +- OneRowRelation$
```

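This PR fixes the above bug by also comparing column metadata when checking 
whether the query output matches the table schema. If the data type, name, or 
metadata of a column differs, the query column is cast and aliased to the 
table column, and the table column's metadata is copied onto the alias. 
This is an alternative to https://github.com/apache/spark/pull/15266
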
### How was this patch tested?
Added two test cases to InsertSuite.

Author: gatorsmile <[email protected]>

Closes #15782 from 
gatorsmile/insertDataSourceTableWithCommentSolutionBackPort2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b9eb421
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b9eb421
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b9eb421

Branch: refs/heads/branch-2.0
Commit: 5b9eb421208843f1fe0bb876e4781c0ec427c261
Parents: d023c6c
Author: gatorsmile <[email protected]>
Authored: Sat Nov 5 11:45:16 2016 +0100
Committer: Herman van Hovell <[email protected]>
Committed: Sat Nov 5 11:45:16 2016 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 11 +++--
 .../apache/spark/sql/sources/InsertSuite.scala  | 42 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b9eb421/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 27420d5..c21fa8d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -127,16 +127,21 @@ case class PreprocessTableInsertion(conf: SQLConf) 
extends Rule[LogicalPlan] {
     }
   }
 
-  // TODO: do we really need to rename?
   def castAndRenameChildOutput(
       insert: InsertIntoTable,
       expectedOutput: Seq[Attribute]): InsertIntoTable = {
     val newChildOutput = expectedOutput.zip(insert.child.output).map {
       case (expected, actual) =>
-        if (expected.dataType.sameType(actual.dataType) && expected.name == 
actual.name) {
+        if (expected.dataType.sameType(actual.dataType) &&
+          expected.name == actual.name &&
+          expected.metadata == actual.metadata) {
           actual
         } else {
-          Alias(Cast(actual, expected.dataType), expected.name)()
+          // Renaming is needed for handling the following cases like
+          // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 
SELECT 1, 2
+          // 2) Target tables have column metadata
+          Alias(Cast(actual, expected.dataType), expected.name)(
+            explicitMetadata = Option(expected.metadata))
         }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b9eb421/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 6454d71..bbb8b43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -229,6 +229,48 @@ class InsertSuite extends DataSourceTest with 
SharedSQLContext {
 //    assertCached(sql("SELECT * FROM jsonTable"), 0)
   }
 
+  test("INSERT INTO TABLE with Comment in columns") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      sql(
+        s"""
+           |CREATE TABLE $tabName(col1 int COMMENT 'a', col2 int)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tabName SELECT 1, 2")
+
+      checkAnswer(
+        sql(s"SELECT col1, col2 FROM $tabName"),
+        Row(1, 2) :: Nil
+      )
+    }
+  }
+
+  test("INSERT INTO TABLE - complex type but different names") {
+    val tab1 = "tab1"
+    val tab2 = "tab2"
+    withTable(tab1, tab2) {
+      sql(
+        s"""
+           |CREATE TABLE $tab1 (s struct<a: string, b: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab1 SELECT 
named_struct('col1','1','col2','2')")
+
+      sql(
+        s"""
+           |CREATE TABLE $tab2 (p struct<c: string, d: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab2 SELECT * FROM $tab1")
+
+      checkAnswer(
+        spark.table(tab1),
+        spark.table(tab2)
+      )
+    }
+  }
+
   test("it's not allowed to insert into a relation that is not an 
InsertableRelation") {
     sql(
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to