This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 43adfa070a40 [SPARK-46850][SQL] Convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE`
43adfa070a40 is described below
commit 43adfa070a40832d8316be8db164e3aca8a4f593
Author: panbingkun <[email protected]>
AuthorDate: Thu Jan 25 18:04:17 2024 +0300
[SPARK-46850][SQL] Convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE`
### What changes were proposed in this pull request?
The PR aims to:
- convert `_LEGACY_ERROR_TEMP_2102` to `UNSUPPORTED_DATATYPE` (see the sketch after this list).
- remove some outdated comments.
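For context, here is a minimal, hypothetical repro of the user-visible change (the object name and local-session setup are illustrative; the schema mirrors the new test added to `CsvFunctionsSuite`):

```scala
// Hypothetical repro, not part of the patch. CSV cannot represent nested
// structs, so asking from_csv for a STRUCT-typed column hits the
// unsupported-type path that this commit migrates to UNSUPPORTED_DATATYPE.
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

object UnsupportedCsvTypeRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // A schema with a nested struct column: unsupported by the CSV parser.
    val schema = new StructType()
      .add("key", LongType)
      .add("value", new StructType().add("age", LongType))

    try {
      Seq("1,2").toDF("csv")
        .select(from_csv($"csv", schema, Map.empty[String, String]))
        .collect()
    } catch {
      case e: Exception =>
        e.getCause match {
          // After this patch the root cause is a SparkUnsupportedOperationException
          // with errorClass "UNSUPPORTED_DATATYPE"; before, it was a SparkException
          // with "_LEGACY_ERROR_TEMP_2102" ("Unsupported type: <catalogString>").
          case u: SparkUnsupportedOperationException =>
            println(s"${u.getErrorClass}: ${u.getMessage}")
          case other =>
            println(other)
        }
    } finally {
      spark.stop()
    }
  }
}
```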
### Why are the changes needed?
Replacing the ad-hoc legacy error with the named `UNSUPPORTED_DATATYPE` error class improves the error framework.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Added a new unit test.
- Passed GA (GitHub Actions).
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44871 from panbingkun/LEGACY_ERROR_TEMP_2102.
Lead-authored-by: panbingkun <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-classes.json | 5 -----
.../spark/sql/catalyst/csv/UnivocityParser.scala | 6 ++---
.../spark/sql/catalyst/json/JacksonParser.scala | 4 ++--
.../spark/sql/errors/QueryExecutionErrors.scala | 7 ------
.../spark/sql/execution/columnar/ColumnType.scala | 4 ++--
.../org/apache/spark/sql/CsvFunctionsSuite.scala | 26 +++++++++++++++++++++-
.../sql/execution/columnar/ColumnTypeSuite.scala | 14 +++++++-----
7 files changed, 39 insertions(+), 27 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 1f3122a502c5..64d65fd4beed 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -5966,11 +5966,6 @@
"Not support non-primitive type now."
]
},
- "_LEGACY_ERROR_TEMP_2102" : {
- "message" : [
- "Unsupported type: <catalogString>."
- ]
- },
"_LEGACY_ERROR_TEMP_2103" : {
"message" : [
"Dictionary encoding should not be used because of dictionary overflow."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index b99ee630d4b2..eb7e120277bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -30,13 +30,12 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, Gen
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
-
/**
* Constructs a parser for a given schema that translates CSV data to an [[InternalRow]].
*
@@ -273,8 +272,7 @@ class UnivocityParser(
case udt: UserDefinedType[_] =>
makeConverter(name, udt.sqlType, nullable)
- // We don't actually hit this exception though, we keep it for understandability
- case _ => throw QueryExecutionErrors.unsupportedTypeError(dataType)
+ case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
}
private def nullSafeDatum(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index eace96ac8729..36f37888b084 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -381,7 +381,7 @@ class JacksonParser(
}
// We don't actually hit this exception though, we keep it for understandability
- case _ => throw QueryExecutionErrors.unsupportedTypeError(dataType)
+ case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 69794517f917..b09885c904a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1097,13 +1097,6 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
messageParameters = Map.empty)
}
- def unsupportedTypeError(dataType: DataType): Throwable = {
- new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2102",
- messageParameters = Map("catalogString" -> dataType.catalogString),
- cause = null)
- }
-
def useDictionaryEncodingWhenDictionaryOverflowError(): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_2103",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index 96c7b4103142..53cb568d2060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -25,7 +25,7 @@ import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, PhysicalBinaryType, PhysicalBooleanType, PhysicalByteType, PhysicalCalendarIntervalType, PhysicalDataType, PhysicalDecimalType, PhysicalDoubleType, PhysicalFloatType, PhysicalIntegerType, PhysicalLongType, PhysicalMapType, PhysicalNullType, PhysicalShortType, PhysicalStringType, PhysicalStructType}
-import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.errors.ExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -829,7 +829,7 @@ private[columnar] object ColumnType {
case map: MapType => MAP(PhysicalMapType(map.keyType, map.valueType, map.valueContainsNull))
case struct: StructType => STRUCT(PhysicalStructType(struct.fields))
case udt: UserDefinedType[_] => ColumnType(udt.sqlType)
- case other => throw QueryExecutionErrors.unsupportedTypeError(other)
+ case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index e7c1f0414b61..22a439bd179b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -23,7 +23,8 @@ import java.util.Locale
import scala.jdk.CollectionConverters._
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -159,6 +160,29 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
Row(Row(null)))
}
+ test("from_csv with invalid datatype") {
+ val rows = new java.util.ArrayList[Row]()
+ rows.add(Row(1L, Row(2L, "Alice", Array(100L, 200L, null, 300L))))
+
+ val valueType = StructType(Seq(
+ StructField("age", LongType),
+ StructField("name", StringType),
+ StructField("scores", ArrayType(LongType))))
+
+ val schema = StructType(Seq(StructField("key", LongType), StructField("value", valueType)))
+
+ val options = Map.empty[String, String]
+ val df = spark.createDataFrame(rows, schema)
+
+ checkError(
+ exception = intercept[SparkException] {
+ df.select(from_csv(to_csv($"value"), schema, options)).collect()
+ }.getCause.asInstanceOf[SparkUnsupportedOperationException],
+ errorClass = "UNSUPPORTED_DATATYPE",
+ parameters = Map("typeName" -> toSQLType(valueType))
+ )
+ }
+
test("from_csv with option (nanValue)") {
val df = Seq("#").toDS()
val schema = new StructType().add("float", FloatType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index bea3555348d3..d79ac8dc3545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -21,7 +21,7 @@ import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets
import java.time.{Duration, Period}
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
@@ -163,10 +163,12 @@ class ColumnTypeSuite extends SparkFunSuite {
override def typeName: String = "invalid type name"
}
- val message = intercept[java.lang.Exception] {
- ColumnType(invalidType)
- }.getMessage
-
- assert(message.contains("Unsupported type: invalid type name"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ ColumnType(invalidType)
+ },
+ errorClass = "UNSUPPORTED_DATATYPE",
+ parameters = Map("typeName" -> "\"INVALID TYPE NAME\"")
+ )
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]