This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 71599034d786 [SPARK-54014][CONNECT] Support max rows for 
SparkConnectStatement
71599034d786 is described below

commit 71599034d78686dfd119213add68fd3f519c7a2f
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Nov 5 07:43:54 2025 -0800

    [SPARK-54014][CONNECT] Support max rows for SparkConnectStatement
    
    ### What changes were proposed in this pull request?
    
    This PR implements the two methods of the `java.sql.Statement` interface 
for `SparkConnectStatement`
    ```
        /**
         * Retrieves the maximum number of rows that a
         * {code ResultSet} object produced by this
         * {code Statement} object can contain.  If this limit is exceeded,
         * the excess rows are silently dropped.
         *
         * return the current maximum number of rows for a {code ResultSet}
         *         object produced by this {code Statement} object;
         *         zero means there is no limit
         * throws SQLException if a database access error occurs or
         * this method is called on a closed {code Statement}
         * see #setMaxRows
         */
        int getMaxRows() throws SQLException;
    
        /**
         * Sets the limit for the maximum number of rows that any
         * {code ResultSet} object  generated by this {code Statement}
         * object can contain to the given number.
         * If the limit is exceeded, the excess
         * rows are silently dropped.
         *
         * param max the new max rows limit; zero means there is no limit
         * throws SQLException if a database access error occurs,
         * this method is called on a closed {code Statement}
         *            or the condition {code max >= 0} is not satisfied
         * see #getMaxRows
         */
        void setMaxRows(int max) throws SQLException;
    ```
    
    ### Why are the changes needed?
    
    Implement more JDBC APIs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it's new feature.
    
    ### How was this patch tested?
    
    New UTs are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52742 from pan3793/SPARK-54014.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 07cab004409c81d43001f9a0e309f5630ff5ab4c)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../client/jdbc/SparkConnectStatement.scala        | 19 ++++++++---
 .../client/jdbc/SparkConnectStatementSuite.scala   | 39 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 8b60f309ef6d..3df1ff65498d 100644
--- 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -26,6 +26,8 @@ class SparkConnectStatement(conn: SparkConnectConnection) 
extends Statement {
   private var operationId: String = _
   private var resultSet: SparkConnectResultSet = _
 
+  private var maxRows: Int = 0
+
   @volatile private var closed: Boolean = false
 
   override def isClosed: Boolean = closed
@@ -86,7 +88,10 @@ class SparkConnectStatement(conn: SparkConnectConnection) 
extends Statement {
     operationId = null
     resultSet = null
 
-    val df = conn.spark.sql(sql)
+    var df = conn.spark.sql(sql)
+    if (maxRows > 0) {
+      df = df.limit(maxRows)
+    }
     val sparkResult = df.collectResult()
     operationId = sparkResult.operationId
     if (hasResultSet(sparkResult)) {
@@ -111,11 +116,17 @@ class SparkConnectStatement(conn: SparkConnectConnection) 
extends Statement {
 
   override def getMaxRows: Int = {
     checkOpen()
-    0
+    this.maxRows
   }
 
-  override def setMaxRows(max: Int): Unit =
-    throw new SQLFeatureNotSupportedException
+  override def setMaxRows(max: Int): Unit = {
+    checkOpen()
+
+    if (max < 0) {
+      throw new SQLException("The max rows must be zero or a positive 
integer.")
+    }
+    this.maxRows = max
+  }
 
   override def setEscapeProcessing(enable: Boolean): Unit =
     throw new SQLFeatureNotSupportedException
diff --git 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
index 8e3b616372d8..fa9df3f1247f 100644
--- 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
+++ 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
@@ -77,4 +77,43 @@ class SparkConnectStatementSuite extends ConnectFunSuite 
with RemoteSparkSession
       }
     }
   }
+
+  test("max rows from SparkConnectStatement") {
+    def verifyMaxRows(
+        expectedRows: Int, query: String)(stmt: Statement): Unit = {
+      Using(stmt.executeQuery(query)) { rs =>
+        (0 until expectedRows).foreach { _ =>
+          assert(rs.next())
+        }
+        assert(!rs.next())
+      }
+    }
+
+    withStatement { stmt =>
+      // by default, it has no max rows limitation
+      assert(stmt.getMaxRows === 0)
+      verifyMaxRows(10, "SELECT id FROM range(10)")(stmt)
+
+      val se = intercept[SQLException] {
+        stmt.setMaxRows(-1)
+      }
+      assert(se.getMessage === "The max rows must be zero or a positive 
integer.")
+
+      stmt.setMaxRows(5)
+      assert(stmt.getMaxRows === 5)
+      verifyMaxRows(5, "SELECT id FROM range(10)")(stmt)
+
+      // set max rows for query that has LIMIT
+      stmt.setMaxRows(5)
+      assert(stmt.getMaxRows === 5)
+      verifyMaxRows(3, "SELECT id FROM range(10) LIMIT 3")(stmt)
+      verifyMaxRows(5, "SELECT id FROM range(10) LIMIT 8")(stmt)
+
+      // set max rows for one statement won't affect others
+      withStatement { stmt2 =>
+        assert(stmt2.getMaxRows === 0)
+        verifyMaxRows(10, "SELECT id FROM range(10)")(stmt2)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to