This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 71599034d786 [SPARK-54014][CONNECT] Support max rows for SparkConnectStatement
71599034d786 is described below
commit 71599034d78686dfd119213add68fd3f519c7a2f
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Nov 5 07:43:54 2025 -0800
[SPARK-54014][CONNECT] Support max rows for SparkConnectStatement
### What changes were proposed in this pull request?
This PR implements the following two methods of the `java.sql.Statement` interface for `SparkConnectStatement`:
```
/**
 * Retrieves the maximum number of rows that a
 * {@code ResultSet} object produced by this
 * {@code Statement} object can contain. If this limit is exceeded,
 * the excess rows are silently dropped.
 *
 * @return the current maximum number of rows for a {@code ResultSet}
 *         object produced by this {@code Statement} object;
 *         zero means there is no limit
 * @throws SQLException if a database access error occurs or
 *         this method is called on a closed {@code Statement}
 * @see #setMaxRows
 */
int getMaxRows() throws SQLException;

/**
 * Sets the limit for the maximum number of rows that any
 * {@code ResultSet} object generated by this {@code Statement}
 * object can contain to the given number.
 * If the limit is exceeded, the excess
 * rows are silently dropped.
 *
 * @param max the new max rows limit; zero means there is no limit
 * @throws SQLException if a database access error occurs,
 *         this method is called on a closed {@code Statement}
 *         or the condition {@code max >= 0} is not satisfied
 * @see #getMaxRows
 */
void setMaxRows(int max) throws SQLException;
```
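For reference, here is a minimal client-side sketch of the new behavior; the `jdbc:sc://localhost:15002` URL is an assumption about a local Spark Connect server, so substitute your own address:
```
import java.sql.DriverManager
import scala.util.Using

// Assumed connection URL; point this at your Spark Connect server.
Using.resource(DriverManager.getConnection("jdbc:sc://localhost:15002")) { conn =>
  Using.resource(conn.createStatement()) { stmt =>
    stmt.setMaxRows(5)   // cap every ResultSet produced by this statement at 5 rows
    assert(stmt.getMaxRows == 5)
    Using.resource(stmt.executeQuery("SELECT id FROM range(10)")) { rs =>
      var rows = 0
      while (rs.next()) rows += 1
      assert(rows == 5)  // rows beyond the limit are silently dropped
    }
  }
}
```
Note that `setMaxRows(0)` restores the default unlimited behavior, and a query's own `LIMIT` still wins when it is smaller than the statement-level limit.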
### Why are the changes needed?
Implement more JDBC APIs.
### Does this PR introduce _any_ user-facing change?
No, it's a new feature.
### How was this patch tested?
New UTs are added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52742 from pan3793/SPARK-54014.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 07cab004409c81d43001f9a0e309f5630ff5ab4c)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../client/jdbc/SparkConnectStatement.scala | 19 ++++++++---
.../client/jdbc/SparkConnectStatementSuite.scala | 39 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 4 deletions(-)
diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 8b60f309ef6d..3df1ff65498d 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -26,6 +26,8 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
private var operationId: String = _
private var resultSet: SparkConnectResultSet = _
+ private var maxRows: Int = 0
+
@volatile private var closed: Boolean = false
override def isClosed: Boolean = closed
@@ -86,7 +88,10 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
operationId = null
resultSet = null
- val df = conn.spark.sql(sql)
+ var df = conn.spark.sql(sql)
+ if (maxRows > 0) {
+ df = df.limit(maxRows)
+ }
val sparkResult = df.collectResult()
operationId = sparkResult.operationId
if (hasResultSet(sparkResult)) {
@@ -111,11 +116,17 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
override def getMaxRows: Int = {
checkOpen()
- 0
+ this.maxRows
}
- override def setMaxRows(max: Int): Unit =
- throw new SQLFeatureNotSupportedException
+ override def setMaxRows(max: Int): Unit = {
+ checkOpen()
+
+ if (max < 0) {
+ throw new SQLException("The max rows must be zero or a positive integer.")
+ }
+ this.maxRows = max
+ }
override def setEscapeProcessing(enable: Boolean): Unit =
throw new SQLFeatureNotSupportedException
diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
index 8e3b616372d8..fa9df3f1247f 100644
--- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
+++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatementSuite.scala
@@ -77,4 +77,43 @@ class SparkConnectStatementSuite extends ConnectFunSuite with RemoteSparkSession
}
}
}
+
+ test("max rows from SparkConnectStatement") {
+ def verifyMaxRows(
+ expectedRows: Int, query: String)(stmt: Statement): Unit = {
+ Using(stmt.executeQuery(query)) { rs =>
+ (0 until expectedRows).foreach { _ =>
+ assert(rs.next())
+ }
+ assert(!rs.next())
+ }
+ }
+
+ withStatement { stmt =>
+ // by default, it has no max rows limitation
+ assert(stmt.getMaxRows === 0)
+ verifyMaxRows(10, "SELECT id FROM range(10)")(stmt)
+
+ val se = intercept[SQLException] {
+ stmt.setMaxRows(-1)
+ }
+ assert(se.getMessage === "The max rows must be zero or a positive integer.")
+
+ stmt.setMaxRows(5)
+ assert(stmt.getMaxRows === 5)
+ verifyMaxRows(5, "SELECT id FROM range(10)")(stmt)
+
+ // set max rows for a query that has LIMIT
+ stmt.setMaxRows(5)
+ assert(stmt.getMaxRows === 5)
+ verifyMaxRows(3, "SELECT id FROM range(10) LIMIT 3")(stmt)
+ verifyMaxRows(5, "SELECT id FROM range(10) LIMIT 8")(stmt)
+
+ // setting max rows on one statement won't affect others
+ withStatement { stmt2 =>
+ assert(stmt2.getMaxRows === 0)
+ verifyMaxRows(10, "SELECT id FROM range(10)")(stmt2)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]