This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2aa06fcf160 [SPARK-45271][SQL] Merge _LEGACY_ERROR_TEMP_1113 into TABLE_OPERATION & delete some unused methods in QueryCompilationErrors
2aa06fcf160 is described below
commit 2aa06fcf1607bbad9e09649e587493032e739e35
Author: panbingkun <[email protected]>
AuthorDate: Tue Sep 26 19:35:27 2023 +0800
[SPARK-45271][SQL] Merge _LEGACY_ERROR_TEMP_1113 into TABLE_OPERATION & delete some unused methods in QueryCompilationErrors
### What changes were proposed in this pull request?
This PR aims to:
- merge _LEGACY_ERROR_TEMP_1113 into UNSUPPORTED_FEATURE.TABLE_OPERATION
- delete some unused methods in QueryCompilationErrors
- refactor some methods to reduce the call hierarchy (sketched below)
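A condensed sketch of the reshaped helpers in QueryCompilationErrors.scala, taken from the diff below (the surrounding object, imports, and the remaining thin wrappers are omitted):

```scala
// New private helper: raises UNSUPPORTED_FEATURE.TABLE_OPERATION instead of the
// deleted _LEGACY_ERROR_TEMP_1113, and quotes the table name via toSQLId.
private def unsupportedTableOperationError(
    tableName: String,
    operation: String): Throwable = {
  new AnalysisException(
    errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
    messageParameters = Map(
      "tableName" -> toSQLId(tableName),
      "operation" -> operation))
}

// Public wrappers now pass the table name directly, e.g.
def unsupportedBatchReadError(table: Table): Throwable =
  unsupportedTableOperationError(table.name(), "batch scan")

// resolveException and unresolvedColumnWithSuggestionError are collapsed into
// a single method, removing one level of the call hierarchy.
def unresolvedColumnError(colName: String, fields: Array[String]): AnalysisException = {
  new AnalysisException(
    errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
    messageParameters = Map(
      "objectName" -> toSQLId(colName),
      "proposal" -> fields.map(toSQLId).mkString(", ")))
}
```

Call sites in object.scala, Dataset.scala, and TableCapabilityCheck.scala are switched to these helpers accordingly (see the diff).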
### Why are the changes needed?
Migrating the legacy error class to UNSUPPORTED_FEATURE.TABLE_OPERATION and removing dead code improve the error framework.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GA
- Manual testing (see the sketch below)
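The manual check mirrors the updated DataStreamTableAPISuite case: a streaming read of a table that supports neither micro-batch nor continuous scan should now surface the new error class. A minimal sketch, assuming the `testcat.table_name` identifier implied by the expected parameters in that test:

```scala
// Table registered against a catalog/provider that does not support streaming scans.
val tableIdentifier = "testcat.table_name"
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")

checkError(
  exception = intercept[AnalysisException] {
    spark.readStream.table(tableIdentifier)
  },
  errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
  parameters = Map(
    "tableName" -> "`testcat`.`table_name`",
    "operation" -> "either micro-batch or continuous scan"))
```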
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43044 from panbingkun/LEGACY_ERROR_TEMP_1113.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 5 --
.../spark/sql/catalyst/plans/logical/object.scala | 12 ++-
.../spark/sql/errors/QueryCompilationErrors.scala | 88 +++++++++-------------
.../main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../datasources/v2/TableCapabilityCheck.scala | 2 +-
.../streaming/test/DataStreamTableAPISuite.scala | 13 +++-
6 files changed, 57 insertions(+), 65 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 9bcbcbc1962..5d827c67482 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -4097,11 +4097,6 @@
"DESCRIBE does not support partition for v2 tables."
]
},
- "_LEGACY_ERROR_TEMP_1113" : {
- "message" : [
- "Table <table> does not support <cmd>."
- ]
- },
"_LEGACY_ERROR_TEMP_1114" : {
"message" : [
"The streaming sources in a query do not have a common supported
execution mode.",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index d4851019db8..9bf8db0b4fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -727,16 +727,20 @@ object JoinWith {
if a.sameRef(b) =>
catalyst.expressions.EqualTo(
plan.left.resolveQuoted(a.name, resolver).getOrElse(
- throw QueryCompilationErrors.resolveException(a.name, plan.left.schema.fieldNames)),
+ throw QueryCompilationErrors.unresolvedColumnError(
+ a.name, plan.left.schema.fieldNames)),
plan.right.resolveQuoted(b.name, resolver).getOrElse(
- throw QueryCompilationErrors.resolveException(b.name, plan.right.schema.fieldNames)))
+ throw QueryCompilationErrors.unresolvedColumnError(
+ b.name, plan.right.schema.fieldNames)))
case catalyst.expressions.EqualNullSafe(a: AttributeReference, b: AttributeReference)
if a.sameRef(b) =>
catalyst.expressions.EqualNullSafe(
plan.left.resolveQuoted(a.name, resolver).getOrElse(
- throw QueryCompilationErrors.resolveException(a.name, plan.left.schema.fieldNames)),
+ throw QueryCompilationErrors.unresolvedColumnError(
+ a.name, plan.left.schema.fieldNames)),
plan.right.resolveQuoted(b.name, resolver).getOrElse(
- throw QueryCompilationErrors.resolveException(b.name, plan.right.schema.fieldNames)))
+ throw QueryCompilationErrors.unresolvedColumnError(
+ b.name, plan.right.schema.fieldNames)))
}
}
plan.copy(condition = cond)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3536626d239..9d2b1225825 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -818,10 +818,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("hintName" -> hintName))
}
- def attributeNameSyntaxError(name: String): Throwable = {
- DataTypeErrors.attributeNameSyntaxError(name)
- }
-
def starExpandDataTypeNotSupportedError(attributes: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1050",
@@ -868,6 +864,40 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"operation" -> operation))
}
+ private def unsupportedTableOperationError(
+ tableName: String,
+ operation: String): Throwable = {
+ new AnalysisException(
+ errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ messageParameters = Map(
+ "tableName" -> toSQLId(tableName),
+ "operation" -> operation))
+ }
+
+ def unsupportedBatchReadError(table: Table): Throwable = {
+ unsupportedTableOperationError(table.name(), "batch scan")
+ }
+
+ def unsupportedStreamingScanError(table: Table): Throwable = {
+ unsupportedTableOperationError(table.name(), "either micro-batch or
continuous scan")
+ }
+
+ def unsupportedAppendInBatchModeError(table: Table): Throwable = {
+ unsupportedTableOperationError(table.name(), "append in batch mode")
+ }
+
+ def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
+ unsupportedTableOperationError(table.name(), "dynamic overwrite in batch
mode")
+ }
+
+ def unsupportedTruncateInBatchModeError(table: Table): Throwable = {
+ unsupportedTableOperationError(table.name(), "truncate in batch mode")
+ }
+
+ def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
+ unsupportedTableOperationError(table.name(), "overwrite by filter in batch
mode")
+ }
+
def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
@@ -1292,38 +1322,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
new CannotReplaceMissingTableException(tableIdentifier, cause)
}
- private def unsupportedTableOperationError(table: Table, cmd: String): Throwable = {
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1113",
- messageParameters = Map(
- "table" -> table.name,
- "cmd" -> cmd))
- }
-
- def unsupportedBatchReadError(table: Table): Throwable = {
- unsupportedTableOperationError(table, "batch scan")
- }
-
- def unsupportedMicroBatchOrContinuousScanError(table: Table): Throwable = {
- unsupportedTableOperationError(table, "either micro-batch or continuous
scan")
- }
-
- def unsupportedAppendInBatchModeError(table: Table): Throwable = {
- unsupportedTableOperationError(table, "append in batch mode")
- }
-
- def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
- unsupportedTableOperationError(table, "dynamic overwrite in batch mode")
- }
-
- def unsupportedTruncateInBatchModeError(table: Table): Throwable = {
- unsupportedTableOperationError(table, "truncate in batch mode")
- }
-
- def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
- unsupportedTableOperationError(table, "overwrite by filter in batch mode")
- }
-
def streamingSourcesDoNotSupportCommonExecutionModeError(
microBatchSources: Seq[String],
continuousSources: Seq[String]): Throwable = {
@@ -2395,10 +2393,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("windowExpressions" -> windowExpressions.toString()))
}
- def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
- DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
- }
-
def escapeCharacterInTheMiddleError(pattern: String, char: String): Throwable = {
new AnalysisException(
errorClass = "INVALID_FORMAT.ESC_IN_THE_MIDDLE",
@@ -3164,24 +3158,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("expr" -> expr.sql))
}
- def unresolvedColumnWithSuggestionError(
- objectName: String, suggestion: String): AnalysisException = {
+ def unresolvedColumnError(colName: String, fields: Array[String]): AnalysisException = {
new AnalysisException(
errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
messageParameters = Map(
- "objectName" -> toSQLId(objectName),
- "proposal" -> suggestion
+ "objectName" -> toSQLId(colName),
+ "proposal" -> fields.map(toSQLId).mkString(", ")
)
)
}
- def resolveException(colName: String, fields: Array[String]): AnalysisException = {
- QueryCompilationErrors.unresolvedColumnWithSuggestionError(
- colName,
- fields.map(toSQLId).mkString(", ")
- )
- }
-
def cannotParseIntervalError(delayThreshold: String, e: Throwable): Throwable = {
val threshold = if (delayThreshold == null) "" else delayThreshold
new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f07496e6430..68ea6dcffb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -248,7 +248,7 @@ class Dataset[T] private[sql](
private[sql] def resolve(colName: String): NamedExpression = {
val resolver = sparkSession.sessionState.analyzer.resolver
queryExecution.analyzed.resolveQuoted(colName, resolver)
- .getOrElse(throw QueryCompilationErrors.resolveException(colName, schema.fieldNames))
+ .getOrElse(throw QueryCompilationErrors.unresolvedColumnError(colName, schema.fieldNames))
}
private[sql] def numericColumns: Seq[Expression] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index 28431972253..a3afaa36ab9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -42,7 +42,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
throw QueryCompilationErrors.unsupportedBatchReadError(r.table)
case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
- throw QueryCompilationErrors.unsupportedMicroBatchOrContinuousScanError(r.table)
+ throw QueryCompilationErrors.unsupportedStreamingScanError(r.table)
// TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
// a logical plan for streaming write.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index d049f27c21e..7bf81fb9865 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -113,9 +113,16 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
- intercept[AnalysisException] {
- spark.readStream.table(tableIdentifier)
- }.message.contains("does not support either micro-batch or continuous scan")
+ checkError(
+ exception = intercept[AnalysisException] {
+ spark.readStream.table(tableIdentifier)
+ },
+ errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ parameters = Map(
+ "tableName" -> "`testcat`.`table_name`",
+ "operation" -> "either micro-batch or continuous scan"
+ )
+ )
}
test("read: read table with custom catalog") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]