This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a08b3fdeff7a [SPARK-52159][SQL] Properly handle table existence check for jdbc dialects
a08b3fdeff7a is described below
commit a08b3fdeff7a6d0da835b4f4f71f1c14191dec22
Author: milastdbx <[email protected]>
AuthorDate: Mon May 19 11:38:55 2025 +0200
[SPARK-52159][SQL] Properly handle table existence check for jdbc dialects
### What changes were proposed in this pull request?
In this PR, I propose that we rethrow the exception raised during a table
existence check in a JDBC dialect when the exception is not related to the
table/schema not being found. Currently all exceptions get swallowed and the
method returns false, so from the system's perspective it is as if the table
doesn't exist, which is the wrong message (e.g., we can be told the table does
not exist even though the real cause was a network failure).
This issue is mostly exposed when the TableCatalog API is used:
```
override def loadTable(ident: Identifier): Table = {
  if (!tableExists(ident)) {
    throw QueryCompilationErrors.noSuchTableError(ident)
  }
```
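For context, the pre-existing check boils down to the pattern sketched below
(simplified; `runExistenceQuery` and its probe query are illustrative
stand-ins, not the actual Spark helpers). Any failure, including a dropped
connection, collapses into `false`:
```
import java.sql.Connection
import scala.util.Try

// Illustrative stand-in for the dialect-specific existence probe.
def runExistenceQuery(conn: Connection, table: String): Unit = {
  val statement = conn.prepareStatement(s"SELECT 1 FROM $table WHERE 1 = 0")
  try statement.executeQuery() finally statement.close()
}

// Old behavior: Try(...).isSuccess swallows every exception, so a network
// failure is indistinguishable from a missing table.
def tableExistsOld(conn: Connection, table: String): Boolean =
  Try(runExistenceQuery(conn, table)).isSuccess
```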
Error code links:
- https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-error-sql
- https://db.apache.org/derby/docs/10.4/ref/rrefexcept71493.html
- https://docs.databricks.com/aws/en/error-messages/sqlstates
- https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
- https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors-0-to-999?view=sql-server-ver16
- https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
- https://www.postgresql.org/docs/current/errcodes-appendix.html
As implemented today, `tableExists` cannot throw anything, so every exception
gets converted to `noSuchTableError`, which is wrong.
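With this change, a dialect only has to classify its driver's "object not
found" errors and the shared `tableExists` logic does the rest. A minimal
sketch of what a hypothetical third-party dialect could look like (the
`jdbc:acme` URL and the SQLSTATE value are illustrative assumptions, not from
a real driver):
```
import java.sql.SQLException

import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect: only the error classification is dialect-specific.
case class AcmeDialect() extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:acme")

  // Illustrative SQLSTATE; check your driver's documentation.
  override def isObjectNotFoundException(e: SQLException): Boolean =
    e.getSQLState == "42S02"
}
```
Anything not matched by `isObjectNotFoundException` is now rethrown by
`JdbcUtils.tableExists` instead of being reported as a missing table.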
### Does this PR introduce _any_ user-facing change?
Yes, users will get proper error messages instead of a misleading "table does
not exist" when the existence check fails for an unrelated reason (e.g., a
network failure).
### How was this patch tested?
New tests in `V2JDBCTest`.
### Was this patch authored or co-authored using generative AI tooling?
Generated by: Copilot
Closes #50835 from milastdbx/sparkHandleTableNotFound.
Authored-by: milastdbx <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 26 ++++++++++++++++++++++
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 17 ++++++++------
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 4 ++++
.../apache/spark/sql/jdbc/DatabricksDialect.scala | 6 ++++-
.../org/apache/spark/sql/jdbc/DerbyDialect.scala | 8 ++++++-
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 4 ++++
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 5 ++++-
.../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 4 ++++
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 4 ++++
.../org/apache/spark/sql/jdbc/OracleDialect.scala | 5 +++++
.../apache/spark/sql/jdbc/PostgresDialect.scala | 6 +++++
.../apache/spark/sql/jdbc/SnowflakeDialect.scala | 5 +++++
.../apache/spark/sql/jdbc/TeradataDialect.scala | 6 ++++-
13 files changed, 89 insertions(+), 11 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 51862ae1535c..b7dc397464df 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -1050,6 +1050,32 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}
+ test("SPARK-48618: Test table does not exists error") {
+ val tbl = s"$catalogName.tbl1"
+ val sqlStatement = s"SELECT * FROM $tbl"
+ val startPos = sqlStatement.indexOf(tbl)
+
+ withTable(tbl) {
+ sql(s"CREATE TABLE $tbl (col1 INT, col2 INT)")
+ sql(s"INSERT INTO $tbl VALUES (1, 2)")
+ val df = sql(sqlStatement)
+ val row = df.collect()
+ assert(row.length === 1)
+
+ // Drop the table
+ sql(s"DROP TABLE IF EXISTS $tbl")
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlStatement).collect()
+ },
+ condition = "TABLE_OR_VIEW_NOT_FOUND",
+ parameters = Map("relationName" -> s"`$catalogName`.`tbl1`"),
+ context = ExpectedContext(tbl, startPos, startPos + tbl.length - 1)
+ )
+ }
+ }
+
def testDatetime(tbl: String): Unit = {}
test("scan with filter push-down with date time functions") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8112cf1c80ef..0077012e2b0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -26,7 +26,7 @@ import java.util
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
@@ -64,18 +64,21 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
     val dialect = JdbcDialects.get(options.url)
-    // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
-    // SQL database systems using JDBC meta data calls, considering "table" could also include
-    // the database name. Query used to find table exists can be overridden by the dialects.
-    Try {
+    val executionResult = Try {
       val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
       try {
         statement.setQueryTimeout(options.queryTimeout)
-        statement.executeQuery()
+        statement.executeQuery() // Success means table exists (query executed without error)
       } finally {
         statement.close()
       }
-    }.isSuccess
+    }
+
+    executionResult match {
+      case Success(_) => true
+      case Failure(e: SQLException) if dialect.isObjectNotFoundException(e) => false
+      case Failure(e) => throw e // Re-throw unexpected exceptions
+    }
   }
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 427275592ada..b748975eef65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -47,6 +47,10 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getErrorCode == -204
+ }
+
class DB2SQLBuilder extends JDBCSQLBuilder {
override def visitAggregateFunction(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index 1aa2282f4a84..cb3cfecd940b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.jdbc
-import java.sql.Connection
+import java.sql.{Connection, SQLException}
import scala.collection.mutable.ArrayBuilder
@@ -31,6 +31,10 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro
url.startsWith("jdbc:databricks")
}
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getSQLState == "42P01" || e.getSQLState == "42704"
+ }
+
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index 7b65a01b5e70..f4e6e25f58db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.jdbc
-import java.sql.Types
+import java.sql.{SQLException, Types}
import java.util.Locale
import org.apache.spark.sql.connector.catalog.Identifier
@@ -38,6 +38,12 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getSQLState.equalsIgnoreCase("42Y07") ||
+ e.getSQLState.equalsIgnoreCase("42X05") ||
+ e.getSQLState.equalsIgnoreCase("X0X05")
+ }
+
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.REAL) Option(FloatType) else None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 82f6f5c6264c..956e7c05cd5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -57,6 +57,10 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ Set(42102, 42103, 42104, 90079).contains(e.getErrorCode)
+ }
+
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7719d6a67053..d6fe564c1520 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.jdbc
-import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp}
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, SQLException, Statement, Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime}
import java.util
import java.util.ServiceLoader
@@ -758,6 +758,9 @@ abstract class JdbcDialect extends Serializable with Logging {
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182")
}
+ @Since("4.1.0")
+ def isObjectNotFoundException(e: SQLException): Boolean = true
+
/**
* Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
* @param e The dialect specific exception.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 531e5d4f0f3a..7efdc52f35be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -38,6 +38,10 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getErrorCode == 208
+ }
+
// Microsoft SQL Server does not have the boolean type.
// Compile the boolean value to the bit data type instead.
// scalastyle:off line.size.limit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 4323fa4ed99b..5b894e71619a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -50,6 +50,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getErrorCode == 1146
+ }
+
class MySQLSQLBuilder extends JDBCSQLBuilder {
override def visitExtract(extract: Extract): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 851b0e04d5e5..236d9469a58d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -49,6 +49,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getMessage.contains("ORA-00942") ||
+ e.getMessage.contains("ORA-39165")
+ }
+
class OracleSQLBuilder extends JDBCSQLBuilder {
override def visitAggregateFunction(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index b4cd5f578ccd..73b10f72e21b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -54,6 +54,12 @@ private case class PostgresDialect()
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getSQLState == "42P01" ||
+ e.getSQLState == "3F000" ||
+ e.getSQLState == "42704"
+ }
+
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
index a443a798db7c..d4ac21a45300 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.jdbc
+import java.sql.SQLException
import java.util.Locale
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
@@ -26,6 +27,10 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getSQLState == "002003"
+ }
+
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case BooleanType =>
// By default, BOOLEAN is mapped to BIT(1).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
index 322b259485f5..bbdab81201fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.jdbc
-import java.sql.Types
+import java.sql.{SQLException, Types}
import java.util.Locale
import org.apache.spark.sql.connector.catalog.Identifier
@@ -39,6 +39,10 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)
+ override def isObjectNotFoundException(e: SQLException): Boolean = {
+ e.getErrorCode == 3807
+ }
+
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]