This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a3feffdff9cd [SPARK-48585][SQL] Make `built-in` JdbcDialect's method `classifyException` throw out the `original` exception
a3feffdff9cd is described below
commit a3feffdff9cd17e0435ac5620731093f40d1a3bf
Author: panbingkun <[email protected]>
AuthorDate: Tue Jun 18 14:50:34 2024 +0800
[SPARK-48585][SQL] Make `built-in` JdbcDialect's method `classifyException` throw out the `original` exception
### What changes were proposed in this pull request?
This PR aims to make the `built-in` JdbcDialect's method `classifyException`
throw the `original` exception.
### Why are the changes needed?
As discussed in
https://github.com/apache/spark/pull/46912#discussion_r1630876576, the
following code:
https://github.com/apache/spark/blob/df4156aa3217cf0f58b4c6cbf33c967bb43f7155/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala#L746-L751
has lost the original cause of the error; let's correct it.
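A minimal sketch of the fix (the trait body below matches the one added by this patch; the legacy behavior described in the comments is paraphrased from the replaced code, so treat its exact shape as an assumption):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.jdbc.JdbcDialect

// Previously, the default `classifyException` fell back to a legacy path
// that reported a generic FAILED_JDBC.UNCLASSIFIED error, dropping the
// classified error class, its parameters, and the original cause.
// The new trait rethrows with the precise error class and keeps the
// original exception attached as the cause:
trait NoLegacyJDBCError extends JdbcDialect {
  override def classifyException(
      e: Throwable,
      errorClass: String,
      messageParameters: Map[String, String],
      description: String): AnalysisException = {
    new AnalysisException(
      errorClass = errorClass,
      messageParameters = messageParameters,
      cause = Some(e)) // preserve the original exception
  }
}
```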
### Does this PR introduce _any_ user-facing change?
Yes, end users get more accurate error conditions, and the original
exception is preserved as the cause.
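For example, a test-style sketch of the effect (mirroring the updated `JDBCTableCatalogSuite` below; the `getCause` assertion is illustrative rather than part of the patch):

```scala
val e = intercept[AnalysisException] {
  sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
}
// Before: FAILED_JDBC.UNCLASSIFIED with a generic "Failed table creation" message.
assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")
assert(e.getCause != null) // the original JDBC exception is kept
```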
### How was this patch tested?
- Manually tested.
- Updated existing UTs & passed GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46937 from panbingkun/improve_JDBCTableCatalog.
Authored-by: panbingkun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 26 ++++++++++++----------
.../apache/spark/sql/jdbc/AggregatedDialect.scala | 3 ++-
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +-
.../apache/spark/sql/jdbc/DatabricksDialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/DerbyDialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 17 ++++++++++++++
.../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/OracleDialect.scala | 2 +-
.../apache/spark/sql/jdbc/PostgresDialect.scala | 3 ++-
.../apache/spark/sql/jdbc/SnowflakeDialect.scala | 2 +-
.../apache/spark/sql/jdbc/TeradataDialect.scala | 2 +-
.../v2/jdbc/JDBCTableCatalogSuite.scala | 16 ++++++-------
14 files changed, 52 insertions(+), 31 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index c78e87d0b846..88ba00a8a1ae 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -83,14 +83,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
def testCreateTableWithProperty(tbl: String): Unit = {}
- def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
- checkError(
+ private def checkErrorFailedJDBC(
+ e: AnalysisException,
+ errorClass: String,
+ tbl: String): Unit = {
+ checkErrorMatchPVals(
exception = e,
- errorClass = "FAILED_JDBC.UNCLASSIFIED",
+ errorClass = errorClass,
parameters = Map(
- "url" -> "jdbc:",
- "message" -> s"Failed to load table: $tbl"
- )
+ "url" -> "jdbc:.*",
+ "tableName" -> s"`$tbl`")
)
}
@@ -132,7 +134,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
}
- checkErrorFailedLoadTable(e, "not_existing_table")
+ checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}
test("SPARK-33034: ALTER TABLE ... drop column") {
@@ -154,7 +156,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
}
- checkErrorFailedLoadTable(e, "not_existing_table")
+ checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}
test("SPARK-33034: ALTER TABLE ... update column type") {
@@ -170,7 +172,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
}
- checkErrorFailedLoadTable(e, "not_existing_table")
+ checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}
test("SPARK-33034: ALTER TABLE ... rename column") {
@@ -198,7 +200,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}
- checkErrorFailedLoadTable(e, "not_existing_table")
+ checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}
test("SPARK-33034: ALTER TABLE ... update column nullability") {
@@ -209,7 +211,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
}
- checkErrorFailedLoadTable(e, "not_existing_table")
+ checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}
test("CREATE TABLE with table comment") {
@@ -231,7 +233,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
}
- assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
+ checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
index 8f537aacebe5..5e79dbbb4d72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{DataType, MetadataBuilder}
*
* @param dialects List of dialects.
*/
-private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
+private class AggregatedDialect(dialects: List[JdbcDialect])
+ extends JdbcDialect with NoLegacyJDBCError {
require(dialects.nonEmpty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 934ccdb51aa3..8ccf94166a70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._
-private case class DB2Dialect() extends JdbcDialect with SQLConfHelper {
+private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index 54b8c2622827..af77f8575dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._
-private case class DatabricksDialect() extends JdbcDialect {
+private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean = {
url.startsWith("jdbc:databricks")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index 23da4dbb60a5..7b65a01b5e70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.types._
-private case class DerbyDialect() extends JdbcDialect {
+private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 949455b248ff..3ece44ece9e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}
-private[sql] case class H2Dialect() extends JdbcDialect {
+private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 4ebe73292f11..290665020f88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -841,6 +841,23 @@ abstract class JdbcDialect extends Serializable with Logging {
metadata: MetadataBuilder): Unit = {}
}
+/**
+ * Make the `classifyException` method throw out the original exception
+ */
+trait NoLegacyJDBCError extends JdbcDialect {
+
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ description: String): AnalysisException = {
+ new AnalysisException(
+ errorClass = errorClass,
+ messageParameters = messageParameters,
+ cause = Some(e))
+ }
+}
+
/**
* :: DeveloperApi ::
* Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index e341bf3720f4..d03602b0338c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
import org.apache.spark.sql.types._
-private case class MsSqlServerDialect() extends JdbcDialect {
+private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index d2034812cdd3..0f1bccbb01d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types._
-private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
+private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
override def canHandle(url : String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 38fee89bf726..627007e27559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.jdbc.OracleDialect._
import org.apache.spark.sql.types._
-private case class OracleDialect() extends JdbcDialect with SQLConfHelper {
+private case class OracleDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 93052a0c37b5..03fefd82802e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._
-private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
+private case class PostgresDialect()
+ extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
index 276364d5d89e..a443a798db7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.{BooleanType, DataType}
-private case class SnowflakeDialect() extends JdbcDialect {
+private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
index 95a9f60b64ed..322b259485f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types._
-private case class TeradataDialect() extends JdbcDialect {
+private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index daf5d8507ecc..d2ff33e10477 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -619,15 +619,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
test("CREATE TABLE with table property") {
withTable("h2.test.new_table") {
- checkError(
+ checkErrorMatchPVals(
exception = intercept[AnalysisException] {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" +
" TBLPROPERTIES('ENGINE'='tableEngineName')")
},
- errorClass = "FAILED_JDBC.UNCLASSIFIED",
+ errorClass = "FAILED_JDBC.CREATE_TABLE",
parameters = Map(
- "url" -> "jdbc:",
- "message" -> "Failed table creation: test.new_table"))
+ "url" -> "jdbc:.*",
+ "tableName" -> "`test`.`new_table`"))
}
}
@@ -639,14 +639,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
}
test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") {
- checkError(
+ checkErrorMatchPVals(
exception = intercept[AnalysisException]{
sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
},
- errorClass = "FAILED_JDBC.UNCLASSIFIED",
+ errorClass = "FAILED_JDBC.CREATE_TABLE",
parameters = Map(
- "url" -> "jdbc:",
- "message" -> "Failed table creation: test.new_table"))
+ "url" -> "jdbc:.*",
+ "tableName" -> "`test`.`new_table`"))
}
test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") {