This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 029393a0ef30 [SPARK-54054][CONNECT] Support row position for 
SparkConnectResultSet
029393a0ef30 is described below

commit 029393a0ef30cef4cda03e47135a28df5e8344f1
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Oct 29 15:48:58 2025 +0900

    [SPARK-54054][CONNECT] Support row position for SparkConnectResultSet
    
    ### What changes were proposed in this pull request?
    
    This PR implements the following methods of the `java.sql.ResultSet` interface for 
`SparkConnectResultSet`
    
    ```
    boolean isBeforeFirst() throws SQLException;
    boolean isAfterLast() throws SQLException;
    boolean isFirst() throws SQLException;
    boolean isLast() throws SQLException;
    int getRow() throws SQLException;
    void setFetchDirection(int direction) throws SQLException;
    int getFetchDirection() throws SQLException;
    ```
    
    ### Why are the changes needed?
    
    Implement more JDBC APIs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it's a new feature.
    
    ### How was this patch tested?
    
    New UTs are added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52756 from pan3793/SPARK-54054.
    
    Lead-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../client/jdbc/SparkConnectResultSet.scala        |  60 ++++++++--
 .../connect/client/jdbc/util/JdbcErrorUtils.scala  |  16 +++
 .../client/jdbc/SparkConnectResultSetSuite.scala   | 125 +++++++++++++++++++++
 3 files changed, 191 insertions(+), 10 deletions(-)

diff --git 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
index 38417b0de217..0745ddc09911 100644
--- 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
+++ 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
@@ -25,6 +25,7 @@ import java.util.Calendar
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.jdbc.util.JdbcErrorUtils
 
 class SparkConnectResultSet(
     sparkResult: SparkResult[Row],
@@ -34,16 +35,25 @@ class SparkConnectResultSet(
 
   private var currentRow: Row = _
 
-  private var _wasNull: Boolean = false
+  // cursor is 1-based, range in [0, length + 1]
+  // - 0 means beforeFirstRow
+  // - value in [1, length] means the row number
+  // - length + 1 means afterLastRow
+  private var cursor: Int = 0
 
+  private var _wasNull: Boolean = false
   override def wasNull: Boolean = _wasNull
 
   override def next(): Boolean = {
     val hasNext = iterator.hasNext
     if (hasNext) {
       currentRow = iterator.next()
+      cursor += 1
     } else {
       currentRow = null
+      if (cursor > 0 && cursor == sparkResult.length) {
+        cursor += 1
+      }
     }
     hasNext
   }
@@ -253,13 +263,25 @@ class SparkConnectResultSet(
   override def getBigDecimal(columnLabel: String): java.math.BigDecimal =
     throw new SQLFeatureNotSupportedException
 
-  override def isBeforeFirst: Boolean = throw new 
SQLFeatureNotSupportedException
+  override def isBeforeFirst: Boolean = {
+    checkOpen()
+    cursor < 1 && sparkResult.length > 0
+  }
 
-  override def isAfterLast: Boolean = throw new SQLFeatureNotSupportedException
+  override def isFirst: Boolean = {
+    checkOpen()
+    cursor == 1
+  }
 
-  override def isFirst: Boolean = throw new SQLFeatureNotSupportedException
+  override def isLast: Boolean = {
+    checkOpen()
+    cursor > 0 && cursor == sparkResult.length
+  }
 
-  override def isLast: Boolean = throw new SQLFeatureNotSupportedException
+  override def isAfterLast: Boolean = {
+    checkOpen()
+    cursor > 0 && cursor > sparkResult.length
+  }
 
   override def beforeFirst(): Unit = throw new SQLFeatureNotSupportedException
 
@@ -269,7 +291,15 @@ class SparkConnectResultSet(
 
   override def last(): Boolean = throw new SQLFeatureNotSupportedException
 
-  override def getRow: Int = throw new SQLFeatureNotSupportedException
+  override def getRow: Int = {
+    checkOpen()
+
+    if (cursor < 1 || cursor > sparkResult.length) {
+      0
+    } else {
+      cursor
+    }
+  }
 
   override def absolute(row: Int): Boolean = throw new 
SQLFeatureNotSupportedException
 
@@ -277,11 +307,21 @@ class SparkConnectResultSet(
 
   override def previous(): Boolean = throw new SQLFeatureNotSupportedException
 
-  override def setFetchDirection(direction: Int): Unit =
-    throw new SQLFeatureNotSupportedException
+  override def setFetchDirection(direction: Int): Unit = {
+    checkOpen()
+    assert(this.getType == ResultSet.TYPE_FORWARD_ONLY)
 
-  override def getFetchDirection: Int =
-    throw new SQLFeatureNotSupportedException
+    if (direction != ResultSet.FETCH_FORWARD) {
+      throw new SQLException(
+        s"Fetch direction ${JdbcErrorUtils.stringifyFetchDirection(direction)} 
is not supported " +
+          s"for 
${JdbcErrorUtils.stringifyResultSetType(ResultSet.TYPE_FORWARD_ONLY)} result 
set.")
+    }
+  }
+
+  override def getFetchDirection: Int = {
+    checkOpen()
+    ResultSet.FETCH_FORWARD
+  }
 
   override def setFetchSize(rows: Int): Unit =
     throw new SQLFeatureNotSupportedException
diff --git 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
index cb941b7420a7..3d9f72d87d15 100644
--- 
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
+++ 
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
@@ -37,4 +37,20 @@ private[jdbc] object JdbcErrorUtils {
     case _ =>
       throw new IllegalArgumentException(s"Invalid holdability: $holdability")
   }
+
+  def stringifyResultSetType(typ: Int): String = typ match {
+    case ResultSet.TYPE_FORWARD_ONLY => "FORWARD_ONLY"
+    case ResultSet.TYPE_SCROLL_INSENSITIVE => "SCROLL_INSENSITIVE"
+    case ResultSet.TYPE_SCROLL_SENSITIVE => "SCROLL_SENSITIVE"
+    case _ =>
+      throw new IllegalArgumentException(s"Invalid ResultSet type: $typ")
+  }
+
+  def stringifyFetchDirection(direction: Int): String = direction match {
+    case ResultSet.FETCH_FORWARD => "FETCH_FORWARD"
+    case ResultSet.FETCH_REVERSE => "FETCH_REVERSE"
+    case ResultSet.FETCH_UNKNOWN => "FETCH_UNKNOWN"
+    case _ =>
+      throw new IllegalArgumentException(s"Invalid fetch direction: 
$direction")
+  }
 }
diff --git 
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetSuite.scala
 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetSuite.scala
new file mode 100644
index 000000000000..ac2866837e93
--- /dev/null
+++ 
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+
+import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+
+class SparkConnectResultSetSuite extends ConnectFunSuite with 
RemoteSparkSession
+  with JdbcHelper {
+
+  def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
+
+  test("type, concurrency, fetch direction of result set") {
+    withExecuteQuery("SELECT 1") { rs =>
+      rs.getType === ResultSet.TYPE_FORWARD_ONLY
+      rs.getConcurrency === ResultSet.CONCUR_READ_ONLY
+      rs.getFetchDirection === ResultSet.FETCH_FORWARD
+      Seq(ResultSet.FETCH_FORWARD, ResultSet.FETCH_REVERSE,
+        ResultSet.FETCH_UNKNOWN).foreach { direction =>
+        if (direction == ResultSet.FETCH_FORWARD) {
+          rs.setFetchDirection(direction)
+        } else {
+          intercept[SQLException] {
+            rs.setFetchDirection(direction)
+          }
+        }
+      }
+    }
+  }
+
+  test("row position for empty result set") {
+    withExecuteQuery("SELECT * FROM range(0)") { rs =>
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(!rs.next())
+
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+    }
+  }
+
+  test("row position for one row result set") {
+    withExecuteQuery("SELECT * FROM range(1)") { rs =>
+      assert(rs.getRow === 0)
+      assert(rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(rs.next())
+
+      assert(rs.getRow === 1)
+      assert(!rs.isBeforeFirst)
+      assert(rs.isFirst)
+      assert(rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(!rs.next())
+
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(rs.isAfterLast)
+    }
+  }
+
+  test("row position for multiple rows result set") {
+    withExecuteQuery("SELECT * FROM range(2)") { rs =>
+      assert(rs.getRow === 0)
+      assert(rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(rs.next())
+
+      assert(rs.getRow === 1)
+      assert(!rs.isBeforeFirst)
+      assert(rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(rs.next())
+
+      assert(rs.getRow === 2)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(!rs.next())
+
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(rs.isAfterLast)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to