This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bac2500278 issue-16707: Add cursor based pagination support to java 
client (#16782)
9bac2500278 is described below

commit 9bac2500278c0e903f750a6b34c1e396d122985d
Author: John Solomon J <[email protected]>
AuthorDate: Wed Sep 17 19:02:33 2025 -0700

    issue-16707: Add cursor based pagination support to java client (#16782)
    
    * issue-16707 Add cursor based pagination support to java client
    
    * issue-16707 Refactor to use Cursor Pattern
    
    * issue-16707 Refactor to use Cursor Pattern continued
    
    * issue-16707 Add missing changes
    
    * issue-16707 Add missing changes continued
    
    * issue-16707 Address review comments, return 1 based page num instead of 0 
based
    
    * issue-16707 Make Cursor navigations thread safe
    
    * issue-16707 Fix checkstyle and spotless violations
    
    ---------
    
    Co-authored-by: John Solomon J <[email protected]>
---
 .../apache/pinot/client/BaseResultSetGroup.java    |  73 ++++
 .../org/apache/pinot/client/BrokerResponse.java    |  31 +-
 .../java/org/apache/pinot/client/Connection.java   |  71 +++-
 .../org/apache/pinot/client/ConnectionFactory.java |   5 +
 .../pinot/client/CursorAwareBrokerResponse.java    | 131 ++++++++
 .../org/apache/pinot/client/CursorCapable.java     | 188 +++++++++++
 .../apache/pinot/client/CursorResultSetGroup.java  | 121 +++++++
 .../client/JsonAsyncHttpPinotClientTransport.java  | 291 +++++++++++++++-
 .../java/org/apache/pinot/client/ResultCursor.java | 165 ++++++++++
 .../org/apache/pinot/client/ResultCursorImpl.java  | 366 +++++++++++++++++++++
 .../org/apache/pinot/client/ResultSetGroup.java    |   2 +-
 .../apache/pinot/client/ConnectionFactoryTest.java | 144 ++++++++
 .../client/CursorAwareBrokerResponseTest.java      | 163 +++++++++
 .../pinot/client/CursorResultSetGroupTest.java     |  90 +++++
 .../JsonAsyncHttpPinotClientTransportTest.java     | 179 ++++++++++
 .../org/apache/pinot/client/ResultCursorTest.java  | 353 ++++++++++++++++++++
 16 files changed, 2363 insertions(+), 10 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BaseResultSetGroup.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BaseResultSetGroup.java
new file mode 100644
index 00000000000..9dd76e4b90f
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BaseResultSetGroup.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+
+/**
+ * Base interface for result set groups that provides a common contract for 
accessing query results.
+ *
+ * This interface defines the core methods that all result set group 
implementations should provide,
+ * allowing for polymorphic usage across different result set group types 
(regular and cursor-based).
+ *
+ * <p><strong>Backward Compatibility:</strong> This interface is designed to 
be implemented by existing
+ * classes without breaking any existing code.
+ */
+public interface BaseResultSetGroup {
+
+  /**
+   * Returns the number of result sets in this result set group.
+   *
+   * There is one result set per aggregation function in the original query
+   * and one result set in the case of a selection query.
+   *
+   * @return The number of result sets, or 0 if there are no result sets
+   */
+  int getResultSetCount();
+
+  /**
+   * Obtains the result set at the given index, starting from zero.
+   *
+   * @param index The index for which to obtain the result set
+   * @return The result set at the given index
+   * @throws IndexOutOfBoundsException if the index is out of range
+   */
+  ResultSet getResultSet(int index);
+
+  /**
+   * Gets the execution statistics for this query.
+   *
+   * @return The execution statistics
+   */
+  ExecutionStats getExecutionStats();
+
+  /**
+   * Gets any exceptions that occurred during query processing.
+   *
+   * @return A list of exceptions, or an empty list if no exceptions occurred
+   */
+  List<PinotClientException> getExceptions();
+
+  /**
+   * Gets the underlying broker response.
+   *
+   * @return The broker response
+   */
+  BrokerResponse getBrokerResponse();
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
index a2c4d757bc7..a24f7f368f2 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
@@ -36,13 +36,14 @@ public class BrokerResponse {
   private BrokerResponse() {
   }
 
-  private BrokerResponse(JsonNode brokerResponse) {
-    _requestId = brokerResponse.get("requestId") != null ? 
brokerResponse.get("requestId").asText() : "unknown";
-    _brokerId = brokerResponse.get("brokerId") != null ? 
brokerResponse.get("brokerId").asText() : "unknown";
-    _aggregationResults = brokerResponse.get("aggregationResults");
-    _exceptions = brokerResponse.get("exceptions");
-    _selectionResults = brokerResponse.get("selectionResults");
-    _resultTable = brokerResponse.get("resultTable");
+  protected BrokerResponse(JsonNode brokerResponse) {
+    // Use helper methods for consistent null handling
+    _requestId = getTextOrDefault(brokerResponse, "requestId", "unknown");
+    _brokerId = getTextOrDefault(brokerResponse, "brokerId", "unknown");
+    _aggregationResults = getJsonNodeOrNull(brokerResponse, 
"aggregationResults");
+    _exceptions = getJsonNodeOrNull(brokerResponse, "exceptions");
+    _selectionResults = getJsonNodeOrNull(brokerResponse, "selectionResults");
+    _resultTable = getJsonNodeOrNull(brokerResponse, "resultTable");
     _executionStats = ExecutionStats.fromJson(brokerResponse);
   }
 
@@ -93,4 +94,20 @@ public class BrokerResponse {
   public String getBrokerId() {
     return _brokerId;
   }
+
+  // Helper methods for extracting values from JsonNode with null checks
+  private static String getTextOrDefault(JsonNode node, String fieldName, 
String defaultValue) {
+    if (node == null) {
+      return defaultValue;
+    }
+    JsonNode valueNode = node.get(fieldName);
+    return (valueNode != null && !valueNode.isNull()) ? valueNode.asText() : 
defaultValue;
+  }
+
+  private static JsonNode getJsonNodeOrNull(JsonNode node, String fieldName) {
+    if (node == null) {
+      return null;
+    }
+    return node.get(fieldName);
+  }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index 8ba457981db..49f2152557d 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A connection to Pinot, normally created through calls to the {@link 
ConnectionFactory}.
  */
@@ -205,4 +204,74 @@ public class Connection {
   public PinotClientTransport<?> getTransport() {
     return _transport;
   }
+
+  /**
+   * Opens a cursor for the given query, enabling pagination through large 
result sets.
+   * The returned cursor starts with the first page already loaded.
+   *
+   * @param query the query to execute
+   * @param pageSize the number of rows per page
+   * @return ResultCursor with the first page loaded and ready for navigation
+   * @throws PinotClientException If an exception occurs while processing the 
query
+   */
+  public ResultCursor openCursor(String query, int pageSize) throws 
PinotClientException {
+    try {
+      return openCursorAsync(query, pageSize).get();
+    } catch (Exception e) {
+      if (e.getCause() instanceof PinotClientException) {
+        throw (PinotClientException) e.getCause();
+      } else if (e.getCause() instanceof UnsupportedOperationException) {
+        throw (UnsupportedOperationException) e.getCause();
+      } else {
+        throw new PinotClientException("Failed to open cursor", e);
+      }
+    }
+  }
+
+  /**
+   * Opens a cursor for the given query asynchronously, enabling pagination 
through large result sets.
+   * The returned cursor starts with the first page already loaded.
+   *
+   * @param query the query to execute
+   * @param pageSize the number of rows per page
+   * @return CompletableFuture containing ResultCursor with the first page 
loaded and ready for navigation
+   */
+  public CompletableFuture<ResultCursor> openCursorAsync(String query, int 
pageSize) {
+    return validateCursorSupport()
+        .thenCompose(unused -> selectBrokerForCursor(query))
+        .thenCompose(brokerHostPort -> 
executeInitialCursorQuery(brokerHostPort, query, pageSize));
+  }
+
+  private CompletableFuture<Void> validateCursorSupport() {
+    if (!(_transport instanceof CursorCapable)) {
+      return CompletableFuture.failedFuture(
+          new UnsupportedOperationException("Cursor operations not supported 
by this connection type"));
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  private CompletableFuture<String> selectBrokerForCursor(String query) {
+    String[] tableNames = resolveTableName(query);
+    String brokerHostPort = _brokerSelector.selectBroker(tableNames);
+    if (brokerHostPort == null) {
+      return CompletableFuture.failedFuture(
+          new PinotClientException("Could not find broker to execute cursor 
query"));
+    }
+    return CompletableFuture.completedFuture(brokerHostPort);
+  }
+
+  private CompletableFuture<ResultCursor> executeInitialCursorQuery(String 
brokerHostPort, String query, int pageSize) {
+    try {
+      CursorCapable cursorTransport = (CursorCapable) _transport;
+      return cursorTransport.executeQueryWithCursorAsync(brokerHostPort, 
query, pageSize)
+          .thenApply(initialResponse -> {
+            if (initialResponse.hasExceptions() && _failOnExceptions) {
+              throw new PinotClientException("Query had processing exceptions: 
\n" + initialResponse.getExceptions());
+            }
+            return (ResultCursor) new ResultCursorImpl(_transport, 
brokerHostPort, initialResponse, _failOnExceptions);
+          });
+    } catch (Exception e) {
+      return CompletableFuture.failedFuture(new PinotClientException("Failed 
to open cursor", e));
+    }
+  }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
index 86e16675135..273ccb27467 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
@@ -278,4 +278,9 @@ public class ConnectionFactory {
   private static PinotClientTransport getDefault() {
     return getDefault(new Properties());
   }
+
+  @VisibleForTesting
+  static void resetDefaultTransport() {
+    _defaultTransport = null;
+  }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java
new file mode 100644
index 00000000000..7f3c0884f05
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Extended BrokerResponse with cursor-specific fields for cursor pagination 
queries.
+ * This class adds cursor metadata fields while maintaining full compatibility 
with BrokerResponse.
+ */
+public class CursorAwareBrokerResponse extends BrokerResponse {
+  // Cursor-specific fields from Pinot documentation
+  private final Long _offset;
+  private final Integer _numRows;
+  private final Long _numRowsResultSet;
+  private Long _cursorResultWriteTimeMs;
+  private Long _submissionTimeMs;
+  private Long _expirationTimeMs;
+  private final String _brokerHost;
+  private Integer _brokerPort;
+  private Long _bytesWritten;
+  private final Long _cursorFetchTimeMs;
+
+  /**
+   * Creates a CursorAwareBrokerResponse by parsing cursor-specific fields 
from the JSON response.
+   */
+  public static CursorAwareBrokerResponse fromJson(JsonNode brokerResponse) {
+    return new CursorAwareBrokerResponse(brokerResponse);
+  }
+
+
+  private CursorAwareBrokerResponse(JsonNode brokerResponse) {
+    super(brokerResponse); // Initialize base BrokerResponse fields
+
+    // Parse cursor-specific fields using helper methods (they handle null 
nodes gracefully)
+    _offset = getLongOrNull(brokerResponse, "offset");
+    _numRows = getIntOrNull(brokerResponse, "numRows");
+    _numRowsResultSet = getLongOrNull(brokerResponse, "numRowsResultSet");
+    _cursorResultWriteTimeMs = getLongOrNull(brokerResponse, 
"cursorResultWriteTimeMs");
+    _expirationTimeMs = getLongOrNull(brokerResponse, "expirationTimeMs");
+    _submissionTimeMs = getLongOrNull(brokerResponse, "submissionTimeMs");
+    _brokerHost = getTextOrNull(brokerResponse, "brokerHost");
+    _brokerPort = getIntOrNull(brokerResponse, "brokerPort");
+    _bytesWritten = getLongOrNull(brokerResponse, "bytesWritten");
+    _cursorFetchTimeMs = getLongOrNull(brokerResponse, "cursorFetchTimeMs");
+  }
+
+
+
+  // Cursor-specific field getters
+  public Long getOffset() {
+    return _offset;
+  }
+
+  public Integer getNumRows() {
+    return _numRows;
+  }
+
+  public Long getNumRowsResultSet() {
+    return _numRowsResultSet;
+  }
+
+  public Long getCursorResultWriteTimeMs() {
+    return _cursorResultWriteTimeMs;
+  }
+
+  public Long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  public Long getExpirationTimeMs() {
+    return _expirationTimeMs;
+  }
+
+  public String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  public Integer getBrokerPort() {
+    return _brokerPort;
+  }
+
+  public Long getBytesWritten() {
+    return _bytesWritten;
+  }
+
+  public Long getCursorFetchTimeMs() {
+    return _cursorFetchTimeMs;
+  }
+
+  // Helper methods for extracting values from JsonNode with null checks
+  private static Long getLongOrNull(JsonNode node, String fieldName) {
+    if (node == null) {
+      return null;
+    }
+    JsonNode valueNode = node.get(fieldName);
+    return (valueNode != null && !valueNode.isNull()) ? valueNode.asLong() : 
null;
+  }
+
+  private static Integer getIntOrNull(JsonNode node, String fieldName) {
+    if (node == null) {
+      return null;
+    }
+    JsonNode valueNode = node.get(fieldName);
+    return (valueNode != null && !valueNode.isNull()) ? valueNode.asInt() : 
null;
+  }
+
+  private static String getTextOrNull(JsonNode node, String fieldName) {
+    if (node == null) {
+      return null;
+    }
+    JsonNode valueNode = node.get(fieldName);
+    return (valueNode != null && !valueNode.isNull()) ? valueNode.asText() : 
null;
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorCapable.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorCapable.java
new file mode 100644
index 00000000000..a0467ceefcd
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorCapable.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Capability interface for transports that support cursor-based pagination 
operations.
+ * Transports implementing this interface can be used with cursor pagination 
features.
+ */
+public interface CursorCapable {
+
+  /**
+   * Executes a query with cursor support.
+   *
+   * @param brokerAddress The broker address to send the query to
+   * @param query The SQL query to execute
+   * @param numRows The number of rows to return per page
+   * @return CursorAwareBrokerResponse containing results and cursor metadata
+   * @throws PinotClientException If query execution fails
+   */
+  default CursorAwareBrokerResponse executeQueryWithCursor(String 
brokerAddress, String query, int numRows)
+      throws PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Executes a query with cursor support asynchronously.
+   *
+   * @param brokerAddress The broker address to send the query to
+   * @param query The SQL query to execute
+   * @param numRows The number of rows to return per page
+   * @return CompletableFuture containing CursorAwareBrokerResponse with 
results and cursor metadata
+   * @throws PinotClientException If query execution fails
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> 
executeQueryWithCursorAsync(String brokerAddress,
+      String query, int numRows) throws PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Fetches the next page for a cursor with specified offset and number of 
rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CursorAwareBrokerResponse containing the next page
+   * @throws PinotClientException If the fetch operation fails
+   */
+  default CursorAwareBrokerResponse fetchNextPage(String brokerAddress, String 
cursorId, int offset, int numRows)
+      throws PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Fetches the next page for a cursor asynchronously with specified offset 
and number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CompletableFuture containing the next page
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> 
fetchNextPageAsync(String brokerAddress, String cursorId,
+      int offset, int numRows) {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Fetches the previous page for a cursor with specified offset and number 
of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CursorAwareBrokerResponse containing the previous page
+   * @throws PinotClientException If the fetch operation fails
+   */
+  default CursorAwareBrokerResponse fetchPreviousPage(String brokerAddress, 
String cursorId, int offset, int numRows)
+      throws PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Fetches the previous page for a cursor asynchronously with specified 
offset and number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CompletableFuture containing the previous page
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> 
fetchPreviousPageAsync(String brokerAddress, String cursorId,
+      int offset, int numRows) {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Seeks to a specific page for a cursor with specified number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param pageNumber The 1-based page number to seek to (will be converted 
to offset internally)
+   * @param numRows The number of rows to fetch
+   * @return CursorAwareBrokerResponse containing the requested page
+   * @throws PinotClientException If the seek operation fails
+   */
+  default CursorAwareBrokerResponse seekToPage(String brokerAddress, String 
cursorId, int pageNumber, int numRows)
+      throws PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Seeks to a specific page for a cursor asynchronously with specified 
number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param pageNumber The 1-based page number to seek to (will be converted 
to offset internally)
+   * @param numRows The number of rows to fetch
+   * @return CompletableFuture containing the requested page
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> seekToPageAsync(String 
brokerAddress, String cursorId,
+      int pageNumber, int numRows) {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Retrieves metadata for an existing cursor.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be 
same as cursor creator)
+   * @param requestId The unique identifier of the cursor
+   * @return BrokerResponse containing cursor metadata
+   * @throws PinotClientException If metadata retrieval fails
+   */
+  default BrokerResponse getCursorMetadata(String brokerAddress, String 
requestId) throws PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Retrieves metadata for an existing cursor asynchronously.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be 
same as cursor creator)
+   * @param requestId The unique identifier of the cursor
+   * @return CompletableFuture containing BrokerResponse with cursor metadata
+   */
+  default CompletableFuture<BrokerResponse> getCursorMetadataAsync(String 
brokerAddress, String requestId) {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Deletes a cursor and cleans up its resources.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be 
same as cursor creator)
+   * @param requestId The unique identifier of the cursor to delete
+   * @throws PinotClientException If cursor deletion fails
+   */
+  default void deleteCursor(String brokerAddress, String requestId) throws 
PinotClientException {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+
+  /**
+   * Deletes a cursor and cleans up its resources asynchronously.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be 
same as cursor creator)
+   * @param requestId The unique identifier of the cursor to delete
+   * @return CompletableFuture that completes when the cursor is deleted
+   */
+  default CompletableFuture<Void> deleteCursorAsync(String brokerAddress, 
String requestId) {
+    throw new UnsupportedOperationException("Cursor operations not supported 
by this transport implementation");
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorResultSetGroup.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorResultSetGroup.java
new file mode 100644
index 00000000000..6ea431d1d44
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorResultSetGroup.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+
+/**
+ * A cursor-based result set group that delegates result set access to an 
internal ResultSetGroup
+ * and provides cursor metadata access. This is a pure data container without 
navigation logic.
+ *
+ * <p><strong>Thread Safety:</strong> This class is immutable and thread-safe.
+ */
+public class CursorResultSetGroup implements BaseResultSetGroup {
+  private final ResultSetGroup _resultSetGroup;
+  private final CursorAwareBrokerResponse _cursorResponse;
+
+  /**
+   * Creates a cursor result set group from a cursor-aware broker response.
+   *
+   * @param cursorResponse the cursor-aware broker response containing results 
and cursor metadata
+   */
+  public CursorResultSetGroup(CursorAwareBrokerResponse cursorResponse) {
+    _cursorResponse = cursorResponse;
+    _resultSetGroup = new ResultSetGroup(cursorResponse);
+  }
+
+  public int getResultSetCount() {
+    return _resultSetGroup.getResultSetCount();
+  }
+
+  public ResultSet getResultSet(int index) {
+    return _resultSetGroup.getResultSet(index);
+  }
+
+  public String getCursorId() {
+    return _cursorResponse.getRequestId();
+  }
+
+  public int getPageSize() {
+    Integer numRows = _cursorResponse.getNumRows();
+    return numRows != null ? numRows : 0;
+  }
+
+  public long getExpirationTimeMs() {
+    Long expirationTime = _cursorResponse.getExpirationTimeMs();
+    return expirationTime != null ? expirationTime : 0L;
+  }
+
+  public Long getOffset() {
+    return _cursorResponse.getOffset();
+  }
+
+  public Long getNumRowsResultSet() {
+    return _cursorResponse.getNumRowsResultSet();
+  }
+
+  public Long getCursorResultWriteTimeMs() {
+    return _cursorResponse.getCursorResultWriteTimeMs();
+  }
+
+  public Long getSubmissionTimeMs() {
+    return _cursorResponse.getSubmissionTimeMs();
+  }
+
+  public Long getTotalRows() {
+    return _cursorResponse.getNumRowsResultSet();
+  }
+
+  public String getBrokerHost() {
+    return _cursorResponse.getBrokerHost();
+  }
+
+  public Integer getBrokerPort() {
+    return _cursorResponse.getBrokerPort();
+  }
+
+  public Long getBytesWritten() {
+    return _cursorResponse.getBytesWritten();
+  }
+
+  public Long getCursorFetchTimeMs() {
+    return _cursorResponse.getCursorFetchTimeMs();
+  }
+
+  public ExecutionStats getExecutionStats() {
+    return _resultSetGroup.getExecutionStats();
+  }
+
+  public List<PinotClientException> getExceptions() {
+    return _resultSetGroup.getExceptions();
+  }
+
+  public BrokerResponse getBrokerResponse() {
+    return _resultSetGroup.getBrokerResponse();
+  }
+
+  /**
+   * Gets the cursor-aware broker response for accessing cursor metadata.
+   *
+   * @return the cursor-aware broker response
+   */
+  public CursorAwareBrokerResponse getCursorResponse() {
+    return _cursorResponse;
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index c0191ed3613..0453a3bfdd9 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
 /**
  * JSON encoded Pinot client transport over AsyncHttpClient.
  */
-public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<ClientStats> {
+public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<ClientStats>, CursorCapable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
   private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
   private static final String DEFAULT_EXTRA_QUERY_OPTION_STRING = 
"groupByMode=sql;responseFormat=sql";
@@ -152,6 +152,295 @@ public class JsonAsyncHttpPinotClientTransport implements 
PinotClientTransport<C
     }
   }
 
+  /**
+   * Executes a query with cursor pagination support.
+   *
+   * @param brokerAddress The broker address in "host:port" format
+   * @param query The SQL query to execute
+   * @param numRows The number of rows to return in the first page
+   * @return BrokerResponse containing the first page and cursor metadata
+   * @throws PinotClientException If query execution fails
+   */
+  @Override
+  public CursorAwareBrokerResponse executeQueryWithCursor(String 
brokerAddress, String query, int numRows)
+      throws PinotClientException {
+    try {
+      return executeQueryWithCursorAsync(brokerAddress, query, 
numRows).get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  /**
+   * Executes a query asynchronously with cursor pagination support.
+   *
+   * @param brokerAddress The broker address in "host:port" format
+   * @param query The SQL query to execute
+   * @param numRows The number of rows to return in the first page
+   * @return CompletableFuture containing BrokerResponse with first page and 
cursor metadata
+   */
+  @Override
+  public CompletableFuture<CursorAwareBrokerResponse> 
executeQueryWithCursorAsync(String brokerAddress, String query,
+      int numRows) {
+    try {
+      ObjectNode json = JsonNodeFactory.instance.objectNode();
+      json.put("sql", query);
+      if (_extraOptionStr != null && !_extraOptionStr.isEmpty()) {
+        json.put("queryOptions", _extraOptionStr);
+      }
+
+      LOGGER.debug("Cursor query will use Multistage Engine = {}", 
_useMultistageEngine);
+
+      String url = String.format("%s://%s%s?getCursor=true&numRows=%d", 
_scheme, brokerAddress,
+          _useMultistageEngine ? "/query" : "/query/sql", numRows);
+      BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);
+
+      if (_headers != null) {
+        _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+      }
+      LOGGER.debug("Sending cursor query {} to {}", query, url);
+      return requestBuilder.addHeader("Content-Type", "application/json; 
charset=utf-8").setBody(json.toString())
+          .execute().toCompletableFuture().thenApply(httpResponse -> {
+            LOGGER.debug("Completed cursor query, HTTP status is {}", 
httpResponse.getStatusCode());
+
+            if (httpResponse.getStatusCode() != 200) {
+              throw new PinotClientException(
+                  "Pinot returned HTTP status " + httpResponse.getStatusCode() 
+ ", expected 200");
+            }
+
+            try {
+              return 
CursorAwareBrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+            } catch (IOException e) {
+              throw new CompletionException(e);
+            }
+          });
+    } catch (Exception e) {
+      return CompletableFuture.failedFuture(new PinotClientException(e));
+    }
+  }
+
+
+  /**
+   * Retrieves metadata for an existing cursor.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be 
same as cursor creator)
+   * @param requestId The unique identifier of the cursor
+   * @return BrokerResponse containing cursor metadata
+   * @throws PinotClientException If metadata retrieval fails
+   */
+  @Override
+  public BrokerResponse getCursorMetadata(String brokerAddress, String 
requestId) throws PinotClientException {
+    try {
+      return getCursorMetadataAsync(brokerAddress, 
requestId).get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  /**
+   * Retrieves metadata for an existing cursor asynchronously.
+   *
+   * Issues a GET against {@code <scheme>://<brokerAddress>/responseStore/<requestId>/} and parses the JSON body
+   * into a {@link BrokerResponse}. Failures are always reported through the returned future (never thrown
+   * synchronously), matching the other async cursor methods of this transport.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+   * @param requestId The unique identifier of the cursor
+   * @return CompletableFuture containing BrokerResponse with cursor metadata; it completes exceptionally if the
+   *         request cannot be built, the HTTP status is not 200, or the body cannot be read
+   */
+  @Override
+  public CompletableFuture<BrokerResponse> getCursorMetadataAsync(String brokerAddress, String requestId) {
+    try {
+      String url = String.format("%s://%s/responseStore/%s/", _scheme, brokerAddress, requestId);
+      BoundRequestBuilder requestBuilder = _httpClient.prepareGet(url);
+
+      if (_headers != null) {
+        _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+      }
+      LOGGER.debug("Getting cursor metadata from {}", url);
+      return requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+        LOGGER.debug("Completed cursor metadata fetch, HTTP status is {}", httpResponse.getStatusCode());
+
+        if (httpResponse.getStatusCode() != 200) {
+          throw new PinotClientException(
+              "Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
+        }
+
+        try {
+          return BrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+        } catch (IOException e) {
+          throw new CompletionException(e);
+        }
+      });
+    } catch (Exception e) {
+      // Surface request-construction failures through the future instead of throwing from an async method
+      return CompletableFuture.failedFuture(new PinotClientException(e));
+    }
+  }
+
+  /**
+   * Deletes a cursor and cleans up its resources.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be 
same as cursor creator)
+   * @param requestId The unique identifier of the cursor to delete
+   * @throws PinotClientException If cursor deletion fails
+   */
+  @Override
+  public void deleteCursor(String brokerAddress, String requestId) throws 
PinotClientException {
+    try {
+      deleteCursorAsync(brokerAddress, requestId).get(_brokerReadTimeout, 
TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  /**
+   * Deletes a cursor and cleans up its resources asynchronously.
+   *
+   * Issues a DELETE against {@code <scheme>://<brokerAddress>/responseStore/<requestId>/}. Failures are always
+   * reported through the returned future (never thrown synchronously), matching the other async cursor methods
+   * of this transport.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+   * @param requestId The unique identifier of the cursor to delete
+   * @return CompletableFuture that completes (with {@code null}) when the broker answers with HTTP 200, and
+   *         completes exceptionally if the request cannot be built or any other status is returned
+   */
+  @Override
+  public CompletableFuture<Void> deleteCursorAsync(String brokerAddress, String requestId) {
+    try {
+      String url = String.format("%s://%s/responseStore/%s/", _scheme, brokerAddress, requestId);
+      BoundRequestBuilder requestBuilder = _httpClient.prepareDelete(url);
+
+      if (_headers != null) {
+        _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+      }
+      LOGGER.debug("Deleting cursor at {}", url);
+      return requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+        LOGGER.debug("Completed cursor deletion, HTTP status is {}", httpResponse.getStatusCode());
+
+        if (httpResponse.getStatusCode() != 200) {
+          throw new PinotClientException(
+              "Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
+        }
+        return null;
+      });
+    } catch (Exception e) {
+      // Surface request-construction failures through the future instead of throwing from an async method
+      return CompletableFuture.failedFuture(new PinotClientException(e));
+    }
+  }
+
+
+  public CursorAwareBrokerResponse fetchNextPage(String brokerAddress, String 
cursorId, int offset, int numRows)
+      throws PinotClientException {
+    try {
+      return fetchNextPageAsync(brokerAddress, cursorId, offset, 
numRows).get(_brokerReadTimeout,
+          TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<CursorAwareBrokerResponse> 
fetchNextPageAsync(String brokerAddress, String cursorId,
+      int offset, int numRows) {
+    try {
+      String url = 
String.format("%s://%s/responseStore/%s/results?offset=%d&numRows=%d", _scheme, 
brokerAddress,
+          cursorId, offset, numRows);
+      BoundRequestBuilder requestBuilder = _httpClient.prepareGet(url);
+
+      if (_headers != null) {
+        _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+      }
+
+      return 
requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+        if (httpResponse.getStatusCode() != 200) {
+          throw new PinotClientException(
+              "Pinot returned HTTP status " + httpResponse.getStatusCode() + 
", expected 200");
+        }
+
+        try {
+          return 
CursorAwareBrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+        } catch (IOException e) {
+          throw new CompletionException(e);
+        }
+      });
+    } catch (Exception e) {
+      return CompletableFuture.failedFuture(new PinotClientException(e));
+    }
+  }
+
+
+  public CursorAwareBrokerResponse fetchPreviousPage(String brokerAddress, 
String cursorId, int offset, int numRows)
+      throws PinotClientException {
+    try {
+      return fetchPreviousPageAsync(brokerAddress, cursorId, offset, 
numRows).get(_brokerReadTimeout,
+          TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<CursorAwareBrokerResponse> 
fetchPreviousPageAsync(String brokerAddress, String cursorId,
+      int offset, int numRows) {
+    try {
+      String url = 
String.format("%s://%s/responseStore/%s/results?offset=%d&numRows=%d", _scheme, 
brokerAddress,
+          cursorId, offset, numRows);
+      BoundRequestBuilder requestBuilder = _httpClient.prepareGet(url);
+
+      if (_headers != null) {
+        _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+      }
+
+      return 
requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+        if (httpResponse.getStatusCode() != 200) {
+          throw new PinotClientException(
+              "Pinot returned HTTP status " + httpResponse.getStatusCode() + 
", expected 200");
+        }
+
+        try {
+          return 
CursorAwareBrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+        } catch (IOException e) {
+          throw new CompletionException(e);
+        }
+      });
+    } catch (Exception e) {
+      return CompletableFuture.failedFuture(new PinotClientException(e));
+    }
+  }
+
+
+  public CursorAwareBrokerResponse seekToPage(String brokerAddress, String 
cursorId, int pageNumber, int numRows)
+      throws PinotClientException {
+    try {
+      return seekToPageAsync(brokerAddress, cursorId, pageNumber, 
numRows).get(_brokerReadTimeout,
+          TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      throw new PinotClientException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<CursorAwareBrokerResponse> seekToPageAsync(String 
brokerAddress, String cursorId,
+      int pageNumber, int numRows) {
+    try {
+      int offset = (pageNumber - 1) * numRows;
+      String url = 
String.format("%s://%s/responseStore/%s/results?offset=%d&numRows=%d", _scheme, 
brokerAddress,
+          cursorId, offset, numRows);
+      BoundRequestBuilder requestBuilder = _httpClient.prepareGet(url);
+
+      if (_headers != null) {
+        _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+      }
+
+      return 
requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+        if (httpResponse.getStatusCode() != 200) {
+          throw new PinotClientException(
+              "Pinot returned HTTP status " + httpResponse.getStatusCode() + 
", expected 200");
+        }
+
+        try {
+          return 
CursorAwareBrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+        } catch (IOException e) {
+          throw new CompletionException(e);
+        }
+      });
+    } catch (Exception e) {
+      return CompletableFuture.failedFuture(new PinotClientException(e));
+    }
+  }
+
   @Override
   public ClientStats getClientMetrics() {
     return _httpClient.getClientStats();
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursor.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursor.java
new file mode 100644
index 00000000000..e38af328aef
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursor.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A cursor for navigating through paginated query results using the Cursor 
Handle Pattern.
+ *
+ * This interface provides a clean abstraction for cursor-based pagination,
+ * where each page contains a complete CursorResultSetGroup with all result 
sets
+ * and metadata. The cursor starts with the first page already loaded.
+ *
+ * Usage example:
+ * <pre>
+ * try (ResultCursor cursor = connection.openCursor("SELECT * FROM table", 
1000)) {
+ *     // First page is immediately available
+ *     CursorResultSetGroup firstPage = cursor.getCurrentPage();
+ *
+ *     // Navigate through remaining pages
+ *     while (cursor.hasNext()) {
+ *         CursorResultSetGroup page = cursor.next();
+ *         // Process all result sets in the page
+ *         for (int i = 0; i < page.getResultSetCount(); i++) {
+ *             ResultSet rs = page.getResultSet(i);
+ *             // Process result set
+ *         }
+ *     }
+ * }
+ * </pre>
+ */
+public interface ResultCursor extends AutoCloseable {
+
+  /**
+   * Gets the current page of results without navigation.
+   * The first page is available immediately when the cursor is created.
+   *
+   * @return the current page of results
+   */
+  CursorResultSetGroup getCurrentPage();
+
+  /**
+   * Checks if there are more pages available after the current page.
+   *
+   * @return true if more pages are available, false otherwise
+   */
+  boolean hasNext();
+
+  /**
+   * Checks if there are previous pages available before the current page.
+   *
+   * @return true if previous pages are available, false otherwise
+   */
+  boolean hasPrevious();
+
+  /**
+   * Fetches the next page of results and advances the cursor position.
+   *
+   * @return the next page of results
+   * @throws PinotClientException if an error occurs while fetching
+   * @throws IllegalStateException if no next page is available
+   */
+  CursorResultSetGroup next() throws PinotClientException;
+
+  /**
+   * Fetches the previous page of results and moves the cursor position 
backward.
+   *
+   * @return the previous page of results
+   * @throws PinotClientException if an error occurs while fetching
+   * @throws IllegalStateException if no previous page is available
+   */
+  CursorResultSetGroup previous() throws PinotClientException;
+
+  /**
+   * Seeks to a specific page number (1-based) and updates the cursor position.
+   *
+   * @param pageNumber the page number to seek to (1-based)
+   * @return the requested page of results
+   * @throws PinotClientException if an error occurs while seeking
+   * @throws IllegalArgumentException if pageNumber is invalid
+   */
+  CursorResultSetGroup seekToPage(int pageNumber) throws PinotClientException;
+
+  /**
+   * Fetches the next page of results asynchronously.
+   *
+   * @return a CompletableFuture containing the next page of results
+   */
+  CompletableFuture<CursorResultSetGroup> nextAsync();
+
+  /**
+   * Fetches the previous page of results asynchronously.
+   *
+   * @return a CompletableFuture containing the previous page of results
+   */
+  CompletableFuture<CursorResultSetGroup> previousAsync();
+
+  /**
+   * Seeks to a specific page number asynchronously.
+   *
+   * @param pageNumber the page number to seek to (1-based)
+   * @return a CompletableFuture containing the requested page of results
+   */
+  CompletableFuture<CursorResultSetGroup> seekToPageAsync(int pageNumber);
+
+  /**
+   * Gets the current cursor ID.
+   *
+   * @return the cursor ID
+   */
+  String getCursorId();
+
+  /**
+   * Gets the current page number (1-based).
+   *
+   * @return the current page number
+   */
+  int getCurrentPageNumber();
+
+  /**
+   * Gets the total number of rows across all pages, if known.
+   *
+   * @return the total number of rows, or -1 if unknown
+   */
+  long getTotalRows();
+
+  /**
+   * Gets the page size for this cursor.
+   *
+   * @return the page size
+   */
+  int getPageSize();
+
+  /**
+   * Checks if the cursor has expired on the server.
+   *
+   * @return true if the cursor has expired, false otherwise
+   */
+  boolean isExpired();
+
+  /**
+   * Closes the cursor and releases any associated resources.
+   * This sends a cleanup request to the server to free cursor resources.
+   *
+   * @throws PinotClientException if an error occurs while closing
+   */
+  @Override
+  void close() throws PinotClientException;
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursorImpl.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursorImpl.java
new file mode 100644
index 00000000000..c609e489832
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursorImpl.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of ResultCursor that manages cursor-based pagination.
+ *
+ * This class encapsulates the cursor state and navigation logic,
+ * keeping the Connection and CursorResultSetGroup classes focused
+ * on their primary responsibilities. The cursor starts with the first
+ * page already loaded.
+ *
+ * Thread Safety: All navigation methods are synchronized to ensure
+ * safe concurrent access to cursor state.
+ */
+public class ResultCursorImpl implements ResultCursor {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResultCursorImpl.class);
+
+  private final PinotClientTransport<?> _transport;
+  private final String _brokerHostPort;
+  private final boolean _failOnExceptions;
+  private final int _pageSize;
+
+  // Synchronization lock for cursor state
+  private final Object _stateLock = new Object();
+
+  // Volatile for visibility across threads
+  private volatile CursorResultSetGroup _currentPage;
+  private volatile int _currentPageNumber;
+  private volatile boolean _closed;
+
+  /**
+   * Creates a new cursor with the initial page of results already loaded.
+   *
+   * @param transport the transport to use for navigation
+   * @param brokerHostPort the broker host and port
+   * @param initialResponse the initial cursor-aware response (first page)
+   * @param failOnExceptions whether to fail on query exceptions
+   */
+  public ResultCursorImpl(PinotClientTransport<?> transport, String 
brokerHostPort,
+                         CursorAwareBrokerResponse initialResponse, boolean 
failOnExceptions) {
+    _transport = transport;
+    _brokerHostPort = brokerHostPort;
+    _failOnExceptions = failOnExceptions;
+    _pageSize = initialResponse.getNumRows() != null
+        ? initialResponse.getNumRows().intValue() : -1;
+    // Initialize state - no synchronization needed in constructor
+    _currentPage = new CursorResultSetGroup(initialResponse);
+    _currentPageNumber = 0;
+    _closed = false;
+  }
+
+  @Override
+  public CursorResultSetGroup getCurrentPage() {
+    synchronized (_stateLock) {
+      checkNotClosed();
+      return _currentPage;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    synchronized (_stateLock) {
+      checkNotClosed();
+      CursorAwareBrokerResponse response = _currentPage.getCursorResponse();
+      Long offset = response.getOffset();
+      Integer numRows = response.getNumRows();
+      Long totalRows = response.getNumRowsResultSet();
+
+      if (offset == null || numRows == null || totalRows == null) {
+        return false;
+      }
+      return (offset + numRows) < totalRows;
+    }
+  }
+
+  @Override
+  public boolean hasPrevious() {
+    synchronized (_stateLock) {
+      checkNotClosed();
+      CursorAwareBrokerResponse response = _currentPage.getCursorResponse();
+      Long offset = response.getOffset();
+      return offset != null && offset > 0;
+    }
+  }
+
+  /**
+   * Fetches the page that follows the current one and advances the cursor position.
+   *
+   * @return the next page of results
+   * @throws PinotClientException if the fetch fails
+   * @throws IllegalStateException if the cursor is closed or the current response reports no further rows
+   */
+  @Override
+  public CursorResultSetGroup next() throws PinotClientException {
+    return executeNavigation(
+        () -> {
+          CursorAwareBrokerResponse response = _currentPage.getCursorResponse();
+          Long offset = response.getOffset();
+          Integer numRows = response.getNumRows();
+          Long totalRows = response.getNumRowsResultSet();
+          if (offset == null || numRows == null || totalRows == null
+              || (offset + numRows) >= totalRows) {
+            throw new IllegalStateException("No next page available");
+          }
+        },
+        // _currentPageNumber is the 0-based index of the page currently held, so the next page starts one
+        // full page further; using _currentPageNumber * _pageSize would re-fetch the current page.
+        () -> (_currentPageNumber + 1) * _pageSize,
+        (transport, cursorId, offset) -> transport.fetchNextPage(_brokerHostPort, cursorId, offset, _pageSize),
+        () -> _currentPageNumber++,
+        "Failed to fetch next page"
+    );
+  }
+
+  @Override
+  public CursorResultSetGroup previous() throws PinotClientException {
+    return executeNavigation(
+        () -> {
+          CursorAwareBrokerResponse response = 
_currentPage.getCursorResponse();
+          Long offset = response.getOffset();
+          if (offset == null || offset <= 0) {
+            throw new IllegalStateException("No previous page available");
+          }
+        },
+        () -> (_currentPageNumber - 1) * _pageSize,
+        (transport, cursorId, offset) -> 
transport.fetchPreviousPage(_brokerHostPort, cursorId, offset, _pageSize),
+        () -> _currentPageNumber--,
+        "Failed to fetch previous page"
+    );
+  }
+
+  @Override
+  public CursorResultSetGroup seekToPage(int pageNumber) throws 
PinotClientException {
+    return executeNavigation(
+        () -> {
+          if (pageNumber <= 0) {
+            throw new IllegalArgumentException("Page number must be positive 
(1-based)");
+          }
+        },
+        () -> (pageNumber - 1) * _pageSize,
+        (transport, cursorId, offset) -> transport.seekToPage(_brokerHostPort, 
cursorId, pageNumber, _pageSize),
+        () -> _currentPageNumber = pageNumber - 1,
+        "Failed to seek to page " + pageNumber
+    );
+  }
+
+  /**
+   * Asynchronously fetches the page that follows the current one and advances the cursor position once the
+   * response arrives.
+   *
+   * @return a future holding the next page; it is already failed if the cursor is closed or the current
+   *         response reports no further rows
+   */
+  @Override
+  public CompletableFuture<CursorResultSetGroup> nextAsync() {
+    return executeNavigationAsync(
+        () -> {
+          CursorAwareBrokerResponse response = _currentPage.getCursorResponse();
+          Long offset = response.getOffset();
+          Integer numRows = response.getNumRows();
+          Long totalRows = response.getNumRowsResultSet();
+          if (offset == null || numRows == null || totalRows == null
+              || (offset + numRows) >= totalRows) {
+            throw new IllegalStateException("No next page available");
+          }
+        },
+        // _currentPageNumber is the 0-based index of the page currently held, so the next page starts one
+        // full page further; using _currentPageNumber * _pageSize would re-fetch the current page.
+        () -> (_currentPageNumber + 1) * _pageSize,
+        (transport, cursorId, offset) -> transport.fetchNextPageAsync(_brokerHostPort, cursorId, offset, _pageSize),
+        () -> _currentPageNumber++
+    );
+  }
+
+  @Override
+  public CompletableFuture<CursorResultSetGroup> previousAsync() {
+    return executeNavigationAsync(
+        () -> {
+          CursorAwareBrokerResponse response = 
_currentPage.getCursorResponse();
+          Long offset = response.getOffset();
+          if (offset == null || offset <= 0) {
+            throw new IllegalStateException("No previous page available");
+          }
+        },
+        () -> (_currentPageNumber - 1) * _pageSize,
+        (transport, cursorId, offset) -> 
transport.fetchPreviousPageAsync(_brokerHostPort, cursorId, offset, _pageSize),
+        () -> _currentPageNumber--
+    );
+  }
+
+  @Override
+  public CompletableFuture<CursorResultSetGroup> seekToPageAsync(int 
pageNumber) {
+    return executeNavigationAsync(
+        () -> {
+          if (pageNumber <= 0) {
+            throw new IllegalArgumentException("Page number must be positive 
(1-based)");
+          }
+        },
+        () -> (pageNumber - 1) * _pageSize,
+        (transport, cursorId, offset) -> 
transport.seekToPageAsync(_brokerHostPort, cursorId, pageNumber, _pageSize),
+        () -> _currentPageNumber = pageNumber - 1
+    );
+  }
+
+  private CursorResultSetGroup executeNavigation(
+      Runnable validator,
+      IntSupplier offsetCalculator,
+      TransportFunction transportFunction,
+      Runnable stateUpdater,
+      String errorMessage) throws PinotClientException {
+    int offset;
+    String cursorId;
+    synchronized (_stateLock) {
+      checkNotClosed();
+      validator.run();
+      offset = offsetCalculator.getAsInt();
+      cursorId = _currentPage.getCursorId();
+    }
+
+    JsonAsyncHttpPinotClientTransport transport = getValidatedTransport();
+
+    try {
+      CursorAwareBrokerResponse response = transportFunction.apply(transport, 
cursorId, offset);
+      return updateStateAndReturn(response, stateUpdater);
+    } catch (PinotClientException e) {
+      throw new PinotClientException(errorMessage, e);
+    }
+  }
+
+  private CompletableFuture<CursorResultSetGroup> executeNavigationAsync(
+      Runnable validator,
+      IntSupplier offsetCalculator,
+      AsyncTransportFunction transportFunction,
+      Runnable stateUpdater) {
+    try {
+      int offset;
+      String cursorId;
+      synchronized (_stateLock) {
+        checkNotClosed();
+        validator.run();
+        offset = offsetCalculator.getAsInt();
+        cursorId = _currentPage.getCursorId();
+      }
+
+      JsonAsyncHttpPinotClientTransport transport = getValidatedTransport();
+      return transportFunction.apply(transport, cursorId, offset)
+          .thenApply(response -> updateStateAndReturn(response, stateUpdater));
+    } catch (IllegalStateException | IllegalArgumentException | 
UnsupportedOperationException e) {
+      return CompletableFuture.failedFuture(e);
+    }
+  }
+
+  private JsonAsyncHttpPinotClientTransport getValidatedTransport() {
+    if (!(_transport instanceof JsonAsyncHttpPinotClientTransport)) {
+      throw new UnsupportedOperationException("Cursor operations not supported 
by this transport type");
+    }
+    return (JsonAsyncHttpPinotClientTransport) _transport;
+  }
+
+  @FunctionalInterface
+  private interface TransportFunction {
+    CursorAwareBrokerResponse apply(JsonAsyncHttpPinotClientTransport 
transport, String cursorId, int offset)
+        throws PinotClientException;
+  }
+
+  @FunctionalInterface
+  private interface AsyncTransportFunction {
+    CompletableFuture<CursorAwareBrokerResponse> 
apply(JsonAsyncHttpPinotClientTransport transport,
+        String cursorId, int offset);
+  }
+
+  /**
+   * Common method to update cursor state and return result for both sync and async operations.
+   * This eliminates code duplication between navigation methods.
+   *
+   * @param response the broker response for the page that was just fetched
+   * @param stateUpdater adjusts the page counter; run under the state lock after the new page is installed
+   * @return the page built from {@code response}
+   * @throws PinotClientException if the response carries exceptions and fail-on-exceptions is enabled
+   * @throws IllegalStateException if the cursor was closed while the fetch was in flight
+   */
+  private CursorResultSetGroup updateStateAndReturn(CursorAwareBrokerResponse response, Runnable stateUpdater) {
+    if (response.hasExceptions() && _failOnExceptions) {
+      throw new PinotClientException("Query had processing exceptions: \n" + response.getExceptions());
+    }
+
+    final CursorResultSetGroup page;
+    synchronized (_stateLock) {
+      if (_closed) {
+        throw new IllegalStateException("Cursor was closed during operation");
+      }
+      page = new CursorResultSetGroup(response);
+      _currentPage = page;
+      stateUpdater.run();
+    }
+    // Return the local reference: re-reading _currentPage outside the lock could hand back a page installed
+    // by a concurrent navigation call instead of the one produced by this call.
+    return page;
+  }
+
+  @Override
+  public String getCursorId() {
+    synchronized (_stateLock) {
+      checkNotClosed();
+      return _currentPage.getCursorId();
+    }
+  }
+
+  @Override
+  public int getCurrentPageNumber() {
+    synchronized (_stateLock) {
+      checkNotClosed();
+      return _currentPageNumber + 1;
+    }
+  }
+
+  @Override
+  public long getTotalRows() {
+    synchronized (_stateLock) {
+      checkNotClosed();
+      Long numRows = _currentPage.getTotalRows();
+      return numRows != null ? numRows : -1;
+    }
+  }
+
+  @Override
+  public int getPageSize() {
+    return _pageSize;
+  }
+
+  @Override
+  public boolean isExpired() {
+    if (_closed) {
+      return true;
+    }
+
+    long expirationTime;
+    synchronized (_stateLock) {
+      if (_closed) {
+        return true;
+      }
+      expirationTime = _currentPage.getExpirationTimeMs();
+    }
+
+    return System.currentTimeMillis() > expirationTime;
+  }
+
+  @Override
+  public void close() throws PinotClientException {
+    if (_closed) {
+      return;
+    }
+
+    // Get cursor ID before marking as closed
+    String cursorId;
+    synchronized (_stateLock) {
+      cursorId = _currentPage.getCursorId();
+      _closed = true;
+    }
+
+    // Server-side cursor cleanup could be implemented in the future
+    // This would involve sending a DELETE request to the broker to clean up 
cursor resources
+    // For now, cursors will expire naturally based on their expiration time
+    LOGGER.debug("Cursor {} closed, server-side cleanup not yet implemented", 
cursorId);
+  }
+
+  private void checkNotClosed() {
+    if (_closed) {
+      throw new IllegalStateException("Cursor is closed");
+    }
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
index d20765e3416..1c964dd9b3c 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 /**
  * A Pinot result set group, containing the results given back by Pinot for a 
given query.
  */
-public class ResultSetGroup {
+public class ResultSetGroup implements BaseResultSetGroup {
   private final List<ResultSet> _resultSets;
   private final ExecutionStats _executionStats;
   private final List<PinotClientException> _exceptions;
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 18c51b17f11..1240181e651 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.client;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import org.I0Itec.zkclient.ZkClient;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -120,4 +123,145 @@ public class ConnectionFactoryTest {
     Assert.assertNotNull(connection.getTransport());
     Assert.assertNotNull(connection.getTransport().getClientMetrics());
   }
+
+  /**
+   * Reflection smoke test: {@link ConnectionFactory} must still declare {@code fromZookeeper},
+   * {@code fromController}, {@code fromHostList} and {@code fromProperties} methods returning
+   * {@link Connection}, and {@link Connection} must declare an {@code openCursor} method.
+   */
+  @Test
+  public void testConnectionFactoryMethodsPreserved() {
+    // Verify existing methods are preserved
+    Assert.assertTrue(declaresMethod(ConnectionFactory.class, "fromZookeeper", Connection.class),
+        "fromZookeeper methods should be preserved");
+    Assert.assertTrue(declaresMethod(ConnectionFactory.class, "fromController", Connection.class),
+        "fromController methods should be preserved");
+    Assert.assertTrue(declaresMethod(ConnectionFactory.class, "fromHostList", Connection.class),
+        "fromHostList methods should be preserved");
+    Assert.assertTrue(declaresMethod(ConnectionFactory.class, "fromProperties", Connection.class),
+        "fromProperties methods should be preserved");
+
+    // Verify that Connection has current cursor API (openCursor method); its return type is not checked
+    Assert.assertTrue(declaresMethod(Connection.class, "openCursor", null),
+        "Connection should have openCursor method");
+  }
+
+  /**
+   * Returns whether {@code type} itself declares a method called {@code name}. When {@code returnType}
+   * is non-null, the method must also return exactly that type.
+   */
+  private static boolean declaresMethod(Class<?> type, String name, Class<?> returnType) {
+    for (Method candidate : type.getDeclaredMethods()) {
+      if (candidate.getName().equals(name) && (returnType == null || candidate.getReturnType() == returnType)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * A connection built from a host list and a transport produced by
+   * {@link JsonAsyncHttpPinotClientTransportFactory} must expose a {@link JsonAsyncHttpPinotClientTransport}.
+   */
+  @Test
+  public void testConnectionCursorFunctionalityWithJsonTransport() {
+    List<String> brokerList = ImmutableList.of("127.0.0.1:1234");
+    Connection jsonConnection =
+        ConnectionFactory.fromHostList(brokerList, new JsonAsyncHttpPinotClientTransportFactory().buildTransport());
+
+    // Verify the connection has JsonAsyncHttpPinotClientTransport
+    boolean usesJsonTransport = jsonConnection.getTransport() instanceof JsonAsyncHttpPinotClientTransport;
+    Assert.assertTrue(usesJsonTransport,
+        "Connection should use JsonAsyncHttpPinotClientTransport for cursor support");
+  }
+
+  /**
+   * {@code openCursor} on a connection whose transport is only a plain {@link PinotClientTransport} mock
+   * must fail with an {@link UnsupportedOperationException} carrying the expected message.
+   */
+  @Test
+  public void testOpenCursorWithUnsupportedTransport() {
+    // Test that openCursor throws UnsupportedOperationException with non-JsonAsyncHttpPinotClientTransport
+    List<String> brokers = ImmutableList.of("127.0.0.1:1234");
+    PinotClientTransport<?> mockTransport = Mockito.mock(PinotClientTransport.class);
+    Connection connection = new Connection(brokers, mockTransport);
+
+    try {
+      connection.openCursor("SELECT * FROM testTable", 10);
+      Assert.fail("Expected UnsupportedOperationException");
+    } catch (UnsupportedOperationException e) {
+      // TestNG's assertEquals takes (actual, expected); this order keeps the failure report truthful.
+      Assert.assertEquals(e.getMessage(), "Cursor operations not supported by this connection type");
+    }
+  }
+
+  /**
+   * When the broker selector returns {@code null}, {@code openCursor} must fail with a
+   * {@link PinotClientException} carrying the expected message.
+   */
+  @Test
+  public void testOpenCursorWithNullBroker() {
+    // Test that openCursor throws PinotClientException when no broker is available
+    BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class);
+    Mockito.when(mockBrokerSelector.selectBroker(Mockito.any())).thenReturn(null);
+
+    JsonAsyncHttpPinotClientTransport mockTransport = Mockito.mock(JsonAsyncHttpPinotClientTransport.class);
+    Connection connection = new Connection(mockBrokerSelector, mockTransport);
+
+    try {
+      connection.openCursor("SELECT * FROM testTable", 10);
+      Assert.fail("Expected PinotClientException");
+    } catch (PinotClientException e) {
+      // TestNG's assertEquals takes (actual, expected); this order keeps the failure report truthful.
+      Assert.assertEquals(e.getMessage(), "Could not find broker to execute cursor query");
+    }
+  }
+
+  /**
+   * Happy path for {@code openCursor}: the broker selector yields a broker and the transport's cursor
+   * calls return a response without exceptions. A non-null cursor must come back, and the async
+   * transport call must have been made with the selected broker, the query and the page size.
+   */
+  @Test
+  public void testOpenCursorWithValidParameters() throws Exception {
+    String broker = "localhost:8099";
+    String sql = "SELECT * FROM testTable";
+
+    BrokerSelector selector = Mockito.mock(BrokerSelector.class);
+    Mockito.when(selector.selectBroker(Mockito.any())).thenReturn(broker);
+
+    CursorAwareBrokerResponse cleanResponse = Mockito.mock(CursorAwareBrokerResponse.class);
+    Mockito.when(cleanResponse.hasExceptions()).thenReturn(false);
+
+    // Mock both sync and async methods since openCursor now uses async internally
+    JsonAsyncHttpPinotClientTransport transport = Mockito.mock(JsonAsyncHttpPinotClientTransport.class);
+    Mockito.when(transport.executeQueryWithCursor(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(cleanResponse);
+    Mockito.when(transport.executeQueryWithCursorAsync(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(CompletableFuture.completedFuture(cleanResponse));
+
+    ResultCursor opened = new Connection(selector, transport).openCursor(sql, 10);
+    Assert.assertNotNull(opened, "Cursor should not be null");
+
+    // Verify that table name resolution was used
+    Mockito.verify(selector).selectBroker(Mockito.any(String[].class));
+    Mockito.verify(transport).executeQueryWithCursorAsync(broker, sql, 10);
+  }
+
+  /**
+   * With {@code Connection.FAIL_ON_EXCEPTIONS} set to {@code "true"} and a cursor response that reports
+   * exceptions, {@code openCursor} must fail with a {@link PinotClientException} whose message contains
+   * "Failed to open cursor".
+   */
+  @Test
+  public void testOpenCursorWithQueryExceptions() throws Exception {
+    // Test openCursor behavior when query has exceptions and failOnExceptions is true
+    Properties props = new Properties();
+    props.setProperty(Connection.FAIL_ON_EXCEPTIONS, "true");
+
+    BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class);
+    Mockito.when(mockBrokerSelector.selectBroker(Mockito.any())).thenReturn("localhost:8099");
+
+    JsonAsyncHttpPinotClientTransport mockTransport = Mockito.mock(JsonAsyncHttpPinotClientTransport.class);
+    CursorAwareBrokerResponse mockResponse = Mockito.mock(CursorAwareBrokerResponse.class);
+    Mockito.when(mockResponse.hasExceptions()).thenReturn(true);
+    JsonNode mockExceptions = Mockito.mock(JsonNode.class);
+    Mockito.when(mockResponse.getExceptions()).thenReturn(mockExceptions);
+    Mockito.when(mockTransport.executeQueryWithCursor(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(mockResponse);
+    // testOpenCursorWithValidParameters verifies that openCursor goes through the async transport call, so
+    // that call has to be stubbed here as well. Unstubbed, the mock hands back a null future and the
+    // exception-bearing response prepared above is never looked at.
+    Mockito.when(mockTransport.executeQueryWithCursorAsync(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(CompletableFuture.completedFuture(mockResponse));
+
+    Connection connection = new Connection(props, mockBrokerSelector, mockTransport);
+
+    try {
+      connection.openCursor("SELECT * FROM invalidTable", 10);
+      Assert.fail("Expected PinotClientException due to query exceptions");
+    } catch (PinotClientException e) {
+      Assert.assertTrue(e.getMessage().contains("Failed to open cursor"),
+          "Expected exception message to contain 'Failed to open cursor', but was: " + e.getMessage());
+    }
+  }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorAwareBrokerResponseTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorAwareBrokerResponseTest.java
new file mode 100644
index 00000000000..61c385de257
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorAwareBrokerResponseTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for building a {@link CursorAwareBrokerResponse} from broker JSON via {@code fromJson}.
+ */
+public class CursorAwareBrokerResponseTest {
+
+  /** Reads {@code json} into a tree with a fresh {@link ObjectMapper} and passes it to {@code fromJson}. */
+  private static CursorAwareBrokerResponse parse(String json) throws Exception {
+    JsonNode tree = new ObjectMapper().readTree(json);
+    return CursorAwareBrokerResponse.fromJson(tree);
+  }
+
+  /** Asserts that every cursor-specific getter of {@code response} returns {@code null}. */
+  private static void assertCursorFieldsAbsent(CursorAwareBrokerResponse response) {
+    Assert.assertNull(response.getOffset());
+    Assert.assertNull(response.getNumRows());
+    Assert.assertNull(response.getNumRowsResultSet());
+    Assert.assertNull(response.getBrokerHost());
+    Assert.assertNull(response.getBrokerPort());
+    Assert.assertNull(response.getCursorResultWriteTimeMs());
+    Assert.assertNull(response.getSubmissionTimeMs());
+    Assert.assertNull(response.getExpirationTimeMs());
+    Assert.assertNull(response.getBytesWritten());
+    Assert.assertNull(response.getCursorFetchTimeMs());
+  }
+
+  /** A payload carrying every cursor field plus a result table is read back field by field. */
+  @Test
+  public void testCursorAwareBrokerResponseWithAllFields() throws Exception {
+    String payload = "{\"requestId\": \"test-request-123\",\"brokerId\": \"broker-1\","
+        + "\"offset\": 100,\"numRows\": 50,\"numRowsResultSet\": 1000,"
+        + "\"cursorResultWriteTimeMs\": 1234567890,\"submissionTimeMs\": 1234567800,"
+        + "\"expirationTimeMs\": 1234567900,"
+        + "\"brokerHost\": \"localhost\",\"brokerPort\": 8099,"
+        + "\"bytesWritten\": 2048,\"cursorFetchTimeMs\": 150,"
+        + "\"resultTable\": {  \"dataSchema\": {"
+        + "    \"columnNames\": [\"col1\", \"col2\"],    \"columnDataTypes\": [\"STRING\", \"INT\"]"
+        + "  },  \"rows\": [[\"value1\", 123], [\"value2\", 456]]"
+        + "}}";
+
+    CursorAwareBrokerResponse parsed = parse(payload);
+
+    // Test cursor-specific fields
+    Assert.assertEquals(parsed.getOffset(), Long.valueOf(100));
+    Assert.assertEquals(parsed.getNumRows(), Integer.valueOf(50));
+    Assert.assertEquals(parsed.getNumRowsResultSet(), Long.valueOf(1000));
+    Assert.assertEquals(parsed.getCursorResultWriteTimeMs(), Long.valueOf(1234567890));
+    Assert.assertEquals(parsed.getSubmissionTimeMs(), Long.valueOf(1234567800));
+    Assert.assertEquals(parsed.getExpirationTimeMs(), Long.valueOf(1234567900));
+    Assert.assertEquals(parsed.getBrokerHost(), "localhost");
+    Assert.assertEquals(parsed.getBrokerPort(), Integer.valueOf(8099));
+    Assert.assertEquals(parsed.getBytesWritten(), Long.valueOf(2048));
+    Assert.assertEquals(parsed.getCursorFetchTimeMs(), Long.valueOf(150));
+
+    // Test inherited BrokerResponse fields
+    Assert.assertEquals(parsed.getRequestId(), "test-request-123");
+    Assert.assertEquals(parsed.getBrokerId(), "broker-1");
+    Assert.assertNotNull(parsed.getResultTable());
+  }
+
+  /** Cursor fields that are explicit JSON nulls, or absent altogether, come back as {@code null}. */
+  @Test
+  public void testCursorAwareBrokerResponseWithNullFields() throws Exception {
+    String payload = "{\"requestId\": \"test-request-456\","
+        + "\"offset\": null,\"numRows\": null,"
+        + "\"numRowsResultSet\": null,\"brokerHost\": null"
+        + "}";
+
+    // Test null fields are handled correctly
+    assertCursorFieldsAbsent(parse(payload));
+  }
+
+  /** {@code fromJson(null)} still returns a response object; all of its cursor fields are {@code null}. */
+  @Test
+  public void testCursorAwareBrokerResponseWithNullInput() {
+    CursorAwareBrokerResponse fromNull = CursorAwareBrokerResponse.fromJson(null);
+
+    // All cursor fields should be null
+    assertCursorFieldsAbsent(fromNull);
+  }
+
+  /** A payload with only request and broker ids leaves the cursor fields {@code null}. */
+  @Test
+  public void testCursorAwareBrokerResponseWithMissingFields() throws Exception {
+    CursorAwareBrokerResponse parsed = parse("{\"requestId\": \"test-request-789\",\"brokerId\": \"broker-2\"}");
+
+    // Test missing fields are handled as null
+    Assert.assertNull(parsed.getOffset());
+    Assert.assertNull(parsed.getNumRows());
+    Assert.assertNull(parsed.getNumRowsResultSet());
+    Assert.assertNull(parsed.getBrokerHost());
+    Assert.assertNull(parsed.getBrokerPort());
+
+    // Test inherited fields work
+    Assert.assertEquals(parsed.getRequestId(), "test-request-789");
+    Assert.assertEquals(parsed.getBrokerId(), "broker-2");
+  }
+
+  /** Numeric zeros are real values and must not be turned into {@code null}. */
+  @Test
+  public void testCursorAwareBrokerResponseWithZeroValues() throws Exception {
+    String payload = "{\"requestId\": \"test-request-000\","
+        + "\"offset\": 0,\"numRows\": 0,\"numRowsResultSet\": 0,"
+        + "\"brokerPort\": 0,\"bytesWritten\": 0"
+        + "}";
+
+    CursorAwareBrokerResponse parsed = parse(payload);
+
+    // Test zero values are preserved
+    Assert.assertEquals(parsed.getOffset(), Long.valueOf(0));
+    Assert.assertEquals(parsed.getNumRows(), Integer.valueOf(0));
+    Assert.assertEquals(parsed.getNumRowsResultSet(), Long.valueOf(0));
+    Assert.assertEquals(parsed.getBrokerPort(), Integer.valueOf(0));
+    Assert.assertEquals(parsed.getBytesWritten(), Long.valueOf(0));
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorResultSetGroupTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorResultSetGroupTest.java
new file mode 100644
index 00000000000..3db59ab5636
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorResultSetGroupTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for CursorResultSetGroup class.
+ */
+public class CursorResultSetGroupTest {
+
+  /** Builds a JSON object carrying only the three paging fields these tests read back. */
+  private static JsonNode pagingJson(int offset, int numRows, int numRowsResultSet) {
+    return new ObjectMapper().createObjectNode()
+        .put("offset", offset)
+        .put("numRows", numRows)
+        .put("numRowsResultSet", numRowsResultSet);
+  }
+
+  /** Turns {@code json} into a CursorAwareBrokerResponse and wraps that in a CursorResultSetGroup. */
+  private static CursorResultSetGroup groupFrom(JsonNode json) {
+    return new CursorResultSetGroup(CursorAwareBrokerResponse.fromJson(json));
+  }
+
+  /** Construction from a response parsed out of paging-only JSON yields a non-null group. */
+  @Test
+  public void testConstructorWithValidParameters() {
+    CursorResultSetGroup group = groupFrom(pagingJson(0, 10, 100));
+    Assert.assertNotNull(group);
+  }
+
+  /** Offset, page size and total row count are exposed as they appear in the response JSON. */
+  @Test
+  public void testGetCursorFields() {
+    CursorResultSetGroup group = groupFrom(pagingJson(5, 15, 200));
+
+    Assert.assertEquals(group.getOffset(), Long.valueOf(5));
+    Assert.assertEquals(group.getPageSize(), 15);
+    Assert.assertEquals(group.getNumRowsResultSet(), Long.valueOf(200));
+  }
+
+  /** An empty JSON object produces a group that reports zero result sets. */
+  @Test
+  public void testResultSetDelegation() {
+    JsonNode emptyObject = new ObjectMapper().createObjectNode();
+
+    CursorResultSetGroup group = groupFrom(emptyObject);
+    Assert.assertEquals(group.getResultSetCount(), 0);
+  }
+
+  /** Same read-back as {@link #testGetCursorFields()}, with a different set of values. */
+  @Test
+  public void testMetadataExtraction() {
+    CursorResultSetGroup group = groupFrom(pagingJson(10, 20, 500));
+
+    Assert.assertEquals(group.getOffset(), Long.valueOf(10));
+    Assert.assertEquals(group.getPageSize(), 20);
+    Assert.assertEquals(group.getNumRowsResultSet(), Long.valueOf(500));
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
index 4f2b2cbf816..a211cdf8b76 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
@@ -21,11 +21,14 @@ package org.apache.pinot.client;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpServer;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.net.InetSocketAddress;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -34,6 +37,8 @@ import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 
@@ -43,9 +48,25 @@ public class JsonAsyncHttpPinotClientTransportTest 
implements HttpHandler {
       + 
"\"aggregationResults\":[{\"function\":\"count_star\",\"value\":\"36542\"}],\"timeUsedMs\":30,"
       + 
"\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":115545,\"numServersResponded\":99}";
 
+  // Canned broker reply served by handle() in the cursor query/fetch tests: request id "cursor-123", a
+  // two-column, two-row result table and a "metadata" object with paging fields. expirationTimeMs is
+  // computed once, when this class is initialized, as the current time plus 300000 ms.
+  private static final String _CURSOR_RESPONSE_JSON = 
"{\"requestId\":\"cursor-123\",\"traceInfo\":{},"
+      + "\"resultTable\":{\"dataSchema\":{\"columnNames\":[\"col1\",\"col2\"],"
+      + "\"columnDataTypes\":[\"STRING\",\"INT\"]},"
+      + "\"rows\":[[\"value1\",123],[\"value2\",456]],"
+      + 
"\"metadata\":{\"currentPage\":0,\"pageSize\":100,\"totalRows\":1000,\"totalPages\":10,"
+      + "\"hasNext\":true,\"hasPrevious\":false,\"expirationTimeMs\":" + 
(System.currentTimeMillis() + 300000) + "}},"
+      + 
"\"timeUsedMs\":25,\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":1000,\"numServersResponded\":1}";
+
+  // Canned broker reply served by handle() in the cursor-metadata tests: same request id and "metadata"
+  // object as the cursor response fixture, but the result table carries no schema and no rows.
+  // expirationTimeMs is computed once, at class initialization, as the current time plus 300000 ms.
+  private static final String _CURSOR_METADATA_JSON = 
"{\"requestId\":\"cursor-123\",\"traceInfo\":{},"
+      + 
"\"resultTable\":{\"metadata\":{\"currentPage\":0,\"pageSize\":100,\"totalRows\":1000,\"totalPages\":10,"
+      + "\"hasNext\":true,\"hasPrevious\":false,\"expirationTimeMs\":" + 
(System.currentTimeMillis() + 300000) + "}},"
+      + 
"\"timeUsedMs\":5,\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":1000,\"numServersResponded\":1}";
+
   private HttpServer _dummyServer;
   private String _responseJson = _VALID_RESPONSE_JSON;
   private long _responseDelayMs = 0;
+  // Details of the most recent request, captured in handle() on the HttpServer's handler thread and
+  // asserted on from the test thread; volatile makes the handler's writes visible to the test thread.
+  private volatile String _requestPath = "";
+  private volatile String _requestMethod = "";
+  private volatile String _requestBody = "";
 
   @BeforeClass
   public void setUp()
@@ -60,6 +81,9 @@ public class JsonAsyncHttpPinotClientTransportTest implements 
HttpHandler {
   public void setUpTestCase() {
+    // Restore the default canned response, remove any artificial delay and forget the last captured request.
     _responseJson = _VALID_RESPONSE_JSON;
     _responseDelayMs = 0L;
+    _requestPath = "";
+    _requestMethod = "";
+    _requestBody = "";
   }
 
   @AfterClass
@@ -116,9 +140,164 @@ public class JsonAsyncHttpPinotClientTransportTest 
implements HttpHandler {
     }
   }
 
+  // Cursor-related tests
+  /**
+   * Calls {@code executeQueryWithCursor} against the dummy server and checks both the parsed response and
+   * the request captured by {@code handle()}: cursor flag and page size in the URI, SQL in the body.
+   */
+  @Test
+  public void testExecuteQueryWithCursor() {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    BrokerResponse firstPage = cursorTransport.executeQueryWithCursor(brokerAddress, "select * from planets", 100);
+
+    // What the client parsed
+    assertFalse(firstPage.hasExceptions());
+    assertEquals(firstPage.getRequestId(), "cursor-123");
+    assertNotNull(firstPage.getResultTable());
+    // What the server received
+    assertTrue(_requestPath.contains("getCursor=true"));
+    assertTrue(_requestPath.contains("numRows=100"));
+    assertTrue(_requestBody.contains("\"sql\":\"select * from planets\""));
+  }
+
+  /**
+   * Calls {@code fetchNextPage} for offset 0 / 10 rows and checks that the captured request is a GET on
+   * the cursor's {@code /responseStore/.../results} path with matching query parameters.
+   */
+  @Test
+  public void testFetchCursorResults()
+      throws Exception {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    CursorAwareBrokerResponse page = cursorTransport.fetchNextPage(brokerAddress, "cursor-123", 0, 10);
+
+    assertNotNull(page);
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/results"));
+    assertTrue(_requestPath.contains("offset=0"));
+    assertTrue(_requestPath.contains("numRows=10"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Future-returning counterpart of the cursor query test: waits on {@code executeQueryWithCursorAsync}
+   * and checks the parsed response plus the request captured by {@code handle()}.
+   */
+  @Test
+  public void testExecuteQueryWithCursorAsync() throws Exception {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    CompletableFuture<CursorAwareBrokerResponse> pending =
+        cursorTransport.executeQueryWithCursorAsync(brokerAddress, "select * from planets", 50);
+    CursorAwareBrokerResponse firstPage = pending.get();
+
+    // What the client parsed
+    assertFalse(firstPage.hasExceptions());
+    assertEquals(firstPage.getRequestId(), "cursor-123");
+    assertNotNull(firstPage.getResultTable());
+    // What the server received
+    assertTrue(_requestPath.contains("getCursor=true"));
+    assertTrue(_requestPath.contains("numRows=50"));
+    assertTrue(_requestBody.contains("\"sql\":\"select * from planets\""));
+  }
+
+  /**
+   * Calls {@code fetchNextPage} with a non-zero offset (100) and 50 rows and checks that both values
+   * reach the server as query parameters of a GET on the cursor's results path.
+   */
+  @Test
+  public void testFetchNextPageWithOffsetAndNumRows() {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    // Use the CursorCapable interface method instead of the removed legacy method
+    CursorAwareBrokerResponse page = cursorTransport.fetchNextPage(brokerAddress, "cursor-123", 100, 50);
+
+    assertFalse(page.hasExceptions());
+    assertEquals(page.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/results"));
+    assertTrue(_requestPath.contains("offset=100"));
+    assertTrue(_requestPath.contains("numRows=50"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Future-returning page fetch: waits on {@code fetchNextPageAsync} and checks the parsed response and
+   * that the server saw a GET on the cursor's results path.
+   */
+  @Test
+  public void testFetchCursorResultsAsync() throws Exception {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    CompletableFuture<CursorAwareBrokerResponse> pending =
+        cursorTransport.fetchNextPageAsync(brokerAddress, "cursor-123", 0, 10);
+    CursorAwareBrokerResponse page = pending.get();
+
+    assertFalse(page.hasExceptions());
+    assertEquals(page.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/results"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Calls {@code getCursorMetadata} and checks the parsed response and that the server saw a GET under
+   * the cursor's {@code /responseStore/cursor-123/} path.
+   */
+  @Test
+  public void testGetCursorMetadata() {
+    _responseJson = _CURSOR_METADATA_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    BrokerResponse metadata = cursorTransport.getCursorMetadata(brokerAddress, "cursor-123");
+
+    assertFalse(metadata.hasExceptions());
+    assertEquals(metadata.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Future-returning metadata lookup: waits on {@code getCursorMetadataAsync} and checks the parsed
+   * response and that the server saw a GET under the cursor's path.
+   */
+  @Test
+  public void testGetCursorMetadataAsync() throws Exception {
+    _responseJson = _CURSOR_METADATA_JSON;
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    CompletableFuture<BrokerResponse> pending = cursorTransport.getCursorMetadataAsync(brokerAddress, "cursor-123");
+    BrokerResponse metadata = pending.get();
+
+    assertFalse(metadata.hasExceptions());
+    assertEquals(metadata.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Calls {@code deleteCursor} and checks that it completes without throwing and that the server saw a
+   * DELETE under the cursor's {@code /responseStore/cursor-123/} path.
+   */
+  @Test
+  public void testDeleteCursor() {
+    _responseJson = "{}"; // Empty response for delete
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    // Should not throw exception
+    cursorTransport.deleteCursor(brokerAddress, "cursor-123");
+
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "DELETE");
+  }
+
+  /**
+   * Future-returning delete: waits on {@code deleteCursorAsync} and checks that the server saw a DELETE
+   * under the cursor's path.
+   */
+  @Test
+  public void testDeleteCursorAsync() throws Exception {
+    _responseJson = "{}"; // Empty response for delete
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+
+    CompletableFuture<Void> pending = cursorTransport.deleteCursorAsync(brokerAddress, "cursor-123");
+    pending.get(); // Should complete without exception
+
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "DELETE");
+  }
+
   @Override
   public void handle(HttpExchange exchange)
       throws IOException {
+    // Capture request details for verification
+    _requestPath = exchange.getRequestURI().toString();
+    _requestMethod = exchange.getRequestMethod();
+
+    // Capture request body for POST requests
+    if ("POST".equals(_requestMethod)) {
+      BufferedReader reader = new BufferedReader(new 
InputStreamReader(exchange.getRequestBody()));
+      StringBuilder requestBody = new StringBuilder();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        requestBody.append(line);
+      }
+      _requestBody = requestBody.toString();
+      reader.close();
+    } else {
+      _requestBody = "";
+    }
+
     if (_responseDelayMs > 0) {
       try {
         Thread.sleep(_responseDelayMs);
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultCursorTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultCursorTest.java
new file mode 100644
index 00000000000..6e3312155a5
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultCursorTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ResultCursorTest {
+
+  private static class MockCursorTransport extends 
JsonAsyncHttpPinotClientTransport {
+    private int _currentPage = 1;
+    private final int _totalPages = 3;
+    private final int _pageSize = 10;
+
+    public MockCursorTransport() {
+      super();
+    }
+
+    @Override
+    public CursorAwareBrokerResponse executeQueryWithCursor(String 
brokerHostPort, String query, int numRows) {
+      _currentPage = 1;
+      return createMockResponse(1, true, false);
+    }
+
+    @Override
+    public CursorAwareBrokerResponse fetchNextPage(String brokerHostPort, 
String cursorId, int offset, int numRows) {
+      _currentPage++;
+      return createMockResponse(_currentPage, _currentPage < _totalPages, 
_currentPage > 1);
+    }
+
+    @Override
+    public CompletableFuture<CursorAwareBrokerResponse> 
fetchNextPageAsync(String brokerHostPort, String cursorId,
+        int offset, int numRows) {
+      return CompletableFuture.completedFuture(fetchNextPage(brokerHostPort, 
cursorId, offset, numRows));
+    }
+
+    @Override
+    public CursorAwareBrokerResponse fetchPreviousPage(String brokerHostPort, 
String cursorId, int offset,
+        int numRows) {
+      _currentPage--;
+      return createMockResponse(_currentPage, _currentPage < _totalPages, 
_currentPage > 1);
+    }
+
+    @Override
+    public CompletableFuture<CursorAwareBrokerResponse> 
fetchPreviousPageAsync(String brokerHostPort, String cursorId,
+        int offset, int numRows) {
+      return 
CompletableFuture.completedFuture(fetchPreviousPage(brokerHostPort, cursorId, 
offset, numRows));
+    }
+
+    @Override
+    public CursorAwareBrokerResponse seekToPage(String brokerHostPort, String 
cursorId, int pageNumber, int numRows) {
+      // pageNumber is 1-based, store as current page number (1-based)
+      _currentPage = pageNumber;
+      return createMockResponse(_currentPage, _currentPage < _totalPages, 
_currentPage > 1);
+    }
+
+    @Override
+    public CompletableFuture<CursorAwareBrokerResponse> seekToPageAsync(String 
brokerHostPort, String cursorId,
+        int pageNumber, int numRows) {
+      // pageNumber is 1-based, store as current page number (1-based)
+      _currentPage = pageNumber;
+      return CompletableFuture.completedFuture(
+          createMockResponse(_currentPage, _currentPage < _totalPages, 
_currentPage > 1));
+    }
+
+    private CursorAwareBrokerResponse createMockResponse(int pageNum, boolean 
hasNext, boolean hasPrevious) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        // Calculate total rows across all pages
+        int totalRows = _totalPages * _pageSize;
+        // Calculate offset for this page (0-based offset)
+        int offset = (pageNum - 1) * _pageSize;
+        String jsonResponse = String.format(
+            "{"
+            + "\"requestId\": \"cursor-test-%d\","
+            + "\"numRowsResultSet\": %d,"
+            + "\"numRows\": %d,"
+            + "\"offset\": %d,"
+            + "\"expirationTimeMs\": %d,"
+            + "\"resultTable\": {"
+            + "  \"dataSchema\": {"
+            + "    \"columnNames\": [\"col1\", \"col2\"],"
+            + "    \"columnDataTypes\": [\"STRING\", \"INT\"]"
+            + "  },"
+            + "  \"rows\": ["
+            + "    [\"value%d_1\", %d],"
+            + "    [\"value%d_2\", %d]"
+            + "  ]"
+            + "}"
+            + "}",
+            pageNum, totalRows, _pageSize, offset,
+            System.currentTimeMillis() + 300000,
+            pageNum, pageNum * 10,
+            pageNum, pageNum * 10 + 1
+        );
+
+        JsonNode jsonNode = mapper.readTree(jsonResponse);
+        return CursorAwareBrokerResponse.fromJson(jsonNode);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to create mock response", e);
+      }
+    }
+  }
+
+  /**
+   * A cursor built from the initial response starts on page 1 with a page size of 10, can move forward but
+   * not backward, exposes a cursor id and is not expired.
+   */
+  @Test
+  public void testCursorCreation() throws Exception {
+    final String broker = "localhost:8000";
+    MockCursorTransport mockTransport = new MockCursorTransport();
+    CursorAwareBrokerResponse firstResponse =
+        mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+      // Current page and paging metadata
+      Assert.assertNotNull(cursor.getCurrentPage());
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+      Assert.assertEquals(cursor.getPageSize(), 10);
+
+      // Navigation flags on the first page
+      Assert.assertTrue(cursor.hasNext());
+      Assert.assertFalse(cursor.hasPrevious());
+
+      // Cursor identity and expiry
+      Assert.assertNotNull(cursor.getCursorId());
+      Assert.assertFalse(cursor.isExpired());
+    }
+  }
+
+  /**
+   * Walks the mocked pages forward (1, 2, 3) and back again (3, 2, 1), checking the page number and the
+   * hasNext/hasPrevious flags after every move.
+   */
+  @Test
+  public void testCursorNavigation() throws Exception {
+    final String broker = "localhost:8000";
+    MockCursorTransport mockTransport = new MockCursorTransport();
+    CursorAwareBrokerResponse firstResponse =
+        mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+      // Forward from page 1 to page 2
+      Assert.assertTrue(cursor.hasNext());
+      CursorResultSetGroup secondPage = cursor.next();
+      Assert.assertNotNull(secondPage);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+      Assert.assertTrue(cursor.hasNext());
+      Assert.assertTrue(cursor.hasPrevious());
+
+      // Forward from page 2 to page 3, after which no further page is reported
+      CursorResultSetGroup thirdPage = cursor.next();
+      Assert.assertNotNull(thirdPage);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 3);
+      Assert.assertFalse(cursor.hasNext());
+      Assert.assertTrue(cursor.hasPrevious());
+
+      // Backward from page 3 to page 2
+      CursorResultSetGroup secondPageAgain = cursor.previous();
+      Assert.assertNotNull(secondPageAgain);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+      Assert.assertTrue(cursor.hasNext());
+      Assert.assertTrue(cursor.hasPrevious());
+
+      // Backward from page 2 to page 1, after which no previous page is reported
+      CursorResultSetGroup firstPageAgain = cursor.previous();
+      Assert.assertNotNull(firstPageAgain);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+      Assert.assertTrue(cursor.hasNext());
+      Assert.assertFalse(cursor.hasPrevious());
+    }
+  }
+
+  /**
+   * Seeks to page 2 and then back to page 1, checking the page number and the navigation flags after each
+   * seek.
+   */
+  @Test
+  public void testSeekToPage() throws Exception {
+    final String broker = "localhost:8000";
+    MockCursorTransport mockTransport = new MockCursorTransport();
+    CursorAwareBrokerResponse firstResponse =
+        mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+      // Jump to page 2: both directions are available
+      CursorResultSetGroup secondPage = cursor.seekToPage(2);
+      Assert.assertNotNull(secondPage);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+      Assert.assertTrue(cursor.hasNext());
+      Assert.assertTrue(cursor.hasPrevious());
+
+      // Jump back to page 1: only forward navigation is available
+      CursorResultSetGroup firstPage = cursor.seekToPage(1);
+      Assert.assertNotNull(firstPage);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+      Assert.assertTrue(cursor.hasNext());
+      Assert.assertFalse(cursor.hasPrevious());
+    }
+  }
+
+  /**
+   * Exercises the asynchronous next, previous and seek operations and checks the page number once each
+   * returned future has completed.
+   */
+  @Test
+  public void testAsyncNavigation() throws Exception {
+    final String broker = "localhost:8000";
+    MockCursorTransport mockTransport = new MockCursorTransport();
+    CursorAwareBrokerResponse firstResponse =
+        mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+      // Async next: page 1 to page 2
+      CursorResultSetGroup afterNext = cursor.nextAsync().get();
+      Assert.assertNotNull(afterNext);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+
+      // Async previous: page 2 to page 1
+      CursorResultSetGroup afterPrevious = cursor.previousAsync().get();
+      Assert.assertNotNull(afterPrevious);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+
+      // Async seek straight to page 2
+      CursorResultSetGroup afterSeek = cursor.seekToPageAsync(2).get();
+      Assert.assertNotNull(afterSeek);
+      Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+    }
+  }
+
+  /**
+   * Checks that synchronous navigation past either end of the result set is rejected with an
+   * {@link IllegalStateException} carrying the expected message.
+   */
+  @Test
+  public void testNavigationBoundaryConditions() throws Exception {
+    MockCursorTransport transport = new MockCursorTransport();
+    CursorAwareBrokerResponse initialResponse =
+        transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false)) {
+      // Try to go previous when at first page
+      Assert.assertFalse(cursor.hasPrevious());
+      IllegalStateException noPrevious =
+          Assert.expectThrows(IllegalStateException.class, () -> cursor.previous());
+      Assert.assertEquals(noPrevious.getMessage(), "No previous page available");
+
+      // Navigate to last page
+      cursor.next();
+      cursor.next();
+      Assert.assertFalse(cursor.hasNext());
+
+      // Try to go next when at last page
+      IllegalStateException noNext = Assert.expectThrows(IllegalStateException.class, () -> cursor.next());
+      Assert.assertEquals(noNext.getMessage(), "No next page available");
+    }
+  }
+
+  /**
+   * Checks that {@code getCurrentPage()}, {@code hasNext()} and {@code next()} each throw an
+   * {@link IllegalStateException} with the message "Cursor is closed" once the cursor has been closed.
+   */
+  @Test
+  public void testClosedCursorOperations() throws Exception {
+    MockCursorTransport transport = new MockCursorTransport();
+    CursorAwareBrokerResponse initialResponse =
+        transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+    // Deliberately not try-with-resources: the cursor must be closed before it is used
+    ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false);
+    cursor.close();
+
+    IllegalStateException onGetCurrentPage =
+        Assert.expectThrows(IllegalStateException.class, () -> cursor.getCurrentPage());
+    Assert.assertEquals(onGetCurrentPage.getMessage(), "Cursor is closed");
+
+    IllegalStateException onHasNext = Assert.expectThrows(IllegalStateException.class, () -> cursor.hasNext());
+    Assert.assertEquals(onHasNext.getMessage(), "Cursor is closed");
+
+    IllegalStateException onNext = Assert.expectThrows(IllegalStateException.class, () -> cursor.next());
+    Assert.assertEquals(onNext.getMessage(), "Cursor is closed");
+  }
+
+  /**
+   * Checks that seeking to a negative page number is rejected with an {@link IllegalArgumentException}
+   * carrying the expected message.
+   */
+  @Test
+  public void testInvalidSeekPageNumber() throws Exception {
+    MockCursorTransport transport = new MockCursorTransport();
+    CursorAwareBrokerResponse initialResponse =
+        transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false)) {
+      IllegalArgumentException invalidPage =
+          Assert.expectThrows(IllegalArgumentException.class, () -> cursor.seekToPage(-1));
+      // Expected message kept on one line: a string literal cannot span physical lines
+      Assert.assertEquals(invalidPage.getMessage(), "Page number must be positive (1-based)");
+    }
+  }
+
+  /**
+   * Checks that the asynchronous navigation methods report boundary and argument errors through the
+   * returned future: {@code get()} throws an {@link ExecutionException} whose cause has the expected type
+   * and message.
+   */
+  @Test
+  public void testAsyncNavigationBoundaryConditions() throws Exception {
+    MockCursorTransport transport = new MockCursorTransport();
+    CursorAwareBrokerResponse initialResponse =
+        transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+    try (ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false)) {
+      // Try async previous when at first page
+      CompletableFuture<CursorResultSetGroup> prevFuture = cursor.previousAsync();
+      ExecutionException prevFailure = Assert.expectThrows(ExecutionException.class, () -> prevFuture.get());
+      Assert.assertTrue(prevFailure.getCause() instanceof IllegalStateException);
+      Assert.assertEquals(prevFailure.getCause().getMessage(), "No previous page available");
+
+      // Navigate to last page
+      cursor.next();
+      cursor.next();
+
+      // Try async next when at last page
+      CompletableFuture<CursorResultSetGroup> nextFuture = cursor.nextAsync();
+      ExecutionException nextFailure = Assert.expectThrows(ExecutionException.class, () -> nextFuture.get());
+      Assert.assertTrue(nextFailure.getCause() instanceof IllegalStateException);
+      Assert.assertEquals(nextFailure.getCause().getMessage(), "No next page available");
+
+      // Try async seek with invalid page number
+      CompletableFuture<CursorResultSetGroup> seekFuture = cursor.seekToPageAsync(-1);
+      ExecutionException seekFailure = Assert.expectThrows(ExecutionException.class, () -> seekFuture.get());
+      Assert.assertTrue(seekFailure.getCause() instanceof IllegalArgumentException);
+      Assert.assertEquals(seekFailure.getCause().getMessage(), "Page number must be positive (1-based)");
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to