This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new edfd3578d586 [SPARK-54108][CONNECT] Revise execute* methods of 
SparkConnectStatement
edfd3578d586 is described below

commit edfd3578d586ac6da019e49ca44727e3e8842cbd
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 3 16:24:14 2025 +0800

    [SPARK-54108][CONNECT] Revise execute* methods of SparkConnectStatement
    
    ### What changes were proposed in this pull request?
    
    This PR revises the following three `execute*` methods, plus the
    `getUpdateCount` method, of `SparkConnectStatement`, all of which are
    defined in `java.sql.Statement`:
    
    ```java
        /**
         * Executes the given SQL statement, which returns a single
         * {code ResultSet} object.
         *<p>
         * <strong>Note:</strong>This method cannot be called on a
         * {code PreparedStatement} or {code CallableStatement}.
         * param sql an SQL statement to be sent to the database, typically a
         *        static SQL {code SELECT} statement
         * return a {code ResultSet} object that contains the data produced
         *         by the given query; never {code null}
         * throws SQLException if a database access error occurs,
         * this method is called on a closed {code Statement}, the given
         *            SQL statement produces anything other than a single
         *            {code ResultSet} object, the method is called on a
         * {code PreparedStatement} or {code CallableStatement}
         * throws SQLTimeoutException when the driver has determined that the
         * timeout value that was specified by the {code setQueryTimeout}
         * method has been exceeded and has at least attempted to cancel
         * the currently running {code Statement}
         */
        ResultSet executeQuery(String sql) throws SQLException;
    
        /**
         * Executes the given SQL statement, which may be an {code INSERT},
         * {code UPDATE}, or {code DELETE} statement or an
         * SQL statement that returns nothing, such as an SQL DDL statement.
         *<p>
         * <strong>Note:</strong>This method cannot be called on a
         * {code PreparedStatement} or {code CallableStatement}.
         * param sql an SQL Data Manipulation Language (DML) statement, such as 
{code INSERT}, {code UPDATE} or
         * {code DELETE}; or an SQL statement that returns nothing,
         * such as a DDL statement.
         *
         * return either (1) the row count for SQL Data Manipulation Language 
(DML) statements
         *         or (2) 0 for SQL statements that return nothing
         *
         * throws SQLException if a database access error occurs,
         * this method is called on a closed {code Statement}, the given
         * SQL statement produces a {code ResultSet} object, the method is 
called on a
         * {code PreparedStatement} or {code CallableStatement}
         * throws SQLTimeoutException when the driver has determined that the
         * timeout value that was specified by the {code setQueryTimeout}
         * method has been exceeded and has at least attempted to cancel
         * the currently running {code Statement}
         */
        int executeUpdate(String sql) throws SQLException;
    
        /**
         * Executes the given SQL statement, which may return multiple results.
         * In some (uncommon) situations, a single SQL statement may return
         * multiple result sets and/or update counts.  Normally you can ignore
         * this unless you are (1) executing a stored procedure that you know 
may
         * return multiple results or (2) you are dynamically executing an
         * unknown SQL string.
         * <P>
         * The {code execute} method executes an SQL statement and indicates the
         * form of the first result.  You must then use the methods
         * {code getResultSet} or {code getUpdateCount}
         * to retrieve the result, and {code getMoreResults} to
         * move to any subsequent result(s).
         * <p>
         *<strong>Note:</strong>This method cannot be called on a
         * {code PreparedStatement} or {code CallableStatement}.
         * param sql any SQL statement
         * return {code true} if the first result is a {code ResultSet}
         *         object; {code false} if it is an update count or there are
         *         no results
         * throws SQLException if a database access error occurs,
         * this method is called on a closed {code Statement},
         * the method is called on a
         * {code PreparedStatement} or {code CallableStatement}
         * throws SQLTimeoutException when the driver has determined that the
         * timeout value that was specified by the {code setQueryTimeout}
         * method has been exceeded and has at least attempted to cancel
         * the currently running {code Statement}
         * see #getResultSet
         * see #getUpdateCount
         * see #getMoreResults
         */
        boolean execute(String sql) throws SQLException;
    
        /**
         *  Retrieves the current result as an update count;
         *  if the result is a {code ResultSet} object or there are no more 
results, -1
         *  is returned. This method should be called only once per result.
         *
         * return the current result as an update count; -1 if the current 
result is a
         * {code ResultSet} object or there are no more results
         * throws SQLException if a database access error occurs or
         * this method is called on a closed {code Statement}
         * see #execute
         */
        int getUpdateCount() throws SQLException;
    ```
    
    ### Why are the changes needed?
    
    Make the implementation respect the JDBC API specification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the Connect JDBC driver is an unreleased feature.
    
    ### How was this patch tested?
    
    New unit tests are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52810 from pan3793/SPARK-54108.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../client/jdbc/SparkConnectStatement.scala        | 67 ++++++++++++------
 .../client/jdbc/SparkConnectStatementSuite.scala   | 80 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 20 deletions(-)

diff --git 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 8de227f9d07c..8b60f309ef6d 100644
--- 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connect.client.jdbc
 
 import java.sql.{Array => _, _}
 
+import org.apache.spark.sql.connect.client.SparkResult
+
 class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
 
   private var operationId: String = _
@@ -49,33 +51,51 @@ class SparkConnectStatement(conn: SparkConnectConnection) 
extends Statement {
   }
 
   override def executeQuery(sql: String): ResultSet = {
-    checkOpen()
-
-    val df = conn.spark.sql(sql)
-    val sparkResult = df.collectResult()
-    operationId = sparkResult.operationId
-    resultSet = new SparkConnectResultSet(sparkResult, this)
-    resultSet
+    val hasResultSet = execute(sql)
+    if (hasResultSet) {
+      assert(resultSet != null)
+      resultSet
+    } else {
+      throw new SQLException("The query does not produce a ResultSet.")
+    }
   }
 
   override def executeUpdate(sql: String): Int = {
-    checkOpen()
-
-    val df = conn.spark.sql(sql)
-    val sparkResult = df.collectResult()
-    operationId = sparkResult.operationId
-    resultSet = null
+    val hasResultSet = execute(sql)
+    if (hasResultSet) {
+      // user are not expected to access the result set in this case,
+      // we must close it to avoid memory leak.
+      resultSet.close()
+      throw new SQLException("The query produces a ResultSet.")
+    } else {
+      assert(resultSet == null)
+      getUpdateCount
+    }
+  }
 
-    // always return 0 because affected rows is not supported yet
-    0
+  private def hasResultSet(sparkResult: SparkResult[_]): Boolean = {
+    // suppose this works in most cases
+    sparkResult.schema.length > 0
   }
 
   override def execute(sql: String): Boolean = {
     checkOpen()
 
-    // always perform executeQuery and reture a ResultSet
-    executeQuery(sql)
-    true
+    // stmt can be reused to execute more than one queries,
+    // reset before executing new query
+    operationId = null
+    resultSet = null
+
+    val df = conn.spark.sql(sql)
+    val sparkResult = df.collectResult()
+    operationId = sparkResult.operationId
+    if (hasResultSet(sparkResult)) {
+      resultSet = new SparkConnectResultSet(sparkResult, this)
+      true
+    } else {
+      sparkResult.close()
+      false
+    }
   }
 
   override def getResultSet: ResultSet = {
@@ -123,8 +143,15 @@ class SparkConnectStatement(conn: SparkConnectConnection) 
extends Statement {
   override def setCursorName(name: String): Unit =
     throw new SQLFeatureNotSupportedException
 
-  override def getUpdateCount: Int =
-    throw new SQLFeatureNotSupportedException
+  override def getUpdateCount: Int = {
+    checkOpen()
+
+    if (resultSet != null) {
+      -1
+    } else {
+      0 // always return 0 because affected rows is not supported yet
+    }
+  }
 
   override def getMoreResults: Boolean =
     throw new SQLFeatureNotSupportedException
diff --git 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
new file mode 100644
index 000000000000..8e3b616372d8
--- /dev/null
+++ 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+
+import scala.util.Using
+
+import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, 
SQLHelper}
+
+class SparkConnectStatementSuite extends ConnectFunSuite with 
RemoteSparkSession
+  with JdbcHelper with SQLHelper {
+
+  override def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
+
+  test("returned result set and update count of execute* methods") {
+    withTable("t1", "t2", "t3") {
+      withStatement { stmt =>
+        // CREATE TABLE
+        assert(!stmt.execute("CREATE TABLE t1 (id INT) USING Parquet"))
+        assert(stmt.getUpdateCount === 0)
+        assert(stmt.getResultSet === null)
+
+        var se = intercept[SQLException] {
+          stmt.executeQuery("CREATE TABLE t2 (id INT) USING Parquet")
+        }
+        assert(se.getMessage === "The query does not produce a ResultSet.")
+
+        assert(stmt.executeUpdate("CREATE TABLE t3 (id INT) USING Parquet") 
=== 0)
+        assert(stmt.getResultSet === null)
+
+        // INSERT INTO
+        assert(!stmt.execute("INSERT INTO t1 VALUES (1)"))
+        assert(stmt.getUpdateCount === 0)
+        assert(stmt.getResultSet === null)
+
+        se = intercept[SQLException] {
+          stmt.executeQuery("INSERT INTO t1 VALUES (1)")
+        }
+        assert(se.getMessage === "The query does not produce a ResultSet.")
+
+        assert(stmt.executeUpdate("INSERT INTO t1 VALUES (1)") === 0)
+        assert(stmt.getResultSet === null)
+
+        // SELECT
+        assert(stmt.execute("SELECT id FROM t1"))
+        assert(stmt.getUpdateCount === -1)
+        Using.resource(stmt.getResultSet) { rs =>
+          assert(rs !== null)
+        }
+
+        Using.resource(stmt.executeQuery("SELECT id FROM t1")) { rs =>
+          assert(stmt.getUpdateCount === -1)
+          assert(rs !== null)
+        }
+
+        se = intercept[SQLException] {
+          stmt.executeUpdate("SELECT id FROM t1")
+        }
+        assert(se.getMessage === "The query produces a ResultSet.")
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to