This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 00f66994c80 [SPARK-44577][SQL] Fix INSERT BY NAME returns nonsensical error message
00f66994c80 is described below
commit 00f66994c802faf9ccc0d40ed4f6ff32992ba00f
Author: Jia Fan <[email protected]>
AuthorDate: Fri Sep 1 20:27:17 2023 +0800
[SPARK-44577][SQL] Fix INSERT BY NAME returns nonsensical error message
### What changes were proposed in this pull request?
Fix the nonsensical error message returned by INSERT BY NAME on the v1 data source.
e.g.:
```scala
CREATE TABLE bug(c1 INT);
INSERT INTO bug BY NAME SELECT 1 AS c2;
==> Multi-part identifier cannot be empty.
```
After PR:
```scala
[INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA] Cannot write incompatible
data for the table `spark_catalog`.`default`.`bug`: Cannot find data for the
output column `c1`.
```
The same issue is also fixed for the other INCOMPATIBLE_DATA_FOR_TABLE error types thrown on this path.
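For example, the extra-columns case now surfaces a dedicated error condition. A hypothetical sketch (the `INSERT` statement and schema are illustrative; the error class, table name, and `x1` column are taken from the updated test expectations below, and the message is composed from the error templates added in this patch):
```sql
-- Illustrative only: the committed test drives this path through DataFrame inserts into `t1`.
INSERT INTO t1 BY NAME SELECT 1 AS c1, 2 AS x1;
==> [INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS] Cannot write incompatible data for the
table `spark_catalog`.`default`.`t1`: Cannot write extra columns `x1`.
```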
### Why are the changes needed?
The current error message is nonsensical.
### Does this PR introduce _any_ user-facing change?
Yes, the error message for v1 INSERT BY NAME will change.
### How was this patch tested?
Added a new test.
Closes #42220 from Hisoka-X/SPARK-44577_insert_by_name_bug_fix.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 5 +++
...ions-incompatible-data-for-table-error-class.md | 4 +++
.../catalyst/analysis/ResolveInsertionBase.scala | 4 +--
.../catalyst/analysis/TableOutputResolver.scala | 36 +++++++++++++++-------
.../spark/sql/errors/QueryCompilationErrors.scala | 19 +++++++++---
.../org/apache/spark/sql/SQLInsertTestSuite.scala | 5 +--
6 files changed, 54 insertions(+), 19 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index af78dd2f9f8..87b9da7638b 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1035,6 +1035,11 @@
"Cannot safely cast <colName> <srcType> to <targetType>."
]
},
+ "EXTRA_COLUMNS" : {
+ "message" : [
+ "Cannot write extra columns <extraColumns>."
+ ]
+ },
"EXTRA_STRUCT_FIELDS" : {
"message" : [
"Cannot write extra fields <extraFields> to the struct <colName>."
diff --git a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
index f70b69ba6c5..0dd28e9d55c 100644
--- a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
+++ b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
@@ -37,6 +37,10 @@ Cannot find data for the output column `<colName>`.
Cannot safely cast `<colName>` `<srcType>` to `<targetType>`.
+## EXTRA_COLUMNS
+
+Cannot write extra columns `<extraColumns>`.
+
## EXTRA_STRUCT_FIELDS
Cannot write extra fields `<extraFields>` to the struct `<colName>`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
index 8b120095bc6..ad89005a093 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
@@ -36,10 +36,10 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] {
if (i.userSpecifiedCols.size != i.query.output.size) {
if (i.userSpecifiedCols.size > i.query.output.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
- tblName, i.userSpecifiedCols, i.query)
+ tblName, i.userSpecifiedCols, i.query.output)
} else {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
- tblName, i.userSpecifiedCols, i.query)
+ tblName, i.userSpecifiedCols, i.query.output)
}
}
val projectByName = i.userSpecifiedCols.zip(i.query.output)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 21575f7b96b..fc0e727bea5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -87,7 +87,7 @@ object TableOutputResolver {
if (actualExpectedCols.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
- tableName, actualExpectedCols.map(_.name), query)
+ tableName, actualExpectedCols.map(_.name), query.output)
}
val errors = new mutable.ArrayBuffer[String]()
@@ -105,7 +105,7 @@ object TableOutputResolver {
} else {
if (actualExpectedCols.size > query.output.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
- tableName, actualExpectedCols.map(_.name), query)
+ tableName, actualExpectedCols.map(_.name), query.output)
}
resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
}
@@ -267,9 +267,13 @@ object TableOutputResolver {
     if (matchedCols.size < inputCols.length) {
       val extraCols = inputCols.filterNot(col => matchedCols.contains(col.name))
         .map(col => s"${toSQLId(col.name)}").mkString(", ")
-      throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
-        tableName, colPath.quoted, extraCols
-      )
+      if (colPath.isEmpty) {
+        throw QueryCompilationErrors.incompatibleDataToTableExtraColumnsError(tableName,
+          extraCols)
+      } else {
+        throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+          tableName, colPath.quoted, extraCols)
+      }
     } else {
       reordered
     }
@@ -290,16 +294,26 @@ object TableOutputResolver {
       val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
         .map(col => toSQLId(col.name))
         .mkString(", ")
-      throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
-        tableName, colPath.quoted, extraColsStr
-      )
+      if (colPath.isEmpty) {
+        throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(tableName,
+          expectedCols.map(_.name), inputCols.map(_.toAttribute))
+      } else {
+        throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+          tableName, colPath.quoted, extraColsStr
+        )
+      }
     } else if (inputCols.size < expectedCols.size) {
       val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
         .map(col => toSQLId(col.name))
         .mkString(", ")
-      throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
-        tableName, colPath.quoted, missingColsStr
-      )
+      if (colPath.isEmpty) {
+        throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(tableName,
+          expectedCols.map(_.name), inputCols.map(_.toAttribute))
+      } else {
+        throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
+          tableName, colPath.quoted, missingColsStr
+        )
+      }
     }
inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index a97abf89434..ca101e79d92 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2159,25 +2159,25 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
def cannotWriteTooManyColumnsToTableError(
tableName: String,
expected: Seq[String],
- query: LogicalPlan): Throwable = {
+ queryOutput: Seq[Attribute]): Throwable = {
new AnalysisException(
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
messageParameters = Map(
"tableName" -> toSQLId(tableName),
"tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
- "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(",
")))
+ "dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", ")))
}
def cannotWriteNotEnoughColumnsToTableError(
tableName: String,
expected: Seq[String],
- query: LogicalPlan): Throwable = {
+ queryOutput: Seq[Attribute]): Throwable = {
new AnalysisException(
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
messageParameters = Map(
"tableName" -> toSQLId(tableName),
"tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
- "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(",
")))
+ "dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", ")))
}
def incompatibleDataToTableCannotFindDataError(
@@ -2202,6 +2202,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}
+ def incompatibleDataToTableExtraColumnsError(
+ tableName: String, extraColumns: String): Throwable = {
+ new AnalysisException(
+ errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS",
+ messageParameters = Map(
+ "tableName" -> toSQLId(tableName),
+ "extraColumns" -> extraColumns
+ )
+ )
+ }
+
def incompatibleDataToTableExtraStructFieldsError(
tableName: String, colName: String, extraFields: String): Throwable = {
new AnalysisException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 0bbed51d0a9..34e4ded09b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -213,9 +213,10 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
exception = intercept[AnalysisException] {
processInsert("t1", df, overwrite = false, byName = true)
},
- v1ErrorClass = "_LEGACY_ERROR_TEMP_1186",
+ v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS",
v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
- v1Parameters = Map.empty[String, String],
+ v1Parameters = Map("tableName" -> "`spark_catalog`.`default`.`t1`",
+ "extraColumns" -> "`x1`"),
v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`")
)
val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*)