This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new f8c2c36e3284 [SPARK-54108][CONNECT] Revise execute* methods of SparkConnectStatement
f8c2c36e3284 is described below
commit f8c2c36e328415acbccf9f189b2d0bb828ccc100
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 3 16:24:14 2025 +0800
[SPARK-54108][CONNECT] Revise execute* methods of SparkConnectStatement
### What changes were proposed in this pull request?
This PR revises the following three `execute*` methods of `SparkConnectStatement`, plus the additional `getUpdateCount` method, all defined in `java.sql.Statement`:
```java
/**
 * Executes the given SQL statement, which returns a single
 * {@code ResultSet} object.
 * <p>
 * <strong>Note:</strong>This method cannot be called on a
 * {@code PreparedStatement} or {@code CallableStatement}.
 * @param sql an SQL statement to be sent to the database, typically a
 *        static SQL {@code SELECT} statement
 * @return a {@code ResultSet} object that contains the data produced
 *         by the given query; never {@code null}
 * @throws SQLException if a database access error occurs,
 *         this method is called on a closed {@code Statement}, the given
 *         SQL statement produces anything other than a single
 *         {@code ResultSet} object, the method is called on a
 *         {@code PreparedStatement} or {@code CallableStatement}
 * @throws SQLTimeoutException when the driver has determined that the
 *         timeout value that was specified by the {@code setQueryTimeout}
 *         method has been exceeded and has at least attempted to cancel
 *         the currently running {@code Statement}
 */
ResultSet executeQuery(String sql) throws SQLException;

/**
 * Executes the given SQL statement, which may be an {@code INSERT},
 * {@code UPDATE}, or {@code DELETE} statement or an
 * SQL statement that returns nothing, such as an SQL DDL statement.
 * <p>
 * <strong>Note:</strong>This method cannot be called on a
 * {@code PreparedStatement} or {@code CallableStatement}.
 * @param sql an SQL Data Manipulation Language (DML) statement, such as
 *        {@code INSERT}, {@code UPDATE} or {@code DELETE}; or an SQL
 *        statement that returns nothing, such as a DDL statement.
 *
 * @return either (1) the row count for SQL Data Manipulation Language (DML) statements
 *         or (2) 0 for SQL statements that return nothing
 *
 * @throws SQLException if a database access error occurs,
 *         this method is called on a closed {@code Statement}, the given
 *         SQL statement produces a {@code ResultSet} object, the method is called on a
 *         {@code PreparedStatement} or {@code CallableStatement}
 * @throws SQLTimeoutException when the driver has determined that the
 *         timeout value that was specified by the {@code setQueryTimeout}
 *         method has been exceeded and has at least attempted to cancel
 *         the currently running {@code Statement}
 */
int executeUpdate(String sql) throws SQLException;

/**
 * Executes the given SQL statement, which may return multiple results.
 * In some (uncommon) situations, a single SQL statement may return
 * multiple result sets and/or update counts.  Normally you can ignore
 * this unless you are (1) executing a stored procedure that you know may
 * return multiple results or (2) you are dynamically executing an
 * unknown SQL string.
 * <P>
 * The {@code execute} method executes an SQL statement and indicates the
 * form of the first result.  You must then use the methods
 * {@code getResultSet} or {@code getUpdateCount}
 * to retrieve the result, and {@code getMoreResults} to
 * move to any subsequent result(s).
 * <p>
 * <strong>Note:</strong>This method cannot be called on a
 * {@code PreparedStatement} or {@code CallableStatement}.
 * @param sql any SQL statement
 * @return {@code true} if the first result is a {@code ResultSet}
 *         object; {@code false} if it is an update count or there are
 *         no results
 * @throws SQLException if a database access error occurs,
 *         this method is called on a closed {@code Statement},
 *         the method is called on a
 *         {@code PreparedStatement} or {@code CallableStatement}
 * @throws SQLTimeoutException when the driver has determined that the
 *         timeout value that was specified by the {@code setQueryTimeout}
 *         method has been exceeded and has at least attempted to cancel
 *         the currently running {@code Statement}
 * @see #getResultSet
 * @see #getUpdateCount
 * @see #getMoreResults
 */
boolean execute(String sql) throws SQLException;

/**
 * Retrieves the current result as an update count;
 * if the result is a {@code ResultSet} object or there are no more results, -1
 * is returned. This method should be called only once per result.
 *
 * @return the current result as an update count; -1 if the current result is a
 *         {@code ResultSet} object or there are no more results
 * @throws SQLException if a database access error occurs or
 *         this method is called on a closed {@code Statement}
 * @see #execute
 */
int getUpdateCount() throws SQLException;
```
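For context, here is a minimal sketch of a caller driving this contract through the driver. It is illustrative only: the object name, URL, port, and SQL text are assumptions, not taken from this patch.

```scala
import java.sql.DriverManager

object ExecuteContractDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative endpoint; point this at your Spark Connect server.
    val conn = DriverManager.getConnection("jdbc:sc://localhost:15002")
    val stmt = conn.createStatement()
    try {
      // execute() indicates the form of the first result: true means a
      // ResultSet is available via getResultSet; false means an update
      // count is available via getUpdateCount.
      if (stmt.execute("SELECT 1 AS id")) {
        val rs = stmt.getResultSet
        while (rs.next()) println(rs.getInt("id"))
        rs.close()
      } else {
        println(s"update count: ${stmt.getUpdateCount}")
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```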
### Why are the changes needed?
Make the implementation respect the JDBC API specification.
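Previously, `execute` always delegated to `executeQuery` and returned `true`, `executeUpdate` always returned 0, and `getUpdateCount` threw `SQLFeatureNotSupportedException`. The sketch below, mirroring the new test suite, shows the spec-compliant failures now expected; `stmt` and the SQL text are illustrative assumptions.

```scala
import java.sql.{SQLException, Statement}

// Assumes `stmt` is an open Statement obtained from the Connect JDBC driver.
def checkContract(stmt: Statement): Unit = {
  // A statement that produces no ResultSet must fail executeQuery ...
  try stmt.executeQuery("CREATE TABLE t (id INT) USING parquet")
  catch { case e: SQLException => println(e.getMessage) } // "The query does not produce a ResultSet."

  // ... and a statement that produces a ResultSet must fail executeUpdate.
  try stmt.executeUpdate("SELECT id FROM t")
  catch { case e: SQLException => println(e.getMessage) } // "The query produces a ResultSet."
}
```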
### Does this PR introduce _any_ user-facing change?
No, the Spark Connect JDBC driver is an unreleased feature.
### How was this patch tested?
New unit tests are added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52810 from pan3793/SPARK-54108.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit edfd3578d586ac6da019e49ca44727e3e8842cbd)
Signed-off-by: yangjie01 <[email protected]>
---
.../client/jdbc/SparkConnectStatement.scala | 67 ++++++++++++------
.../client/jdbc/SparkConnectStatementSuite.scala | 80 ++++++++++++++++++++++
2 files changed, 127 insertions(+), 20 deletions(-)
diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 8de227f9d07c..8b60f309ef6d 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connect.client.jdbc
import java.sql.{Array => _, _}
+import org.apache.spark.sql.connect.client.SparkResult
+
class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
private var operationId: String = _
@@ -49,33 +51,51 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
}
override def executeQuery(sql: String): ResultSet = {
- checkOpen()
-
- val df = conn.spark.sql(sql)
- val sparkResult = df.collectResult()
- operationId = sparkResult.operationId
- resultSet = new SparkConnectResultSet(sparkResult, this)
- resultSet
+ val hasResultSet = execute(sql)
+ if (hasResultSet) {
+ assert(resultSet != null)
+ resultSet
+ } else {
+ throw new SQLException("The query does not produce a ResultSet.")
+ }
}
override def executeUpdate(sql: String): Int = {
- checkOpen()
-
- val df = conn.spark.sql(sql)
- val sparkResult = df.collectResult()
- operationId = sparkResult.operationId
- resultSet = null
+ val hasResultSet = execute(sql)
+ if (hasResultSet) {
+ // users are not expected to access the result set in this case,
+ // so we must close it to avoid a memory leak.
+ resultSet.close()
+ throw new SQLException("The query produces a ResultSet.")
+ } else {
+ assert(resultSet == null)
+ getUpdateCount
+ }
+ }
- // always return 0 because affected rows is not supported yet
- 0
+ private def hasResultSet(sparkResult: SparkResult[_]): Boolean = {
+ // assume a non-empty schema means the statement produces a ResultSet; this works in most cases
+ sparkResult.schema.length > 0
}
override def execute(sql: String): Boolean = {
checkOpen()
- // always perform executeQuery and return a ResultSet
- executeQuery(sql)
- true
+ // stmt can be reused to execute more than one query,
+ // so reset state before executing a new one
+ operationId = null
+ resultSet = null
+
+ val df = conn.spark.sql(sql)
+ val sparkResult = df.collectResult()
+ operationId = sparkResult.operationId
+ if (hasResultSet(sparkResult)) {
+ resultSet = new SparkConnectResultSet(sparkResult, this)
+ true
+ } else {
+ sparkResult.close()
+ false
+ }
}
override def getResultSet: ResultSet = {
@@ -123,8 +143,15 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
override def setCursorName(name: String): Unit =
throw new SQLFeatureNotSupportedException
- override def getUpdateCount: Int =
- throw new SQLFeatureNotSupportedException
+ override def getUpdateCount: Int = {
+ checkOpen()
+
+ if (resultSet != null) {
+ -1
+ } else {
+ 0 // always return 0 because affected rows is not supported yet
+ }
+ }
override def getMoreResults: Boolean =
throw new SQLFeatureNotSupportedException
diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
new file mode 100644
index 000000000000..8e3b616372d8
--- /dev/null
+++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+
+import scala.util.Using
+
+import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
+
+class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession
+ with JdbcHelper with SQLHelper {
+
+ override def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
+
+ test("returned result set and update count of execute* methods") {
+ withTable("t1", "t2", "t3") {
+ withStatement { stmt =>
+ // CREATE TABLE
+ assert(!stmt.execute("CREATE TABLE t1 (id INT) USING Parquet"))
+ assert(stmt.getUpdateCount === 0)
+ assert(stmt.getResultSet === null)
+
+ var se = intercept[SQLException] {
+ stmt.executeQuery("CREATE TABLE t2 (id INT) USING Parquet")
+ }
+ assert(se.getMessage === "The query does not produce a ResultSet.")
+
+ assert(stmt.executeUpdate("CREATE TABLE t3 (id INT) USING Parquet") === 0)
+ assert(stmt.getResultSet === null)
+
+ // INSERT INTO
+ assert(!stmt.execute("INSERT INTO t1 VALUES (1)"))
+ assert(stmt.getUpdateCount === 0)
+ assert(stmt.getResultSet === null)
+
+ se = intercept[SQLException] {
+ stmt.executeQuery("INSERT INTO t1 VALUES (1)")
+ }
+ assert(se.getMessage === "The query does not produce a ResultSet.")
+
+ assert(stmt.executeUpdate("INSERT INTO t1 VALUES (1)") === 0)
+ assert(stmt.getResultSet === null)
+
+ // SELECT
+ assert(stmt.execute("SELECT id FROM t1"))
+ assert(stmt.getUpdateCount === -1)
+ Using.resource(stmt.getResultSet) { rs =>
+ assert(rs !== null)
+ }
+
+ Using.resource(stmt.executeQuery("SELECT id FROM t1")) { rs =>
+ assert(stmt.getUpdateCount === -1)
+ assert(rs !== null)
+ }
+
+ se = intercept[SQLException] {
+ stmt.executeUpdate("SELECT id FROM t1")
+ }
+ assert(se.getMessage === "The query produces a ResultSet.")
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]