Repository: spark Updated Branches: refs/heads/branch-2.0 d023c6c45 -> 5b9eb4212
[SPARK-17693][SQL][BACKPORT-2.0] Fixed Insert Failure To Data Source Tables when the Schema has the Comment Field ### What changes were proposed in this pull request? **This PR backports the fix https://github.com/apache/spark/pull/15615 to 2.0.** ``` SQL CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet INSERT INTO TABLE tab1 SELECT 1, 2 ``` The insert fails if the target table has a column that has a comment. The resulting error is confusing to users: ``` assertion failed: No plan for InsertIntoTable Relation[col1#15,col2#16] parquet, false, false +- Project [1 AS col1#19, 2 AS col2#20] +- OneRowRelation$ ``` This PR fixes the above bug by also checking the column metadata when comparing the schema of the table with the output of the query. If the data type, name, or metadata of a column does not match, the query output is cast and aliased to the expected column, and the expected metadata is copied onto the alias. This is an alternative to https://github.com/apache/spark/pull/15266. ### How was this patch tested? Added test cases to InsertSuite. Author: gatorsmile <[email protected]> Closes #15782 from gatorsmile/insertDataSourceTableWithCommentSolutionBackPort2.0. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b9eb421 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b9eb421 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b9eb421 Branch: refs/heads/branch-2.0 Commit: 5b9eb421208843f1fe0bb876e4781c0ec427c261 Parents: d023c6c Author: gatorsmile <[email protected]> Authored: Sat Nov 5 11:45:16 2016 +0100 Committer: Herman van Hovell <[email protected]> Committed: Sat Nov 5 11:45:16 2016 +0100 ---------------------------------------------------------------------- .../spark/sql/execution/datasources/rules.scala | 11 +++-- .../apache/spark/sql/sources/InsertSuite.scala | 42 ++++++++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5b9eb421/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 27420d5..c21fa8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -127,16 +127,21 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } } - // TODO: do we really need to rename? 
def castAndRenameChildOutput( insert: InsertIntoTable, expectedOutput: Seq[Attribute]): InsertIntoTable = { val newChildOutput = expectedOutput.zip(insert.child.output).map { case (expected, actual) => - if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name) { + if (expected.dataType.sameType(actual.dataType) && + expected.name == actual.name && + expected.metadata == actual.metadata) { actual } else { - Alias(Cast(actual, expected.dataType), expected.name)() + // Renaming is needed for handling the following cases like + // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2 + // 2) Target tables have column metadata + Alias(Cast(actual, expected.dataType), expected.name)( + explicitMetadata = Option(expected.metadata)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/5b9eb421/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 6454d71..bbb8b43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -229,6 +229,48 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { // assertCached(sql("SELECT * FROM jsonTable"), 0) } + test("INSERT INTO TABLE with Comment in columns") { + val tabName = "tab1" + withTable(tabName) { + sql( + s""" + |CREATE TABLE $tabName(col1 int COMMENT 'a', col2 int) + |USING parquet + """.stripMargin) + sql(s"INSERT INTO TABLE $tabName SELECT 1, 2") + + checkAnswer( + sql(s"SELECT col1, col2 FROM $tabName"), + Row(1, 2) :: Nil + ) + } + } + + test("INSERT INTO TABLE - complex type but different names") { + val tab1 = "tab1" + val tab2 = "tab2" + withTable(tab1, tab2) { + sql( + s""" + |CREATE TABLE $tab1 (s struct<a: 
string, b: string>) + |USING parquet + """.stripMargin) + sql(s"INSERT INTO TABLE $tab1 SELECT named_struct('col1','1','col2','2')") + + sql( + s""" + |CREATE TABLE $tab2 (p struct<c: string, d: string>) + |USING parquet + """.stripMargin) + sql(s"INSERT INTO TABLE $tab2 SELECT * FROM $tab1") + + checkAnswer( + spark.table(tab1), + spark.table(tab2) + ) + } + } + test("it's not allowed to insert into a relation that is not an InsertableRelation") { sql( """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
