This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fe1f7e4ea48c [SPARK-54490][SQL] Classify "withConnection" exceptions in JDBCUtils
fe1f7e4ea48c is described below
commit fe1f7e4ea48cf2ac9cb7146f757d4009c6d377e4
Author: Ubuntu <[email protected]>
AuthorDate: Sat Nov 29 00:59:23 2025 +0800
[SPARK-54490][SQL] Classify "withConnection" exceptions in JDBCUtils
### What changes were proposed in this pull request?
Currently, a connection error in JdbcUtils.scala is thrown as-is.
It needs to be classified so that it can be caught properly (and so a sqlState can be attached).
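For illustration, a hedged sketch of what a caller can do once the failure is classified (not part of this commit; the URL here is hypothetical, and `getCondition` is assumed from the SparkThrowable API that `checkError` exercises in the tests below):

import org.apache.spark.sql.AnalysisException

try {
  spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost/db") // hypothetical, unreachable URL
    .option("dbtable", "table")
    .load()
} catch {
  case e: AnalysisException if e.getCondition == "FAILED_JDBC.CONNECTION" =>
    // The classified error carries a stable condition (hence a sqlState)
    // and the redacted connection URL as a message parameter.
    println(e.getMessage)
}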
### Why are the changes needed?
Right now this exception is unclassified, so it cannot be caught properly.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests are added in this PR
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53195 from aleksandr-chernousov-db/SPARK-54490.
Authored-by: Ubuntu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 ++
.../spark/sql/connect/ClientE2ETestSuite.scala | 2 +-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 ++++-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 54 +++++++++++++++++-----
4 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 8da9896ead8e..0731fbec7231 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1727,6 +1727,11 @@
"Alter the table <tableName>."
]
},
+ "CONNECTION" : {
+ "message" : [
+ "Couldn't connect to the database"
+ ]
+ },
"CREATE_INDEX" : {
"message" : [
"Create the index <indexName> in the <tableName> table."
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index 33524c8df269..b9f72badd45f 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -461,7 +461,7 @@ class ClientE2ETestSuite
assert(result.length == 10)
} finally {
// clean up
- assertThrows[SparkException] {
+ assertThrows[AnalysisException] {
spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 926c133d2875..3766cd1ad641 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1315,7 +1315,17 @@ object JdbcUtils extends Logging with SQLConfHelper {
def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
val dialect = JdbcDialects.get(options.url)
- val conn = dialect.createConnectionFactory(options)(-1)
+
+ var conn : Connection = null
+ classifyException(
+ condition = "FAILED_JDBC.CONNECTION",
+ messageParameters = Map("url" -> options.getRedactUrl()),
+ dialect,
+ description = "Failed to connect",
+ isRuntime = false
+ ) {
+ conn = dialect.createConnectionFactory(options)(-1)
+ }
try {
f(conn)
} finally {
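Note: classifyException above receives the error condition, message parameters, the dialect, a description, and the block to guard. A standalone, hypothetical sketch of the underlying pattern (not Spark's actual implementation, which additionally lets the dialect map driver-specific failures):

// Hypothetical sketch: run a block and rethrow any failure under a
// stable error condition with message parameters attached.
final class ClassifiedException(
    val condition: String,
    val parameters: Map[String, String],
    cause: Throwable)
  extends Exception(
    s"[$condition] " + parameters.map { case (k, v) => s"$k=$v" }.mkString(", "),
    cause)

def classifySketch[T](condition: String, parameters: Map[String, String])(block: => T): T =
  try block
  catch {
    case e: Exception => throw new ClassifiedException(condition, parameters, e)
  }

// Usage mirroring the guarded connection creation above:
// classifySketch("FAILED_JDBC.CONNECTION", Map("url" -> options.getRedactUrl())) {
//   dialect.createConnectionFactory(options)(-1)
// }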
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 276062fb1daa..e38648e9468c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1939,17 +1939,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(getRowCount(df2) < df3.count())
}
- test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to
the given url") {
- val e = intercept[IllegalArgumentException] {
- val opts = Map(
- "url" -> "jdbc:mysql://localhost/db",
- "dbtable" -> "table",
- "driver" -> "org.postgresql.Driver"
- )
- spark.read.format("jdbc").options(opts).load()
- }.getMessage
- assert(e.contains("The driver could not open a JDBC connection. " +
- "Check the URL: jdbc:mysql://localhost/db"))
+ test("SPARK-26383 throw FAILED_JDBC.CONNECTION if wrong kind of driver to
the given url") {
+ val url = "jdbc:mysql://localhost/db"
+ checkError(
+ exception = intercept[AnalysisException] {
+ val opts = Map(
+ "url" -> url,
+ "dbtable" -> "table",
+ "driver" -> "org.postgresql.Driver"
+ )
+ spark.read.format("jdbc").options(opts).load()
+ },
+ condition = "FAILED_JDBC.CONNECTION",
+ parameters = Map("url" -> url)
+ )
}
test("support casting patterns for lower/upper bounds of TimestampType") {
@@ -2273,4 +2276,33 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
parameters = Map("jdbcDialect" -> dialect.getClass.getSimpleName))
}
}
+
+ test("FAILED_JDBC.CONNECTION") {
+ val testUrls = Seq(
+ "jdbc:mysql",
+ "jdbc:postgresql",
+ "jdbc:sqlserver",
+ "jdbc:db2",
+ "jdbc:h2",
+ "jdbc:teradata",
+ "jdbc:databricks"
+ )
+
+ testUrls.foreach { connectionUrl =>
+ val url = s"$connectionUrl://invalid_url/"
+ val options = new JDBCOptions(Map(
+ "url" -> url,
+ "dbtable" -> "invalid_table"
+ ))
+ checkError(
+ exception = intercept[AnalysisException] {
+ JdbcUtils.withConnection(options) { conn =>
+ conn.getMetaData
+ }
+ },
+ condition = "FAILED_JDBC.CONNECTION",
+ parameters = Map("url" -> url)
+ )
+ }
+ }
}
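Note: for a quick manual check outside the suite, a hedged spark-shell sketch (assumes the usual `spark` session; getCondition and getMessageParameters are assumed from the SparkThrowable API that checkError exercises above, with getMessageParameters returning a Java map):

import org.apache.spark.sql.AnalysisException

val url = "jdbc:h2://invalid_url/"
try {
  // Schema resolution already opens a connection, so this should fail fast.
  spark.read.jdbc(url, "invalid_table", new java.util.Properties()).collect()
} catch {
  case e: AnalysisException =>
    assert(e.getCondition == "FAILED_JDBC.CONNECTION")
    assert(e.getMessageParameters.get("url") == url)
}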