This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9bac2500278 issue-16707: Add cursor based pagination support to java
client (#16782)
9bac2500278 is described below
commit 9bac2500278c0e903f750a6b34c1e396d122985d
Author: John Solomon J <[email protected]>
AuthorDate: Wed Sep 17 19:02:33 2025 -0700
issue-16707: Add cursor based pagination support to java client (#16782)
* issue-16707 Add cursor based pagination support to java client
* issue-16707 Refactor to use Cursor Pattern
* issue-16707 Refactor to use Cursor Pattern continued
* issue-16707 Add missing changes
* issue-16707 Add missing changes continued
* issue-16707 Address review comments, return 1 based page num instead of 0
based
* issue-16707 Make Cursor navigations thread safe
* issue-16707 Fix checkstyle and spotless violations
---------
Co-authored-by: John Solomon J <[email protected]>
---
.../apache/pinot/client/BaseResultSetGroup.java | 73 ++++
.../org/apache/pinot/client/BrokerResponse.java | 31 +-
.../java/org/apache/pinot/client/Connection.java | 71 +++-
.../org/apache/pinot/client/ConnectionFactory.java | 5 +
.../pinot/client/CursorAwareBrokerResponse.java | 131 ++++++++
.../org/apache/pinot/client/CursorCapable.java | 188 +++++++++++
.../apache/pinot/client/CursorResultSetGroup.java | 121 +++++++
.../client/JsonAsyncHttpPinotClientTransport.java | 291 +++++++++++++++-
.../java/org/apache/pinot/client/ResultCursor.java | 165 ++++++++++
.../org/apache/pinot/client/ResultCursorImpl.java | 366 +++++++++++++++++++++
.../org/apache/pinot/client/ResultSetGroup.java | 2 +-
.../apache/pinot/client/ConnectionFactoryTest.java | 144 ++++++++
.../client/CursorAwareBrokerResponseTest.java | 163 +++++++++
.../pinot/client/CursorResultSetGroupTest.java | 90 +++++
.../JsonAsyncHttpPinotClientTransportTest.java | 179 ++++++++++
.../org/apache/pinot/client/ResultCursorTest.java | 353 ++++++++++++++++++++
16 files changed, 2363 insertions(+), 10 deletions(-)
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BaseResultSetGroup.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BaseResultSetGroup.java
new file mode 100644
index 00000000000..9dd76e4b90f
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BaseResultSetGroup.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+
+/**
+ * Common contract for result set groups, giving uniform access to the results of a query.
+ *
+ * <p>The interface declares the core accessors that every result set group implementation
+ * provides, so that the different result set group types (regular and cursor-based) can be
+ * used polymorphically.
+ *
+ * <p><strong>Backward Compatibility:</strong> This interface is designed to be implemented by
+ * existing classes without breaking any existing code.
+ */
+public interface BaseResultSetGroup {
+
+ /**
+ * Returns the number of result sets in this result set group.
+ *
+ * <p>There is one result set per aggregation function in the original query
+ * and one result set in the case of a selection query.
+ *
+ * @return The number of result sets, or 0 if there are no result sets
+ */
+ int getResultSetCount();
+
+ /**
+ * Obtains the result set at the given zero-based index.
+ *
+ * @param index The zero-based index of the result set to obtain
+ * @return The result set at the given index
+ * @throws IndexOutOfBoundsException if the index is out of range
+ */
+ ResultSet getResultSet(int index);
+
+ /**
+ * Gets the execution statistics for this query.
+ *
+ * @return The execution statistics
+ */
+ ExecutionStats getExecutionStats();
+
+ /**
+ * Gets any exceptions that occurred during query processing.
+ *
+ * @return A list of exceptions, or an empty list if no exceptions occurred
+ */
+ List<PinotClientException> getExceptions();
+
+ /**
+ * Gets the underlying broker response that this result set group was built from.
+ *
+ * @return The broker response
+ */
+ BrokerResponse getBrokerResponse();
+}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
index a2c4d757bc7..a24f7f368f2 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
@@ -36,13 +36,14 @@ public class BrokerResponse {
private BrokerResponse() {
}
- private BrokerResponse(JsonNode brokerResponse) {
- _requestId = brokerResponse.get("requestId") != null ?
brokerResponse.get("requestId").asText() : "unknown";
- _brokerId = brokerResponse.get("brokerId") != null ?
brokerResponse.get("brokerId").asText() : "unknown";
- _aggregationResults = brokerResponse.get("aggregationResults");
- _exceptions = brokerResponse.get("exceptions");
- _selectionResults = brokerResponse.get("selectionResults");
- _resultTable = brokerResponse.get("resultTable");
+ protected BrokerResponse(JsonNode brokerResponse) {
+ // Null-safe reads via the helpers: the ids fall back to "unknown", missing result nodes stay null
+ _requestId = getTextOrDefault(brokerResponse, "requestId", "unknown");
+ _brokerId = getTextOrDefault(brokerResponse, "brokerId", "unknown");
+ _aggregationResults = getJsonNodeOrNull(brokerResponse,
"aggregationResults");
+ _exceptions = getJsonNodeOrNull(brokerResponse, "exceptions");
+ _selectionResults = getJsonNodeOrNull(brokerResponse, "selectionResults");
+ _resultTable = getJsonNodeOrNull(brokerResponse, "resultTable");
_executionStats = ExecutionStats.fromJson(brokerResponse);
}
@@ -93,4 +94,20 @@ public class BrokerResponse {
public String getBrokerId() {
return _brokerId;
}
+
+  // Null-safe JsonNode accessor: returns the text of the named field, or defaultValue when the
+  // parent node is null, the field is missing, or the field holds an explicit JSON null.
+  private static String getTextOrDefault(JsonNode node, String fieldName, String defaultValue) {
+    JsonNode field = node == null ? null : node.get(fieldName);
+    if (field == null || field.isNull()) {
+      return defaultValue;
+    }
+    return field.asText();
+  }
+
+  // Returns the named child of node as-is, or null when node itself is null. Whatever
+  // JsonNode#get yields for the field is passed through unchanged.
+  private static JsonNode getJsonNodeOrNull(JsonNode node, String fieldName) {
+    return node != null ? node.get(fieldName) : null;
+  }
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index 8ba457981db..49f2152557d 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A connection to Pinot, normally created through calls to the {@link
ConnectionFactory}.
*/
@@ -205,4 +204,74 @@ public class Connection {
public PinotClientTransport<?> getTransport() {
return _transport;
}
+
+ /**
+ * Opens a cursor for the given query, enabling pagination through large
result sets.
+ * The returned cursor starts with the first page already loaded.
+ *
+ * @param query the query to execute
+ * @param pageSize the number of rows per page
+ * @return ResultCursor with the first page loaded and ready for navigation
+ * @throws PinotClientException If an exception occurs while processing the
query
+ */
+ public ResultCursor openCursor(String query, int pageSize) throws
PinotClientException {
+ try {
+ return openCursorAsync(query, pageSize).get();
+ } catch (Exception e) {
+ if (e.getCause() instanceof PinotClientException) {
+ throw (PinotClientException) e.getCause();
+ } else if (e.getCause() instanceof UnsupportedOperationException) {
+ throw (UnsupportedOperationException) e.getCause();
+ } else {
+ throw new PinotClientException("Failed to open cursor", e);
+ }
+ }
+ }
+
+  /**
+   * Asynchronously opens a cursor for the given query so that large result sets can be paged through.
+   * The cursor delivered by the returned future already holds the first page.
+   *
+   * <p>The pipeline runs three steps in order: verify that the transport supports cursors, select a
+   * broker for the query, then issue the initial cursor query against that broker. A failure in any
+   * step completes the returned future exceptionally.
+   *
+   * @param query the query to execute
+   * @param pageSize the number of rows per page
+   * @return CompletableFuture containing ResultCursor with the first page loaded and ready for navigation
+   */
+  public CompletableFuture<ResultCursor> openCursorAsync(String query, int pageSize) {
+    CompletableFuture<String> brokerFuture =
+        validateCursorSupport().thenCompose(ignored -> selectBrokerForCursor(query));
+    return brokerFuture.thenCompose(broker -> executeInitialCursorQuery(broker, query, pageSize));
+  }
+
+  // Completes normally when the transport implements CursorCapable; otherwise returns a future
+  // that has already failed with UnsupportedOperationException.
+  private CompletableFuture<Void> validateCursorSupport() {
+    if (_transport instanceof CursorCapable) {
+      return CompletableFuture.completedFuture(null);
+    }
+    UnsupportedOperationException unsupported =
+        new UnsupportedOperationException("Cursor operations not supported by this connection type");
+    return CompletableFuture.failedFuture(unsupported);
+  }
+
+  // Resolves the tables referenced by the query and asks the broker selector for a broker.
+  // The lookup itself is synchronous; the result is only wrapped in a future so it can be chained.
+  private CompletableFuture<String> selectBrokerForCursor(String query) {
+    String[] tables = resolveTableName(query);
+    String selectedBroker = _brokerSelector.selectBroker(tables);
+    if (selectedBroker != null) {
+      return CompletableFuture.completedFuture(selectedBroker);
+    }
+    return CompletableFuture.failedFuture(
+        new PinotClientException("Could not find broker to execute cursor query"));
+  }
+
+  // Sends the initial cursor query to the chosen broker and wraps the first page in a ResultCursor.
+  // Anything thrown synchronously while issuing the request is converted into a failed future.
+  private CompletableFuture<ResultCursor> executeInitialCursorQuery(String brokerHostPort, String query,
+      int pageSize) {
+    try {
+      CompletableFuture<CursorAwareBrokerResponse> firstPage =
+          ((CursorCapable) _transport).executeQueryWithCursorAsync(brokerHostPort, query, pageSize);
+      return firstPage.thenApply(response -> {
+        // Mirror the regular query path: surface broker-side exceptions only when configured to.
+        if (response.hasExceptions() && _failOnExceptions) {
+          throw new PinotClientException("Query had processing exceptions: \n" + response.getExceptions());
+        }
+        ResultCursor cursor = new ResultCursorImpl(_transport, brokerHostPort, response, _failOnExceptions);
+        return cursor;
+      });
+    } catch (Exception e) {
+      PinotClientException wrapped = new PinotClientException("Failed to open cursor", e);
+      return CompletableFuture.failedFuture(wrapped);
+    }
+  }
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
index 86e16675135..273ccb27467 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java
@@ -278,4 +278,9 @@ public class ConnectionFactory {
private static PinotClientTransport getDefault() {
return getDefault(new Properties());
}
+
+ /**
+ * Test-only hook that clears the cached default transport by setting {@code _defaultTransport}
+ * to {@code null}. The previously cached transport, if any, is not closed by this method.
+ * NOTE(review): presumably the next {@code getDefault(...)} call then builds a fresh transport;
+ * confirm against the initialization code, which is outside this hunk.
+ */
+ @VisibleForTesting
+ static void resetDefaultTransport() {
+ _defaultTransport = null;
+ }
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java
new file mode 100644
index 00000000000..7f3c0884f05
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorAwareBrokerResponse.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Extended BrokerResponse with cursor-specific fields for cursor pagination queries.
+ * This class adds cursor metadata fields while maintaining full compatibility with BrokerResponse.
+ *
+ * <p>Every cursor field is read once at construction time and never changes afterwards. Each getter
+ * returns {@code null} when the corresponding field is missing from the response or is JSON null.
+ */
+public class CursorAwareBrokerResponse extends BrokerResponse {
+  // Cursor-specific fields from Pinot documentation
+  private final Long _offset;
+  private final Integer _numRows;
+  private final Long _numRowsResultSet;
+  private final Long _cursorResultWriteTimeMs;
+  private final Long _submissionTimeMs;
+  private final Long _expirationTimeMs;
+  private final String _brokerHost;
+  private final Integer _brokerPort;
+  private final Long _bytesWritten;
+  private final Long _cursorFetchTimeMs;
+
+  /**
+   * Creates a CursorAwareBrokerResponse by parsing cursor-specific fields from the JSON response.
+   *
+   * @param brokerResponse the JSON document returned by the broker
+   * @return the parsed response
+   */
+  public static CursorAwareBrokerResponse fromJson(JsonNode brokerResponse) {
+    return new CursorAwareBrokerResponse(brokerResponse);
+  }
+
+  private CursorAwareBrokerResponse(JsonNode brokerResponse) {
+    super(brokerResponse); // Initialize base BrokerResponse fields
+
+    // Parse cursor-specific fields; the helpers map a missing or JSON-null field to null
+    _offset = getLongOrNull(brokerResponse, "offset");
+    _numRows = getIntOrNull(brokerResponse, "numRows");
+    _numRowsResultSet = getLongOrNull(brokerResponse, "numRowsResultSet");
+    _cursorResultWriteTimeMs = getLongOrNull(brokerResponse, "cursorResultWriteTimeMs");
+    _submissionTimeMs = getLongOrNull(brokerResponse, "submissionTimeMs");
+    _expirationTimeMs = getLongOrNull(brokerResponse, "expirationTimeMs");
+    _brokerHost = getTextOrNull(brokerResponse, "brokerHost");
+    _brokerPort = getIntOrNull(brokerResponse, "brokerPort");
+    _bytesWritten = getLongOrNull(brokerResponse, "bytesWritten");
+    _cursorFetchTimeMs = getLongOrNull(brokerResponse, "cursorFetchTimeMs");
+  }
+
+  /** @return the value of the {@code offset} response field, or {@code null} if absent */
+  public Long getOffset() {
+    return _offset;
+  }
+
+  /** @return the value of the {@code numRows} response field, or {@code null} if absent */
+  public Integer getNumRows() {
+    return _numRows;
+  }
+
+  /** @return the value of the {@code numRowsResultSet} response field, or {@code null} if absent */
+  public Long getNumRowsResultSet() {
+    return _numRowsResultSet;
+  }
+
+  /** @return the value of the {@code cursorResultWriteTimeMs} response field, or {@code null} if absent */
+  public Long getCursorResultWriteTimeMs() {
+    return _cursorResultWriteTimeMs;
+  }
+
+  /** @return the value of the {@code submissionTimeMs} response field, or {@code null} if absent */
+  public Long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  /** @return the value of the {@code expirationTimeMs} response field, or {@code null} if absent */
+  public Long getExpirationTimeMs() {
+    return _expirationTimeMs;
+  }
+
+  /** @return the value of the {@code brokerHost} response field, or {@code null} if absent */
+  public String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  /** @return the value of the {@code brokerPort} response field, or {@code null} if absent */
+  public Integer getBrokerPort() {
+    return _brokerPort;
+  }
+
+  /** @return the value of the {@code bytesWritten} response field, or {@code null} if absent */
+  public Long getBytesWritten() {
+    return _bytesWritten;
+  }
+
+  /** @return the value of the {@code cursorFetchTimeMs} response field, or {@code null} if absent */
+  public Long getCursorFetchTimeMs() {
+    return _cursorFetchTimeMs;
+  }
+
+  // Returns the named field when it exists and is not JSON null; otherwise null.
+  // Shared by the typed helpers below so the presence check lives in one place.
+  private static JsonNode getPresentFieldOrNull(JsonNode node, String fieldName) {
+    if (node == null) {
+      return null;
+    }
+    JsonNode valueNode = node.get(fieldName);
+    return (valueNode != null && !valueNode.isNull()) ? valueNode : null;
+  }
+
+  private static Long getLongOrNull(JsonNode node, String fieldName) {
+    JsonNode valueNode = getPresentFieldOrNull(node, fieldName);
+    return valueNode != null ? valueNode.asLong() : null;
+  }
+
+  private static Integer getIntOrNull(JsonNode node, String fieldName) {
+    JsonNode valueNode = getPresentFieldOrNull(node, fieldName);
+    return valueNode != null ? valueNode.asInt() : null;
+  }
+
+  private static String getTextOrNull(JsonNode node, String fieldName) {
+    JsonNode valueNode = getPresentFieldOrNull(node, fieldName);
+    return valueNode != null ? valueNode.asText() : null;
+  }
+}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorCapable.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorCapable.java
new file mode 100644
index 00000000000..a0467ceefcd
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorCapable.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Capability interface for transports that support cursor-based pagination operations.
+ * Transports implementing this interface can be used with cursor pagination features.
+ *
+ * <p>Every method has a default implementation that throws {@link UnsupportedOperationException},
+ * so an implementing transport only needs to override the operations it actually supports.
+ */
+public interface CursorCapable {
+
+  /**
+   * Executes a query with cursor support.
+   *
+   * @param brokerAddress The broker address to send the query to
+   * @param query The SQL query to execute
+   * @param numRows The number of rows to return per page
+   * @return CursorAwareBrokerResponse containing results and cursor metadata
+   * @throws PinotClientException If query execution fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CursorAwareBrokerResponse executeQueryWithCursor(String brokerAddress, String query, int numRows)
+      throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Executes a query with cursor support asynchronously.
+   *
+   * @param brokerAddress The broker address to send the query to
+   * @param query The SQL query to execute
+   * @param numRows The number of rows to return per page
+   * @return CompletableFuture containing CursorAwareBrokerResponse with results and cursor metadata
+   * @throws PinotClientException If query execution fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> executeQueryWithCursorAsync(String brokerAddress,
+      String query, int numRows) throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Fetches the next page for a cursor with specified offset and number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CursorAwareBrokerResponse containing the next page
+   * @throws PinotClientException If the fetch operation fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CursorAwareBrokerResponse fetchNextPage(String brokerAddress, String cursorId, int offset, int numRows)
+      throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Fetches the next page for a cursor asynchronously with specified offset and number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CompletableFuture containing the next page
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> fetchNextPageAsync(String brokerAddress, String cursorId,
+      int offset, int numRows) {
+    throw unsupported();
+  }
+
+  /**
+   * Fetches the previous page for a cursor with specified offset and number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CursorAwareBrokerResponse containing the previous page
+   * @throws PinotClientException If the fetch operation fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CursorAwareBrokerResponse fetchPreviousPage(String brokerAddress, String cursorId, int offset,
+      int numRows)
+      throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Fetches the previous page for a cursor asynchronously with specified offset and number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param offset The offset for pagination
+   * @param numRows The number of rows to fetch
+   * @return CompletableFuture containing the previous page
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> fetchPreviousPageAsync(String brokerAddress,
+      String cursorId, int offset, int numRows) {
+    throw unsupported();
+  }
+
+  /**
+   * Seeks to a specific page for a cursor with specified number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param pageNumber The 1-based page number to seek to (will be converted to offset internally)
+   * @param numRows The number of rows to fetch
+   * @return CursorAwareBrokerResponse containing the requested page
+   * @throws PinotClientException If the seek operation fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CursorAwareBrokerResponse seekToPage(String brokerAddress, String cursorId, int pageNumber, int numRows)
+      throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Seeks to a specific page for a cursor asynchronously with specified number of rows.
+   *
+   * @param brokerAddress The broker address to send the request to
+   * @param cursorId The cursor identifier
+   * @param pageNumber The 1-based page number to seek to (will be converted to offset internally)
+   * @param numRows The number of rows to fetch
+   * @return CompletableFuture containing the requested page
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CompletableFuture<CursorAwareBrokerResponse> seekToPageAsync(String brokerAddress, String cursorId,
+      int pageNumber, int numRows) {
+    throw unsupported();
+  }
+
+  /**
+   * Retrieves metadata for an existing cursor.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+   * @param requestId The unique identifier of the cursor
+   * @return BrokerResponse containing cursor metadata
+   * @throws PinotClientException If metadata retrieval fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default BrokerResponse getCursorMetadata(String brokerAddress, String requestId) throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Retrieves metadata for an existing cursor asynchronously.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+   * @param requestId The unique identifier of the cursor
+   * @return CompletableFuture containing BrokerResponse with cursor metadata
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CompletableFuture<BrokerResponse> getCursorMetadataAsync(String brokerAddress, String requestId) {
+    throw unsupported();
+  }
+
+  /**
+   * Deletes a cursor and cleans up its resources.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+   * @param requestId The unique identifier of the cursor to delete
+   * @throws PinotClientException If cursor deletion fails
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default void deleteCursor(String brokerAddress, String requestId) throws PinotClientException {
+    throw unsupported();
+  }
+
+  /**
+   * Deletes a cursor and cleans up its resources asynchronously.
+   *
+   * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+   * @param requestId The unique identifier of the cursor to delete
+   * @return CompletableFuture that completes when the cursor is deleted
+   * @throws UnsupportedOperationException If the transport does not override this default
+   */
+  default CompletableFuture<Void> deleteCursorAsync(String brokerAddress, String requestId) {
+    throw unsupported();
+  }
+
+  /** Builds the exception thrown by every default method of this interface. */
+  private static UnsupportedOperationException unsupported() {
+    return new UnsupportedOperationException("Cursor operations not supported by this transport implementation");
+  }
+}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorResultSetGroup.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorResultSetGroup.java
new file mode 100644
index 00000000000..6ea431d1d44
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/CursorResultSetGroup.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.List;
+
+/**
+ * A cursor-based result set group that delegates result set access to an internal ResultSetGroup
+ * and provides cursor metadata access. This is a pure data container without navigation logic.
+ *
+ * <p><strong>Thread Safety:</strong> This class holds only final references and exposes no mutators.
+ */
+public class CursorResultSetGroup implements BaseResultSetGroup {
+  private final ResultSetGroup _resultSetGroup;
+  private final CursorAwareBrokerResponse _cursorResponse;
+
+  /**
+   * Creates a cursor result set group from a cursor-aware broker response.
+   *
+   * @param cursorResponse the cursor-aware broker response containing results and cursor metadata
+   */
+  public CursorResultSetGroup(CursorAwareBrokerResponse cursorResponse) {
+    _cursorResponse = cursorResponse;
+    _resultSetGroup = new ResultSetGroup(cursorResponse);
+  }
+
+  @Override
+  public int getResultSetCount() {
+    return _resultSetGroup.getResultSetCount();
+  }
+
+  @Override
+  public ResultSet getResultSet(int index) {
+    return _resultSetGroup.getResultSet(index);
+  }
+
+  /**
+   * Gets the cursor identifier, which is the request id of the underlying broker response.
+   *
+   * @return the cursor id
+   */
+  public String getCursorId() {
+    return _cursorResponse.getRequestId();
+  }
+
+  /**
+   * Gets the page size reported by the broker.
+   *
+   * @return the {@code numRows} value of the response, or 0 if the response does not carry it
+   */
+  public int getPageSize() {
+    Integer numRows = _cursorResponse.getNumRows();
+    return numRows != null ? numRows : 0;
+  }
+
+  /**
+   * Gets the cursor expiration time reported by the broker.
+   *
+   * @return the {@code expirationTimeMs} value of the response, or 0 if the response does not carry it
+   */
+  public long getExpirationTimeMs() {
+    Long expirationTime = _cursorResponse.getExpirationTimeMs();
+    return expirationTime != null ? expirationTime : 0L;
+  }
+
+  /** @return the {@code offset} value of the response, or {@code null} if absent */
+  public Long getOffset() {
+    return _cursorResponse.getOffset();
+  }
+
+  /** @return the {@code numRowsResultSet} value of the response, or {@code null} if absent */
+  public Long getNumRowsResultSet() {
+    return _cursorResponse.getNumRowsResultSet();
+  }
+
+  /** @return the {@code cursorResultWriteTimeMs} value of the response, or {@code null} if absent */
+  public Long getCursorResultWriteTimeMs() {
+    return _cursorResponse.getCursorResultWriteTimeMs();
+  }
+
+  /** @return the {@code submissionTimeMs} value of the response, or {@code null} if absent */
+  public Long getSubmissionTimeMs() {
+    return _cursorResponse.getSubmissionTimeMs();
+  }
+
+  /**
+   * Convenience alias for {@link #getNumRowsResultSet()}; both return the same value.
+   *
+   * @return the {@code numRowsResultSet} value of the response, or {@code null} if absent
+   */
+  public Long getTotalRows() {
+    return _cursorResponse.getNumRowsResultSet();
+  }
+
+  /** @return the {@code brokerHost} value of the response, or {@code null} if absent */
+  public String getBrokerHost() {
+    return _cursorResponse.getBrokerHost();
+  }
+
+  /** @return the {@code brokerPort} value of the response, or {@code null} if absent */
+  public Integer getBrokerPort() {
+    return _cursorResponse.getBrokerPort();
+  }
+
+  /** @return the {@code bytesWritten} value of the response, or {@code null} if absent */
+  public Long getBytesWritten() {
+    return _cursorResponse.getBytesWritten();
+  }
+
+  /** @return the {@code cursorFetchTimeMs} value of the response, or {@code null} if absent */
+  public Long getCursorFetchTimeMs() {
+    return _cursorResponse.getCursorFetchTimeMs();
+  }
+
+  @Override
+  public ExecutionStats getExecutionStats() {
+    return _resultSetGroup.getExecutionStats();
+  }
+
+  @Override
+  public List<PinotClientException> getExceptions() {
+    return _resultSetGroup.getExceptions();
+  }
+
+  @Override
+  public BrokerResponse getBrokerResponse() {
+    return _resultSetGroup.getBrokerResponse();
+  }
+
+  /**
+   * Gets the cursor-aware broker response for accessing cursor metadata.
+   *
+   * @return the cursor-aware broker response
+   */
+  public CursorAwareBrokerResponse getCursorResponse() {
+    return _cursorResponse;
+  }
+}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index c0191ed3613..0453a3bfdd9 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
/**
* JSON encoded Pinot client transport over AsyncHttpClient.
*/
-public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<ClientStats> {
+public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<ClientStats>, CursorCapable {
private static final Logger LOGGER =
LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
private static final String DEFAULT_EXTRA_QUERY_OPTION_STRING =
"groupByMode=sql;responseFormat=sql";
@@ -152,6 +152,295 @@ public class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport<C
}
}
+ /**
+  * Executes a query with cursor pagination support, blocking until the first page is available.
+  *
+  * <p>Delegates to {@link #executeQueryWithCursorAsync(String, String, int)} and waits at most
+  * {@code _brokerReadTimeout} milliseconds for the result.
+  *
+  * @param brokerAddress The broker address in "host:port" format
+  * @param query The SQL query to execute
+  * @param numRows The number of rows to return in the first page
+  * @return the cursor-aware broker response containing the first page and cursor metadata
+  * @throws PinotClientException If query execution fails, the wait times out, or the calling thread is
+  *         interrupted while waiting (the interrupt flag is restored before throwing)
+  */
+ @Override
+ public CursorAwareBrokerResponse executeQueryWithCursor(String brokerAddress, String query, int numRows)
+     throws PinotClientException {
+   try {
+     return executeQueryWithCursorAsync(brokerAddress, query, numRows)
+         .get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so code further up the stack can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new PinotClientException(e);
+   } catch (Exception e) {
+     throw new PinotClientException(e);
+   }
+ }
+
+ /**
+  * Submits a query to the broker with {@code getCursor=true} and returns the first page asynchronously.
+  *
+  * <p>The request is a JSON POST to {@code /query} when the multistage engine is enabled and to
+  * {@code /query/sql} otherwise. Any configured extra query options and headers are attached.
+  *
+  * @param brokerAddress The broker address in "host:port" format
+  * @param query The SQL query to execute
+  * @param numRows The number of rows to return in the first page
+  * @return CompletableFuture containing the cursor-aware response with the first page and cursor metadata;
+  *         it completes exceptionally on a non-200 status or an unreadable response body
+  */
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse> executeQueryWithCursorAsync(String brokerAddress,
+     String query, int numRows) {
+   try {
+     ObjectNode payload = JsonNodeFactory.instance.objectNode();
+     payload.put("sql", query);
+     boolean hasExtraOptions = _extraOptionStr != null && !_extraOptionStr.isEmpty();
+     if (hasExtraOptions) {
+       payload.put("queryOptions", _extraOptionStr);
+     }
+
+     LOGGER.debug("Cursor query will use Multistage Engine = {}", _useMultistageEngine);
+
+     String queryPath = _useMultistageEngine ? "/query" : "/query/sql";
+     String url =
+         String.format("%s://%s%s?getCursor=true&numRows=%d", _scheme, brokerAddress, queryPath, numRows);
+     BoundRequestBuilder request = _httpClient.preparePost(url);
+
+     if (_headers != null) {
+       _headers.forEach((headerName, headerValue) -> request.addHeader(headerName, headerValue));
+     }
+     LOGGER.debug("Sending cursor query {} to {}", query, url);
+
+     // Content-Type is added after the caller-supplied headers, exactly as before
+     request.addHeader("Content-Type", "application/json; charset=utf-8");
+     request.setBody(payload.toString());
+     return request.execute().toCompletableFuture().thenApply(httpResponse -> {
+       int status = httpResponse.getStatusCode();
+       LOGGER.debug("Completed cursor query, HTTP status is {}", status);
+
+       if (status != 200) {
+         throw new PinotClientException("Pinot returned HTTP status " + status + ", expected 200");
+       }
+
+       try {
+         return CursorAwareBrokerResponse.fromJson(
+             OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+       } catch (IOException e) {
+         throw new CompletionException(e);
+       }
+     });
+   } catch (Exception e) {
+     return CompletableFuture.failedFuture(new PinotClientException(e));
+   }
+ }
+
+
+ /**
+  * Retrieves metadata for an existing cursor, blocking until the broker answers.
+  *
+  * <p>Delegates to {@link #getCursorMetadataAsync(String, String)} and waits at most
+  * {@code _brokerReadTimeout} milliseconds for the result.
+  *
+  * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+  * @param requestId The unique identifier of the cursor
+  * @return BrokerResponse containing cursor metadata
+  * @throws PinotClientException If metadata retrieval fails, the wait times out, or the calling thread is
+  *         interrupted while waiting (the interrupt flag is restored before throwing)
+  */
+ @Override
+ public BrokerResponse getCursorMetadata(String brokerAddress, String requestId) throws PinotClientException {
+   try {
+     return getCursorMetadataAsync(brokerAddress, requestId).get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so code further up the stack can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new PinotClientException(e);
+   } catch (Exception e) {
+     throw new PinotClientException(e);
+   }
+ }
+
+ /**
+  * Retrieves metadata for an existing cursor asynchronously.
+  *
+  * <p>Issues a GET against {@code /responseStore/{requestId}/} on the given broker.
+  *
+  * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+  * @param requestId The unique identifier of the cursor
+  * @return CompletableFuture containing BrokerResponse with cursor metadata; every failure, including one
+  *         raised while building or dispatching the request, is reported through the future
+  */
+ @Override
+ public CompletableFuture<BrokerResponse> getCursorMetadataAsync(String brokerAddress, String requestId) {
+   try {
+     String url = String.format("%s://%s/responseStore/%s/", _scheme, brokerAddress, requestId);
+     BoundRequestBuilder requestBuilder = _httpClient.prepareGet(url);
+
+     if (_headers != null) {
+       _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+     }
+     LOGGER.debug("Getting cursor metadata from {}", url);
+     return requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+       LOGGER.debug("Completed cursor metadata fetch, HTTP status is {}", httpResponse.getStatusCode());
+
+       if (httpResponse.getStatusCode() != 200) {
+         throw new PinotClientException(
+             "Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
+       }
+
+       try {
+         return BrokerResponse.fromJson(OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+       } catch (IOException e) {
+         throw new CompletionException(e);
+       }
+     });
+   } catch (Exception e) {
+     // An async API should not throw synchronously; match the other *Async methods of this class
+     return CompletableFuture.failedFuture(new PinotClientException(e));
+   }
+ }
+
+ /**
+  * Deletes a cursor and cleans up its resources, blocking until the broker answers.
+  *
+  * <p>Delegates to {@link #deleteCursorAsync(String, String)} and waits at most
+  * {@code _brokerReadTimeout} milliseconds for completion.
+  *
+  * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+  * @param requestId The unique identifier of the cursor to delete
+  * @throws PinotClientException If cursor deletion fails, the wait times out, or the calling thread is
+  *         interrupted while waiting (the interrupt flag is restored before throwing)
+  */
+ @Override
+ public void deleteCursor(String brokerAddress, String requestId) throws PinotClientException {
+   try {
+     deleteCursorAsync(brokerAddress, requestId).get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so code further up the stack can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new PinotClientException(e);
+   } catch (Exception e) {
+     throw new PinotClientException(e);
+   }
+ }
+
+ /**
+  * Deletes a cursor and cleans up its resources asynchronously.
+  *
+  * <p>Issues a DELETE against {@code /responseStore/{requestId}/} on the given broker.
+  *
+  * @param brokerAddress The broker address in "host:port" format (must be same as cursor creator)
+  * @param requestId The unique identifier of the cursor to delete
+  * @return CompletableFuture that completes when the cursor is deleted; every failure, including one
+  *         raised while building or dispatching the request, is reported through the future
+  */
+ @Override
+ public CompletableFuture<Void> deleteCursorAsync(String brokerAddress, String requestId) {
+   try {
+     String url = String.format("%s://%s/responseStore/%s/", _scheme, brokerAddress, requestId);
+     BoundRequestBuilder requestBuilder = _httpClient.prepareDelete(url);
+
+     if (_headers != null) {
+       _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+     }
+     LOGGER.debug("Deleting cursor at {}", url);
+     return requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+       LOGGER.debug("Completed cursor deletion, HTTP status is {}", httpResponse.getStatusCode());
+
+       if (httpResponse.getStatusCode() != 200) {
+         throw new PinotClientException(
+             "Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
+       }
+       return null;
+     });
+   } catch (Exception e) {
+     // An async API should not throw synchronously; match the other *Async methods of this class
+     return CompletableFuture.failedFuture(new PinotClientException(e));
+   }
+ }
+
+
+ /**
+  * Fetches a page of an existing cursor starting at {@code offset}, blocking until the broker answers.
+  *
+  * @param brokerAddress The broker address in "host:port" format
+  * @param cursorId The identifier of the cursor to read from
+  * @param offset The row offset of the first row of the requested page
+  * @param numRows The number of rows to return
+  * @return the cursor-aware broker response for the requested page
+  * @throws PinotClientException If the fetch fails, the wait times out, or the calling thread is
+  *         interrupted while waiting (the interrupt flag is restored before throwing)
+  */
+ public CursorAwareBrokerResponse fetchNextPage(String brokerAddress, String cursorId, int offset, int numRows)
+     throws PinotClientException {
+   try {
+     return fetchNextPageAsync(brokerAddress, cursorId, offset, numRows)
+         .get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so code further up the stack can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new PinotClientException(e);
+   } catch (Exception e) {
+     throw new PinotClientException(e);
+   }
+ }
+
+ /**
+  * Asynchronous variant of {@link #fetchNextPage(String, String, int, int)}.
+  *
+  * @return CompletableFuture containing the requested page; failures are reported through the future
+  */
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse> fetchNextPageAsync(String brokerAddress, String cursorId,
+     int offset, int numRows) {
+   return fetchCursorPageAsync(brokerAddress, cursorId, offset, numRows);
+ }
+
+ /**
+  * Fetches a page of an existing cursor starting at {@code offset}, blocking until the broker answers.
+  * The request is the same as for {@link #fetchNextPage(String, String, int, int)}; the caller supplies
+  * the offset of the earlier page.
+  *
+  * @param brokerAddress The broker address in "host:port" format
+  * @param cursorId The identifier of the cursor to read from
+  * @param offset The row offset of the first row of the requested page
+  * @param numRows The number of rows to return
+  * @return the cursor-aware broker response for the requested page
+  * @throws PinotClientException If the fetch fails, the wait times out, or the calling thread is
+  *         interrupted while waiting (the interrupt flag is restored before throwing)
+  */
+ public CursorAwareBrokerResponse fetchPreviousPage(String brokerAddress, String cursorId, int offset,
+     int numRows)
+     throws PinotClientException {
+   try {
+     return fetchPreviousPageAsync(brokerAddress, cursorId, offset, numRows)
+         .get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so code further up the stack can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new PinotClientException(e);
+   } catch (Exception e) {
+     throw new PinotClientException(e);
+   }
+ }
+
+ /**
+  * Asynchronous variant of {@link #fetchPreviousPage(String, String, int, int)}.
+  *
+  * @return CompletableFuture containing the requested page; failures are reported through the future
+  */
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse> fetchPreviousPageAsync(String brokerAddress,
+     String cursorId, int offset, int numRows) {
+   return fetchCursorPageAsync(brokerAddress, cursorId, offset, numRows);
+ }
+
+ /**
+  * Fetches the page with the given 1-based page number, blocking until the broker answers.
+  *
+  * @param brokerAddress The broker address in "host:port" format
+  * @param cursorId The identifier of the cursor to read from
+  * @param pageNumber The page to fetch; page 1 maps to offset 0
+  * @param numRows The page size used to turn the page number into a row offset
+  * @return the cursor-aware broker response for the requested page
+  * @throws PinotClientException If the fetch fails, the wait times out, or the calling thread is
+  *         interrupted while waiting (the interrupt flag is restored before throwing)
+  */
+ public CursorAwareBrokerResponse seekToPage(String brokerAddress, String cursorId, int pageNumber, int numRows)
+     throws PinotClientException {
+   try {
+     return seekToPageAsync(brokerAddress, cursorId, pageNumber, numRows)
+         .get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so code further up the stack can still observe the interruption
+     Thread.currentThread().interrupt();
+     throw new PinotClientException(e);
+   } catch (Exception e) {
+     throw new PinotClientException(e);
+   }
+ }
+
+ /**
+  * Asynchronous variant of {@link #seekToPage(String, String, int, int)}.
+  *
+  * @return CompletableFuture containing the requested page; failures are reported through the future
+  */
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse> seekToPageAsync(String brokerAddress, String cursorId,
+     int pageNumber, int numRows) {
+   // Widen before multiplying so a large page number cannot wrap around into a bogus offset
+   long offset = ((long) pageNumber - 1) * numRows;
+   return fetchCursorPageAsync(brokerAddress, cursorId, offset, numRows);
+ }
+
+ /**
+  * Issues the GET against {@code /responseStore/{cursorId}/results} shared by all paging operations and
+  * parses the body into a cursor-aware response. Never throws; failures complete the returned future.
+  */
+ private CompletableFuture<CursorAwareBrokerResponse> fetchCursorPageAsync(String brokerAddress,
+     String cursorId, long offset, int numRows) {
+   try {
+     String url = String.format("%s://%s/responseStore/%s/results?offset=%d&numRows=%d", _scheme,
+         brokerAddress, cursorId, offset, numRows);
+     BoundRequestBuilder requestBuilder = _httpClient.prepareGet(url);
+
+     if (_headers != null) {
+       _headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+     }
+
+     return requestBuilder.execute().toCompletableFuture().thenApply(httpResponse -> {
+       if (httpResponse.getStatusCode() != 200) {
+         throw new PinotClientException(
+             "Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
+       }
+
+       try {
+         return CursorAwareBrokerResponse.fromJson(
+             OBJECT_READER.readTree(httpResponse.getResponseBodyAsStream()));
+       } catch (IOException e) {
+         throw new CompletionException(e);
+       }
+     });
+   } catch (Exception e) {
+     return CompletableFuture.failedFuture(new PinotClientException(e));
+   }
+ }
+
@Override
public ClientStats getClientMetrics() {
return _httpClient.getClientStats();
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursor.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursor.java
new file mode 100644
index 00000000000..e38af328aef
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursor.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A cursor for navigating through paginated query results using the Cursor
Handle Pattern.
+ *
+ * This interface provides a clean abstraction for cursor-based pagination,
+ * where each page contains a complete CursorResultSetGroup with all result
sets
+ * and metadata. The cursor starts with the first page already loaded.
+ *
+ * Usage example:
+ * <pre>
+ * try (ResultCursor cursor = connection.openCursor("SELECT * FROM table",
1000)) {
+ * // First page is immediately available
+ * CursorResultSetGroup firstPage = cursor.getCurrentPage();
+ *
+ * // Navigate through remaining pages
+ * while (cursor.hasNext()) {
+ * CursorResultSetGroup page = cursor.next();
+ * // Process all result sets in the page
+ * for (int i = 0; i < page.getResultSetCount(); i++) {
+ * ResultSet rs = page.getResultSet(i);
+ * // Process result set
+ * }
+ * }
+ * }
+ * </pre>
+ */
+public interface ResultCursor extends AutoCloseable {
+
+ /**
+ * Gets the current page of results without navigation.
+ * The first page is available immediately when the cursor is created.
+ *
+ * @return the current page of results
+ */
+ CursorResultSetGroup getCurrentPage();
+
+ /**
+ * Checks if there are more pages available after the current page.
+ *
+ * @return true if more pages are available, false otherwise
+ */
+ boolean hasNext();
+
+ /**
+ * Checks if there are previous pages available before the current page.
+ *
+ * @return true if previous pages are available, false otherwise
+ */
+ boolean hasPrevious();
+
+ /**
+ * Fetches the next page of results and advances the cursor position.
+ *
+ * @return the next page of results
+ * @throws PinotClientException if an error occurs while fetching
+ * @throws IllegalStateException if no next page is available
+ */
+ CursorResultSetGroup next() throws PinotClientException;
+
+ /**
+ * Fetches the previous page of results and moves the cursor position
backward.
+ *
+ * @return the previous page of results
+ * @throws PinotClientException if an error occurs while fetching
+ * @throws IllegalStateException if no previous page is available
+ */
+ CursorResultSetGroup previous() throws PinotClientException;
+
+ /**
+ * Seeks to a specific page number (1-based) and updates the cursor position.
+ *
+ * @param pageNumber the page number to seek to (1-based)
+ * @return the requested page of results
+ * @throws PinotClientException if an error occurs while seeking
+ * @throws IllegalArgumentException if pageNumber is invalid
+ */
+ CursorResultSetGroup seekToPage(int pageNumber) throws PinotClientException;
+
+ /**
+ * Fetches the next page of results asynchronously.
+ *
+ * @return a CompletableFuture containing the next page of results
+ */
+ CompletableFuture<CursorResultSetGroup> nextAsync();
+
+ /**
+ * Fetches the previous page of results asynchronously.
+ *
+ * @return a CompletableFuture containing the previous page of results
+ */
+ CompletableFuture<CursorResultSetGroup> previousAsync();
+
+ /**
+ * Seeks to a specific page number asynchronously.
+ *
+ * @param pageNumber the page number to seek to (1-based)
+ * @return a CompletableFuture containing the requested page of results
+ */
+ CompletableFuture<CursorResultSetGroup> seekToPageAsync(int pageNumber);
+
+ /**
+ * Gets the current cursor ID.
+ *
+ * @return the cursor ID
+ */
+ String getCursorId();
+
+ /**
+ * Gets the current page number (1-based).
+ *
+ * @return the current page number
+ */
+ int getCurrentPageNumber();
+
+ /**
+ * Gets the total number of rows across all pages, if known.
+ *
+ * @return the total number of rows, or -1 if unknown
+ */
+ long getTotalRows();
+
+ /**
+ * Gets the page size for this cursor.
+ *
+ * @return the page size
+ */
+ int getPageSize();
+
+ /**
+ * Checks if the cursor has expired on the server.
+ *
+ * @return true if the cursor has expired, false otherwise
+ */
+ boolean isExpired();
+
+ /**
+ * Closes the cursor and releases any associated resources.
+ * This sends a cleanup request to the server to free cursor resources.
+ *
+ * @throws PinotClientException if an error occurs while closing
+ */
+ @Override
+ void close() throws PinotClientException;
+}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursorImpl.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursorImpl.java
new file mode 100644
index 00000000000..c609e489832
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultCursorImpl.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of ResultCursor that manages cursor-based pagination.
+ *
+ * This class encapsulates the cursor state and navigation logic,
+ * keeping the Connection and CursorResultSetGroup classes focused
+ * on their primary responsibilities. The cursor starts with the first
+ * page already loaded.
+ *
+ * Thread Safety: All navigation methods are synchronized to ensure
+ * safe concurrent access to cursor state.
+ */
+public class ResultCursorImpl implements ResultCursor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ResultCursorImpl.class);
+
+ private final PinotClientTransport<?> _transport;
+ private final String _brokerHostPort;
+ private final boolean _failOnExceptions;
+ private final int _pageSize;
+
+ // Synchronization lock for cursor state
+ private final Object _stateLock = new Object();
+
+ // Volatile for visibility across threads
+ private volatile CursorResultSetGroup _currentPage;
+ private volatile int _currentPageNumber;
+ private volatile boolean _closed;
+
+ /**
+ * Creates a new cursor with the initial page of results already loaded.
+ *
+ * @param transport the transport to use for navigation
+ * @param brokerHostPort the broker host and port
+ * @param initialResponse the initial cursor-aware response (first page)
+ * @param failOnExceptions whether to fail on query exceptions
+ */
+ public ResultCursorImpl(PinotClientTransport<?> transport, String
brokerHostPort,
+ CursorAwareBrokerResponse initialResponse, boolean
failOnExceptions) {
+ _transport = transport;
+ _brokerHostPort = brokerHostPort;
+ _failOnExceptions = failOnExceptions;
+ _pageSize = initialResponse.getNumRows() != null
+ ? initialResponse.getNumRows().intValue() : -1;
+ // Initialize state - no synchronization needed in constructor
+ _currentPage = new CursorResultSetGroup(initialResponse);
+ _currentPageNumber = 0;
+ _closed = false;
+ }
+
+ @Override
+ public CursorResultSetGroup getCurrentPage() {
+ synchronized (_stateLock) {
+ checkNotClosed();
+ return _currentPage;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ synchronized (_stateLock) {
+ checkNotClosed();
+ CursorAwareBrokerResponse response = _currentPage.getCursorResponse();
+ Long offset = response.getOffset();
+ Integer numRows = response.getNumRows();
+ Long totalRows = response.getNumRowsResultSet();
+
+ if (offset == null || numRows == null || totalRows == null) {
+ return false;
+ }
+ return (offset + numRows) < totalRows;
+ }
+ }
+
+ @Override
+ public boolean hasPrevious() {
+ synchronized (_stateLock) {
+ checkNotClosed();
+ CursorAwareBrokerResponse response = _currentPage.getCursorResponse();
+ Long offset = response.getOffset();
+ return offset != null && offset > 0;
+ }
+ }
+
+ @Override
+ public CursorResultSetGroup next() throws PinotClientException {
+ return executeNavigation(
+ () -> {
+ CursorAwareBrokerResponse response =
_currentPage.getCursorResponse();
+ Long offset = response.getOffset();
+ Integer numRows = response.getNumRows();
+ Long totalRows = response.getNumRowsResultSet();
+ if (offset == null || numRows == null || totalRows == null
+ || (offset + numRows) >= totalRows) {
+ throw new IllegalStateException("No next page available");
+ }
+ },
+ () -> _currentPageNumber * _pageSize,
+ (transport, cursorId, offset) ->
transport.fetchNextPage(_brokerHostPort, cursorId, offset, _pageSize),
+ () -> _currentPageNumber++,
+ "Failed to fetch next page"
+ );
+ }
+
+ @Override
+ public CursorResultSetGroup previous() throws PinotClientException {
+ return executeNavigation(
+ () -> {
+ CursorAwareBrokerResponse response =
_currentPage.getCursorResponse();
+ Long offset = response.getOffset();
+ if (offset == null || offset <= 0) {
+ throw new IllegalStateException("No previous page available");
+ }
+ },
+ () -> (_currentPageNumber - 1) * _pageSize,
+ (transport, cursorId, offset) ->
transport.fetchPreviousPage(_brokerHostPort, cursorId, offset, _pageSize),
+ () -> _currentPageNumber--,
+ "Failed to fetch previous page"
+ );
+ }
+
+ @Override
+ public CursorResultSetGroup seekToPage(int pageNumber) throws
PinotClientException {
+ return executeNavigation(
+ () -> {
+ if (pageNumber <= 0) {
+ throw new IllegalArgumentException("Page number must be positive
(1-based)");
+ }
+ },
+ () -> (pageNumber - 1) * _pageSize,
+ (transport, cursorId, offset) -> transport.seekToPage(_brokerHostPort,
cursorId, pageNumber, _pageSize),
+ () -> _currentPageNumber = pageNumber - 1,
+ "Failed to seek to page " + pageNumber
+ );
+ }
+
+ @Override
+ public CompletableFuture<CursorResultSetGroup> nextAsync() {
+ return executeNavigationAsync(
+ () -> {
+ CursorAwareBrokerResponse response =
_currentPage.getCursorResponse();
+ Long offset = response.getOffset();
+ Integer numRows = response.getNumRows();
+ Long totalRows = response.getNumRowsResultSet();
+ if (offset == null || numRows == null || totalRows == null
+ || (offset + numRows) >= totalRows) {
+ throw new IllegalStateException("No next page available");
+ }
+ },
+ () -> _currentPageNumber * _pageSize,
+ (transport, cursorId, offset) ->
transport.fetchNextPageAsync(_brokerHostPort, cursorId, offset, _pageSize),
+ () -> _currentPageNumber++
+ );
+ }
+
+ @Override
+ public CompletableFuture<CursorResultSetGroup> previousAsync() {
+ return executeNavigationAsync(
+ () -> {
+ CursorAwareBrokerResponse response =
_currentPage.getCursorResponse();
+ Long offset = response.getOffset();
+ if (offset == null || offset <= 0) {
+ throw new IllegalStateException("No previous page available");
+ }
+ },
+ () -> (_currentPageNumber - 1) * _pageSize,
+ (transport, cursorId, offset) ->
transport.fetchPreviousPageAsync(_brokerHostPort, cursorId, offset, _pageSize),
+ () -> _currentPageNumber--
+ );
+ }
+
+ @Override
+ public CompletableFuture<CursorResultSetGroup> seekToPageAsync(int
pageNumber) {
+ return executeNavigationAsync(
+ () -> {
+ if (pageNumber <= 0) {
+ throw new IllegalArgumentException("Page number must be positive
(1-based)");
+ }
+ },
+ () -> (pageNumber - 1) * _pageSize,
+ (transport, cursorId, offset) ->
transport.seekToPageAsync(_brokerHostPort, cursorId, pageNumber, _pageSize),
+ () -> _currentPageNumber = pageNumber - 1
+ );
+ }
+
+ private CursorResultSetGroup executeNavigation(
+ Runnable validator,
+ IntSupplier offsetCalculator,
+ TransportFunction transportFunction,
+ Runnable stateUpdater,
+ String errorMessage) throws PinotClientException {
+ int offset;
+ String cursorId;
+ synchronized (_stateLock) {
+ checkNotClosed();
+ validator.run();
+ offset = offsetCalculator.getAsInt();
+ cursorId = _currentPage.getCursorId();
+ }
+
+ JsonAsyncHttpPinotClientTransport transport = getValidatedTransport();
+
+ try {
+ CursorAwareBrokerResponse response = transportFunction.apply(transport,
cursorId, offset);
+ return updateStateAndReturn(response, stateUpdater);
+ } catch (PinotClientException e) {
+ throw new PinotClientException(errorMessage, e);
+ }
+ }
+
+ private CompletableFuture<CursorResultSetGroup> executeNavigationAsync(
+ Runnable validator,
+ IntSupplier offsetCalculator,
+ AsyncTransportFunction transportFunction,
+ Runnable stateUpdater) {
+ try {
+ int offset;
+ String cursorId;
+ synchronized (_stateLock) {
+ checkNotClosed();
+ validator.run();
+ offset = offsetCalculator.getAsInt();
+ cursorId = _currentPage.getCursorId();
+ }
+
+ JsonAsyncHttpPinotClientTransport transport = getValidatedTransport();
+ return transportFunction.apply(transport, cursorId, offset)
+ .thenApply(response -> updateStateAndReturn(response, stateUpdater));
+ } catch (IllegalStateException | IllegalArgumentException |
UnsupportedOperationException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+ private JsonAsyncHttpPinotClientTransport getValidatedTransport() {
+ if (!(_transport instanceof JsonAsyncHttpPinotClientTransport)) {
+ throw new UnsupportedOperationException("Cursor operations not supported
by this transport type");
+ }
+ return (JsonAsyncHttpPinotClientTransport) _transport;
+ }
+
+ @FunctionalInterface
+ private interface TransportFunction {
+ CursorAwareBrokerResponse apply(JsonAsyncHttpPinotClientTransport
transport, String cursorId, int offset)
+ throws PinotClientException;
+ }
+
+ @FunctionalInterface
+ private interface AsyncTransportFunction {
+ CompletableFuture<CursorAwareBrokerResponse>
apply(JsonAsyncHttpPinotClientTransport transport,
+ String cursorId, int offset);
+ }
+
+ /**
+ * Common method to update cursor state and return result for both sync and
async operations.
+ * This eliminates code duplication between navigation methods.
+ */
+ private CursorResultSetGroup updateStateAndReturn(CursorAwareBrokerResponse
response, Runnable stateUpdater) {
+ if (response.hasExceptions() && _failOnExceptions) {
+ throw new PinotClientException("Query had processing exceptions: \n" +
response.getExceptions());
+ }
+
+ synchronized (_stateLock) {
+ if (_closed) {
+ throw new IllegalStateException("Cursor was closed during operation");
+ }
+ _currentPage = new CursorResultSetGroup(response);
+ stateUpdater.run();
+ }
+ return _currentPage;
+ }
+
+ @Override
+ public String getCursorId() {
+ synchronized (_stateLock) {
+ checkNotClosed();
+ return _currentPage.getCursorId();
+ }
+ }
+
+ @Override
+ public int getCurrentPageNumber() {
+ synchronized (_stateLock) {
+ checkNotClosed();
+ return _currentPageNumber + 1;
+ }
+ }
+
+ @Override
+ public long getTotalRows() {
+ synchronized (_stateLock) {
+ checkNotClosed();
+ Long numRows = _currentPage.getTotalRows();
+ return numRows != null ? numRows : -1;
+ }
+ }
+
+ @Override
+ public int getPageSize() {
+ return _pageSize;
+ }
+
+ @Override
+ public boolean isExpired() {
+ if (_closed) {
+ return true;
+ }
+
+ long expirationTime;
+ synchronized (_stateLock) {
+ if (_closed) {
+ return true;
+ }
+ expirationTime = _currentPage.getExpirationTimeMs();
+ }
+
+ return System.currentTimeMillis() > expirationTime;
+ }
+
+ @Override
+ public void close() throws PinotClientException {
+ if (_closed) {
+ return;
+ }
+
+ // Get cursor ID before marking as closed
+ String cursorId;
+ synchronized (_stateLock) {
+ cursorId = _currentPage.getCursorId();
+ _closed = true;
+ }
+
+ // Server-side cursor cleanup could be implemented in the future
+ // This would involve sending a DELETE request to the broker to clean up
cursor resources
+ // For now, cursors will expire naturally based on their expiration time
+ LOGGER.debug("Cursor {} closed, server-side cleanup not yet implemented",
cursorId);
+ }
+
+ private void checkNotClosed() {
+ if (_closed) {
+ throw new IllegalStateException("Cursor is closed");
+ }
+ }
+}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
index d20765e3416..1c964dd9b3c 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
/**
* A Pinot result set group, containing the results given back by Pinot for a
given query.
*/
-public class ResultSetGroup {
+public class ResultSetGroup implements BaseResultSetGroup {
private final List<ResultSet> _resultSets;
private final ExecutionStats _executionStats;
private final List<PinotClientException> _exceptions;
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 18c51b17f11..1240181e651 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.client;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import org.I0Itec.zkclient.ZkClient;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -120,4 +123,145 @@ public class ConnectionFactoryTest {
Assert.assertNotNull(connection.getTransport());
Assert.assertNotNull(connection.getTransport().getClientMetrics());
}
+
+  /**
+   * Reflectively checks that ConnectionFactory still declares its four factory entry points returning
+   * {@link Connection}, and that Connection declares an {@code openCursor} method.
+   */
+  @Test
+  public void testConnectionFactoryMethodsPreserved() {
+    boolean zookeeperFactoryFound = false;
+    boolean controllerFactoryFound = false;
+    boolean hostListFactoryFound = false;
+    boolean propertiesFactoryFound = false;
+
+    for (Method candidate : ConnectionFactory.class.getDeclaredMethods()) {
+      // Only methods that hand back a Connection count as factory entry points
+      if (candidate.getReturnType() != Connection.class) {
+        continue;
+      }
+      switch (candidate.getName()) {
+        case "fromZookeeper":
+          zookeeperFactoryFound = true;
+          break;
+        case "fromController":
+          controllerFactoryFound = true;
+          break;
+        case "fromHostList":
+          hostListFactoryFound = true;
+          break;
+        case "fromProperties":
+          propertiesFactoryFound = true;
+          break;
+        default:
+          break;
+      }
+    }
+
+    // Existing factory methods must still be present
+    Assert.assertTrue(zookeeperFactoryFound, "fromZookeeper methods should be preserved");
+    Assert.assertTrue(controllerFactoryFound, "fromController methods should be preserved");
+    Assert.assertTrue(hostListFactoryFound, "fromHostList methods should be preserved");
+    Assert.assertTrue(propertiesFactoryFound, "fromProperties methods should be preserved");
+
+    // Connection must expose the cursor API (any overload named openCursor is enough)
+    boolean openCursorFound = false;
+    for (Method candidate : Connection.class.getDeclaredMethods()) {
+      openCursorFound |= "openCursor".equals(candidate.getName());
+    }
+    Assert.assertTrue(openCursorFound, "Connection should have openCursor method");
+  }
+
+  /**
+   * A host-list connection built on the transport produced by JsonAsyncHttpPinotClientTransportFactory must
+   * report a JsonAsyncHttpPinotClientTransport as its transport.
+   */
+  @Test
+  public void testConnectionCursorFunctionalityWithJsonTransport() {
+    JsonAsyncHttpPinotClientTransportFactory transportFactory = new JsonAsyncHttpPinotClientTransportFactory();
+    List<String> brokerList = ImmutableList.of("127.0.0.1:1234");
+    Connection connection = ConnectionFactory.fromHostList(brokerList, transportFactory.buildTransport());
+
+    boolean usesJsonTransport = connection.getTransport() instanceof JsonAsyncHttpPinotClientTransport;
+    Assert.assertTrue(usesJsonTransport,
+        "Connection should use JsonAsyncHttpPinotClientTransport for cursor support");
+  }
+
+  /**
+   * Verifies that {@code openCursor} on a connection backed by a generic (mocked) PinotClientTransport throws
+   * an UnsupportedOperationException carrying the expected message.
+   */
+  @Test
+  public void testOpenCursorWithUnsupportedTransport() {
+    List<String> brokers = ImmutableList.of("127.0.0.1:1234");
+    PinotClientTransport<?> mockTransport = Mockito.mock(PinotClientTransport.class);
+    Connection connection = new Connection(brokers, mockTransport);
+
+    try {
+      connection.openCursor("SELECT * FROM testTable", 10);
+      Assert.fail("Expected UnsupportedOperationException");
+    } catch (UnsupportedOperationException e) {
+      // TestNG's assertEquals is (actual, expected); this order keeps failure reports the right way round
+      Assert.assertEquals(e.getMessage(), "Cursor operations not supported by this connection type");
+    }
+  }
+
+  /**
+   * Verifies that {@code openCursor} throws a PinotClientException with the expected message when the broker
+   * selector returns no broker.
+   */
+  @Test
+  public void testOpenCursorWithNullBroker() {
+    BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class);
+    Mockito.when(mockBrokerSelector.selectBroker(Mockito.any())).thenReturn(null);
+
+    JsonAsyncHttpPinotClientTransport mockTransport = Mockito.mock(JsonAsyncHttpPinotClientTransport.class);
+    Connection connection = new Connection(mockBrokerSelector, mockTransport);
+
+    try {
+      connection.openCursor("SELECT * FROM testTable", 10);
+      Assert.fail("Expected PinotClientException");
+    } catch (PinotClientException e) {
+      // TestNG's assertEquals is (actual, expected); this order keeps failure reports the right way round
+      Assert.assertEquals(e.getMessage(), "Could not find broker to execute cursor query");
+    }
+  }
+
+  /**
+   * Happy path: with a broker available and a response that reports no exceptions, {@code openCursor} returns
+   * a non-null cursor and forwards the broker address, query and page size to the async transport call.
+   */
+  @Test
+  public void testOpenCursorWithValidParameters()
+      throws Exception {
+    final String brokerAddress = "localhost:8099";
+    final String sql = "SELECT * FROM testTable";
+    final int pageSize = 10;
+
+    // Broker selection always resolves to the same address
+    BrokerSelector selector = Mockito.mock(BrokerSelector.class);
+    Mockito.when(selector.selectBroker(Mockito.any())).thenReturn(brokerAddress);
+
+    // A response that reports no exceptions
+    CursorAwareBrokerResponse cleanResponse = Mockito.mock(CursorAwareBrokerResponse.class);
+    Mockito.when(cleanResponse.hasExceptions()).thenReturn(false);
+
+    // Stub both the sync and async entry points since openCursor now uses async internally
+    JsonAsyncHttpPinotClientTransport transport = Mockito.mock(JsonAsyncHttpPinotClientTransport.class);
+    Mockito.when(transport.executeQueryWithCursor(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(cleanResponse);
+    Mockito.when(transport.executeQueryWithCursorAsync(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(CompletableFuture.completedFuture(cleanResponse));
+
+    Connection connection = new Connection(selector, transport);
+    ResultCursor opened = connection.openCursor(sql, pageSize);
+    Assert.assertNotNull(opened, "Cursor should not be null");
+
+    // The selector was consulted with a table-name array and the async transport call received our arguments
+    Mockito.verify(selector).selectBroker(Mockito.any(String[].class));
+    Mockito.verify(transport).executeQueryWithCursorAsync(brokerAddress, sql, pageSize);
+  }
+
+  /**
+   * With FAIL_ON_EXCEPTIONS set to "true" and a response that reports exceptions, {@code openCursor} is expected
+   * to throw a PinotClientException whose message contains "Failed to open cursor".
+   */
+  @Test
+  public void testOpenCursorWithQueryExceptions()
+      throws Exception {
+    Properties failFastProps = new Properties();
+    failFastProps.setProperty(Connection.FAIL_ON_EXCEPTIONS, "true");
+
+    BrokerSelector selector = Mockito.mock(BrokerSelector.class);
+    Mockito.when(selector.selectBroker(Mockito.any())).thenReturn("localhost:8099");
+
+    // Response that reports exceptions and hands back a placeholder exceptions node
+    JsonNode exceptionsNode = Mockito.mock(JsonNode.class);
+    CursorAwareBrokerResponse failedResponse = Mockito.mock(CursorAwareBrokerResponse.class);
+    Mockito.when(failedResponse.hasExceptions()).thenReturn(true);
+    Mockito.when(failedResponse.getExceptions()).thenReturn(exceptionsNode);
+
+    // NOTE(review): only the synchronous executeQueryWithCursor is stubbed here, whereas
+    // testOpenCursorWithValidParameters also stubs executeQueryWithCursorAsync because openCursor is said to use
+    // the async path - confirm this test fails for the intended reason rather than on an unstubbed call.
+    JsonAsyncHttpPinotClientTransport transport = Mockito.mock(JsonAsyncHttpPinotClientTransport.class);
+    Mockito.when(transport.executeQueryWithCursor(Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
+        .thenReturn(failedResponse);
+
+    Connection connection = new Connection(failFastProps, selector, transport);
+
+    try {
+      connection.openCursor("SELECT * FROM invalidTable", 10);
+      Assert.fail("Expected PinotClientException due to query exceptions");
+    } catch (PinotClientException e) {
+      String actualMessage = e.getMessage();
+      Assert.assertTrue(actualMessage.contains("Failed to open cursor"),
+          "Expected exception message to contain 'Failed to open cursor', but was: " + e.getMessage());
+    }
+  }
}
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorAwareBrokerResponseTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorAwareBrokerResponseTest.java
new file mode 100644
index 00000000000..61c385de257
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorAwareBrokerResponseTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for building a CursorAwareBrokerResponse from JSON: fully populated, explicit nulls, null input,
+ * missing fields and zero values.
+ */
+public class CursorAwareBrokerResponseTest {
+
+  // Shared by every test in this class; each test only reads JSON through it
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /** Parses raw JSON text and converts the resulting tree into a cursor-aware broker response. */
+  private static CursorAwareBrokerResponse parse(String json)
+      throws Exception {
+    JsonNode tree = OBJECT_MAPPER.readTree(json);
+    return CursorAwareBrokerResponse.fromJson(tree);
+  }
+
+  @Test
+  public void testCursorAwareBrokerResponseWithAllFields()
+      throws Exception {
+    CursorAwareBrokerResponse parsed = parse("{"
+        + "\"requestId\": \"test-request-123\","
+        + "\"brokerId\": \"broker-1\","
+        + "\"offset\": 100,"
+        + "\"numRows\": 50,"
+        + "\"numRowsResultSet\": 1000,"
+        + "\"cursorResultWriteTimeMs\": 1234567890,"
+        + "\"submissionTimeMs\": 1234567800,"
+        + "\"expirationTimeMs\": 1234567900,"
+        + "\"brokerHost\": \"localhost\","
+        + "\"brokerPort\": 8099,"
+        + "\"bytesWritten\": 2048,"
+        + "\"cursorFetchTimeMs\": 150,"
+        + "\"resultTable\": {"
+        + " \"dataSchema\": {"
+        + " \"columnNames\": [\"col1\", \"col2\"],"
+        + " \"columnDataTypes\": [\"STRING\", \"INT\"]"
+        + " },"
+        + " \"rows\": [[\"value1\", 123], [\"value2\", 456]]"
+        + "}"
+        + "}");
+
+    // Cursor-specific fields
+    Assert.assertEquals(parsed.getOffset(), Long.valueOf(100));
+    Assert.assertEquals(parsed.getNumRows(), Integer.valueOf(50));
+    Assert.assertEquals(parsed.getNumRowsResultSet(), Long.valueOf(1000));
+    Assert.assertEquals(parsed.getCursorResultWriteTimeMs(), Long.valueOf(1234567890));
+    Assert.assertEquals(parsed.getSubmissionTimeMs(), Long.valueOf(1234567800));
+    Assert.assertEquals(parsed.getExpirationTimeMs(), Long.valueOf(1234567900));
+    Assert.assertEquals(parsed.getBrokerHost(), "localhost");
+    Assert.assertEquals(parsed.getBrokerPort(), Integer.valueOf(8099));
+    Assert.assertEquals(parsed.getBytesWritten(), Long.valueOf(2048));
+    Assert.assertEquals(parsed.getCursorFetchTimeMs(), Long.valueOf(150));
+
+    // Fields inherited from BrokerResponse
+    Assert.assertEquals(parsed.getRequestId(), "test-request-123");
+    Assert.assertEquals(parsed.getBrokerId(), "broker-1");
+    Assert.assertNotNull(parsed.getResultTable());
+  }
+
+  @Test
+  public void testCursorAwareBrokerResponseWithNullFields()
+      throws Exception {
+    CursorAwareBrokerResponse parsed = parse("{"
+        + "\"requestId\": \"test-request-456\","
+        + "\"offset\": null,"
+        + "\"numRows\": null,"
+        + "\"numRowsResultSet\": null,"
+        + "\"brokerHost\": null"
+        + "}");
+
+    // Explicit JSON nulls and absent cursor fields both surface as Java null
+    Assert.assertNull(parsed.getOffset());
+    Assert.assertNull(parsed.getNumRows());
+    Assert.assertNull(parsed.getNumRowsResultSet());
+    Assert.assertNull(parsed.getBrokerHost());
+    Assert.assertNull(parsed.getBrokerPort());
+    Assert.assertNull(parsed.getCursorResultWriteTimeMs());
+    Assert.assertNull(parsed.getSubmissionTimeMs());
+    Assert.assertNull(parsed.getExpirationTimeMs());
+    Assert.assertNull(parsed.getBytesWritten());
+    Assert.assertNull(parsed.getCursorFetchTimeMs());
+  }
+
+  @Test
+  public void testCursorAwareBrokerResponseWithNullInput() {
+    CursorAwareBrokerResponse fromNull = CursorAwareBrokerResponse.fromJson(null);
+
+    // With no JSON at all, every cursor field is null
+    Assert.assertNull(fromNull.getOffset());
+    Assert.assertNull(fromNull.getNumRows());
+    Assert.assertNull(fromNull.getNumRowsResultSet());
+    Assert.assertNull(fromNull.getCursorResultWriteTimeMs());
+    Assert.assertNull(fromNull.getExpirationTimeMs());
+    Assert.assertNull(fromNull.getSubmissionTimeMs());
+    Assert.assertNull(fromNull.getBrokerHost());
+    Assert.assertNull(fromNull.getBrokerPort());
+    Assert.assertNull(fromNull.getBytesWritten());
+    Assert.assertNull(fromNull.getCursorFetchTimeMs());
+  }
+
+  @Test
+  public void testCursorAwareBrokerResponseWithMissingFields()
+      throws Exception {
+    CursorAwareBrokerResponse parsed = parse("{"
+        + "\"requestId\": \"test-request-789\","
+        + "\"brokerId\": \"broker-2\""
+        + "}");
+
+    // Cursor fields that are absent from the JSON come back as null
+    Assert.assertNull(parsed.getOffset());
+    Assert.assertNull(parsed.getNumRows());
+    Assert.assertNull(parsed.getNumRowsResultSet());
+    Assert.assertNull(parsed.getBrokerHost());
+    Assert.assertNull(parsed.getBrokerPort());
+
+    // Inherited fields are still populated
+    Assert.assertEquals(parsed.getRequestId(), "test-request-789");
+    Assert.assertEquals(parsed.getBrokerId(), "broker-2");
+  }
+
+  @Test
+  public void testCursorAwareBrokerResponseWithZeroValues()
+      throws Exception {
+    CursorAwareBrokerResponse parsed = parse("{"
+        + "\"requestId\": \"test-request-000\","
+        + "\"offset\": 0,"
+        + "\"numRows\": 0,"
+        + "\"numRowsResultSet\": 0,"
+        + "\"brokerPort\": 0,"
+        + "\"bytesWritten\": 0"
+        + "}");
+
+    // Zero is a real value and must not be turned into null
+    Assert.assertEquals(parsed.getOffset(), Long.valueOf(0));
+    Assert.assertEquals(parsed.getNumRows(), Integer.valueOf(0));
+    Assert.assertEquals(parsed.getNumRowsResultSet(), Long.valueOf(0));
+    Assert.assertEquals(parsed.getBrokerPort(), Integer.valueOf(0));
+    Assert.assertEquals(parsed.getBytesWritten(), Long.valueOf(0));
+  }
+}
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorResultSetGroupTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorResultSetGroupTest.java
new file mode 100644
index 00000000000..3db59ab5636
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/CursorResultSetGroupTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for CursorResultSetGroup, covering construction from a CursorAwareBrokerResponse and the
+ * cursor metadata accessors it exposes.
+ */
+public class CursorResultSetGroupTest {
+
+  /** Builds a JSON object carrying only the three cursor paging fields. */
+  private static JsonNode cursorJson(int offset, int numRows, int numRowsResultSet) {
+    return new ObjectMapper().createObjectNode()
+        .put("offset", offset)
+        .put("numRows", numRows)
+        .put("numRowsResultSet", numRowsResultSet);
+  }
+
+  /** Wraps the given JSON in a CursorAwareBrokerResponse and then in a CursorResultSetGroup. */
+  private static CursorResultSetGroup groupFrom(JsonNode brokerJson) {
+    CursorAwareBrokerResponse brokerResponse = CursorAwareBrokerResponse.fromJson(brokerJson);
+    return new CursorResultSetGroup(brokerResponse);
+  }
+
+  @Test
+  public void testConstructorWithValidParameters() {
+    CursorResultSetGroup group = groupFrom(cursorJson(0, 10, 100));
+    Assert.assertNotNull(group);
+  }
+
+  @Test
+  public void testGetCursorFields() {
+    CursorResultSetGroup group = groupFrom(cursorJson(5, 15, 200));
+
+    Assert.assertEquals(group.getOffset(), Long.valueOf(5));
+    Assert.assertEquals(group.getPageSize(), 15);
+    Assert.assertEquals(group.getNumRowsResultSet(), Long.valueOf(200));
+  }
+
+  @Test
+  public void testResultSetDelegation() {
+    // An empty JSON object carries no result table, so no result sets are expected
+    CursorResultSetGroup group = groupFrom(new ObjectMapper().createObjectNode());
+    Assert.assertEquals(group.getResultSetCount(), 0);
+  }
+
+  @Test
+  public void testMetadataExtraction() {
+    CursorResultSetGroup group = groupFrom(cursorJson(10, 20, 500));
+
+    Assert.assertEquals(group.getOffset(), Long.valueOf(10));
+    Assert.assertEquals(group.getPageSize(), 20);
+    Assert.assertEquals(group.getNumRowsResultSet(), Long.valueOf(500));
+  }
+}
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
index 4f2b2cbf816..a211cdf8b76 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportTest.java
@@ -21,11 +21,14 @@ package org.apache.pinot.client;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -34,6 +37,8 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -43,9 +48,25 @@ public class JsonAsyncHttpPinotClientTransportTest
implements HttpHandler {
+
"\"aggregationResults\":[{\"function\":\"count_star\",\"value\":\"36542\"}],\"timeUsedMs\":30,"
+
"\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":115545,\"numServersResponded\":99}";
+ // Canned response served by the dummy server for the cursor query/fetch tests: request id "cursor-123",
+ // a two-column result table with two rows, and a metadata object nested inside resultTable.
+ // expirationTimeMs is computed once at class initialization as "now + 300000 ms".
+ private static final String _CURSOR_RESPONSE_JSON =
"{\"requestId\":\"cursor-123\",\"traceInfo\":{},"
+ + "\"resultTable\":{\"dataSchema\":{\"columnNames\":[\"col1\",\"col2\"],"
+ + "\"columnDataTypes\":[\"STRING\",\"INT\"]},"
+ + "\"rows\":[[\"value1\",123],[\"value2\",456]],"
+ +
"\"metadata\":{\"currentPage\":0,\"pageSize\":100,\"totalRows\":1000,\"totalPages\":10,"
+ + "\"hasNext\":true,\"hasPrevious\":false,\"expirationTimeMs\":" +
(System.currentTimeMillis() + 300000) + "}},"
+ +
"\"timeUsedMs\":25,\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":1000,\"numServersResponded\":1}";
+
+ // Canned response served for the cursor metadata tests: same request id, but resultTable holds only the
+ // metadata object (no schema or rows).
+ private static final String _CURSOR_METADATA_JSON =
"{\"requestId\":\"cursor-123\",\"traceInfo\":{},"
+ +
"\"resultTable\":{\"metadata\":{\"currentPage\":0,\"pageSize\":100,\"totalRows\":1000,\"totalPages\":10,"
+ + "\"hasNext\":true,\"hasPrevious\":false,\"expirationTimeMs\":" +
(System.currentTimeMillis() + 300000) + "}},"
+ +
"\"timeUsedMs\":5,\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":1000,\"numServersResponded\":1}";
+
private HttpServer _dummyServer;
private String _responseJson = _VALID_RESPONSE_JSON;
private long _responseDelayMs = 0;
+ private String _requestPath = "";
+ private String _requestMethod = "";
+ private String _requestBody = "";
@BeforeClass
public void setUp()
@@ -60,6 +81,9 @@ public class JsonAsyncHttpPinotClientTransportTest implements
HttpHandler {
public void setUpTestCase() {
_responseJson = _VALID_RESPONSE_JSON;
_responseDelayMs = 0L;
+ _requestPath = "";
+ _requestMethod = "";
+ _requestBody = "";
}
@AfterClass
@@ -116,9 +140,164 @@ public class JsonAsyncHttpPinotClientTransportTest
implements HttpHandler {
}
}
+ // Cursor-related tests
+  /**
+   * Synchronous cursor query: the response is parsed, the request URI carries getCursor/numRows, and the
+   * SQL text is present in the captured request body.
+   */
+  @Test
+  public void testExecuteQueryWithCursor() {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    BrokerResponse opened = cursorTransport.executeQueryWithCursor(brokerAddress, "select * from planets", 100);
+
+    // Parsed response
+    assertFalse(opened.hasExceptions());
+    assertEquals(opened.getRequestId(), "cursor-123");
+    assertNotNull(opened.getResultTable());
+
+    // Request as seen by the dummy server
+    assertTrue(_requestPath.contains("getCursor=true"));
+    assertTrue(_requestPath.contains("numRows=100"));
+    assertTrue(_requestBody.contains("\"sql\":\"select * from planets\""));
+  }
+
+  /**
+   * Fetching a page for cursor "cursor-123" issues a GET against the responseStore results path with the
+   * requested offset and row count.
+   */
+  @Test
+  public void testFetchCursorResults()
+      throws Exception {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    CursorAwareBrokerResponse page = cursorTransport.fetchNextPage(brokerAddress, "cursor-123", 0, 10);
+
+    assertNotNull(page);
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/results"));
+    assertTrue(_requestPath.contains("offset=0"));
+    assertTrue(_requestPath.contains("numRows=10"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Asynchronous cursor query: once the future completes, the response is parsed, the request URI carries
+   * getCursor/numRows, and the SQL text is present in the captured request body.
+   */
+  @Test
+  public void testExecuteQueryWithCursorAsync()
+      throws Exception {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    CompletableFuture<CursorAwareBrokerResponse> pending =
+        cursorTransport.executeQueryWithCursorAsync(brokerAddress, "select * from planets", 50);
+    CursorAwareBrokerResponse opened = pending.get();
+
+    // Parsed response
+    assertFalse(opened.hasExceptions());
+    assertEquals(opened.getRequestId(), "cursor-123");
+    assertNotNull(opened.getResultTable());
+
+    // Request as seen by the dummy server
+    assertTrue(_requestPath.contains("getCursor=true"));
+    assertTrue(_requestPath.contains("numRows=50"));
+    assertTrue(_requestBody.contains("\"sql\":\"select * from planets\""));
+  }
+
+  /**
+   * Fetching with a non-zero offset: the GET request targets the responseStore results path and carries
+   * offset=100 and numRows=50.
+   */
+  @Test
+  public void testFetchNextPageWithOffsetAndNumRows() {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    // Goes through the CursorCapable interface method rather than the removed legacy method
+    CursorAwareBrokerResponse page = cursorTransport.fetchNextPage(brokerAddress, "cursor-123", 100, 50);
+
+    assertFalse(page.hasExceptions());
+    assertEquals(page.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/results"));
+    assertTrue(_requestPath.contains("offset=100"));
+    assertTrue(_requestPath.contains("numRows=50"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Asynchronous page fetch: once the future completes, the response is parsed and the request was a GET
+   * against the responseStore results path.
+   */
+  @Test
+  public void testFetchCursorResultsAsync()
+      throws Exception {
+    _responseJson = _CURSOR_RESPONSE_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    CompletableFuture<CursorAwareBrokerResponse> pending =
+        cursorTransport.fetchNextPageAsync(brokerAddress, "cursor-123", 0, 10);
+    CursorAwareBrokerResponse page = pending.get();
+
+    assertFalse(page.hasExceptions());
+    assertEquals(page.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/results"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Synchronous metadata lookup: the response is parsed and the request was a GET against the cursor's
+   * responseStore path.
+   */
+  @Test
+  public void testGetCursorMetadata() {
+    _responseJson = _CURSOR_METADATA_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    BrokerResponse metadata = cursorTransport.getCursorMetadata(brokerAddress, "cursor-123");
+
+    assertFalse(metadata.hasExceptions());
+    assertEquals(metadata.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Asynchronous metadata lookup: once the future completes, the response is parsed and the request was a
+   * GET against the cursor's responseStore path.
+   */
+  @Test
+  public void testGetCursorMetadataAsync()
+      throws Exception {
+    _responseJson = _CURSOR_METADATA_JSON;
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    CompletableFuture<BrokerResponse> pending = cursorTransport.getCursorMetadataAsync(brokerAddress, "cursor-123");
+    BrokerResponse metadata = pending.get();
+
+    assertFalse(metadata.hasExceptions());
+    assertEquals(metadata.getRequestId(), "cursor-123");
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "GET");
+  }
+
+  /**
+   * Synchronous delete: the call completes without throwing and the request was a DELETE against the
+   * cursor's responseStore path.
+   */
+  @Test
+  public void testDeleteCursor() {
+    // The dummy server answers the delete with an empty JSON object
+    _responseJson = "{}";
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    // Any exception thrown here fails the test
+    cursorTransport.deleteCursor(brokerAddress, "cursor-123");
+
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "DELETE");
+  }
+
+  /**
+   * Asynchronous delete: the future completes without an exception and the request was a DELETE against
+   * the cursor's responseStore path.
+   */
+  @Test
+  public void testDeleteCursorAsync()
+      throws Exception {
+    // The dummy server answers the delete with an empty JSON object
+    _responseJson = "{}";
+    String brokerAddress = "localhost:" + _dummyServer.getAddress().getPort();
+    JsonAsyncHttpPinotClientTransport cursorTransport =
+        (JsonAsyncHttpPinotClientTransport) new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
+
+    CompletableFuture<Void> pending = cursorTransport.deleteCursorAsync(brokerAddress, "cursor-123");
+
+    // get() rethrows any failure, so returning normally means the delete succeeded
+    pending.get();
+    assertTrue(_requestPath.contains("/responseStore/cursor-123/"));
+    assertEquals(_requestMethod, "DELETE");
+  }
+
@Override
public void handle(HttpExchange exchange)
throws IOException {
+ // Capture request details for verification
+ _requestPath = exchange.getRequestURI().toString();
+ _requestMethod = exchange.getRequestMethod();
+
+ // Capture request body for POST requests
+ if ("POST".equals(_requestMethod)) {
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(exchange.getRequestBody()));
+ StringBuilder requestBody = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ requestBody.append(line);
+ }
+ _requestBody = requestBody.toString();
+ reader.close();
+ } else {
+ _requestBody = "";
+ }
+
if (_responseDelayMs > 0) {
try {
Thread.sleep(_responseDelayMs);
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultCursorTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultCursorTest.java
new file mode 100644
index 00000000000..6e3312155a5
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultCursorTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ResultCursorTest {
+
+ private static class MockCursorTransport extends
JsonAsyncHttpPinotClientTransport {
+ private int _currentPage = 1;
+ private final int _totalPages = 3;
+ private final int _pageSize = 10;
+
+ public MockCursorTransport() {
+ super();
+ }
+
+ @Override
+ public CursorAwareBrokerResponse executeQueryWithCursor(String
brokerHostPort, String query, int numRows) {
+ _currentPage = 1;
+ return createMockResponse(1, true, false);
+ }
+
+ @Override
+ public CursorAwareBrokerResponse fetchNextPage(String brokerHostPort,
String cursorId, int offset, int numRows) {
+ _currentPage++;
+ return createMockResponse(_currentPage, _currentPage < _totalPages,
_currentPage > 1);
+ }
+
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse>
fetchNextPageAsync(String brokerHostPort, String cursorId,
+ int offset, int numRows) {
+ return CompletableFuture.completedFuture(fetchNextPage(brokerHostPort,
cursorId, offset, numRows));
+ }
+
+ @Override
+ public CursorAwareBrokerResponse fetchPreviousPage(String brokerHostPort,
String cursorId, int offset,
+ int numRows) {
+ _currentPage--;
+ return createMockResponse(_currentPage, _currentPage < _totalPages,
_currentPage > 1);
+ }
+
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse>
fetchPreviousPageAsync(String brokerHostPort, String cursorId,
+ int offset, int numRows) {
+ return
CompletableFuture.completedFuture(fetchPreviousPage(brokerHostPort, cursorId,
offset, numRows));
+ }
+
+ @Override
+ public CursorAwareBrokerResponse seekToPage(String brokerHostPort, String
cursorId, int pageNumber, int numRows) {
+ // pageNumber is 1-based, store as current page number (1-based)
+ _currentPage = pageNumber;
+ return createMockResponse(_currentPage, _currentPage < _totalPages,
_currentPage > 1);
+ }
+
+ @Override
+ public CompletableFuture<CursorAwareBrokerResponse> seekToPageAsync(String
brokerHostPort, String cursorId,
+ int pageNumber, int numRows) {
+ // pageNumber is 1-based, store as current page number (1-based)
+ _currentPage = pageNumber;
+ return CompletableFuture.completedFuture(
+ createMockResponse(_currentPage, _currentPage < _totalPages,
_currentPage > 1));
+ }
+
+ private CursorAwareBrokerResponse createMockResponse(int pageNum, boolean
hasNext, boolean hasPrevious) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ // Calculate total rows across all pages
+ int totalRows = _totalPages * _pageSize;
+ // Calculate offset for this page (0-based offset)
+ int offset = (pageNum - 1) * _pageSize;
+ String jsonResponse = String.format(
+ "{"
+ + "\"requestId\": \"cursor-test-%d\","
+ + "\"numRowsResultSet\": %d,"
+ + "\"numRows\": %d,"
+ + "\"offset\": %d,"
+ + "\"expirationTimeMs\": %d,"
+ + "\"resultTable\": {"
+ + " \"dataSchema\": {"
+ + " \"columnNames\": [\"col1\", \"col2\"],"
+ + " \"columnDataTypes\": [\"STRING\", \"INT\"]"
+ + " },"
+ + " \"rows\": ["
+ + " [\"value%d_1\", %d],"
+ + " [\"value%d_2\", %d]"
+ + " ]"
+ + "}"
+ + "}",
+ pageNum, totalRows, _pageSize, offset,
+ System.currentTimeMillis() + 300000,
+ pageNum, pageNum * 10,
+ pageNum, pageNum * 10 + 1
+ );
+
+ JsonNode jsonNode = mapper.readTree(jsonResponse);
+ return CursorAwareBrokerResponse.fromJson(jsonNode);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create mock response", e);
+ }
+ }
+ }
+
+ /**
+  * A cursor created from the initial response reports page 1 with page size 10, has a next page
+  * but no previous page, exposes a cursor id and is not expired.
+  */
+ @Test
+ public void testCursorCreation() throws Exception {
+   final String broker = "localhost:8000";
+   MockCursorTransport mockTransport = new MockCursorTransport();
+   CursorAwareBrokerResponse firstResponse =
+       mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+     // Position
+     Assert.assertNotNull(cursor.getCurrentPage());
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+     Assert.assertEquals(cursor.getPageSize(), 10);
+     // Navigability from the first page
+     Assert.assertTrue(cursor.hasNext());
+     Assert.assertFalse(cursor.hasPrevious());
+     // Identity and liveness
+     Assert.assertNotNull(cursor.getCursorId());
+     Assert.assertFalse(cursor.isExpired());
+   }
+ }
+
+ /**
+  * Walks the cursor 1 -> 2 -> 3 -> 2 -> 1 with next()/previous() and checks the page number and
+  * the hasNext/hasPrevious flags after every step.
+  */
+ @Test
+ public void testCursorNavigation() throws Exception {
+   final String broker = "localhost:8000";
+   MockCursorTransport mockTransport = new MockCursorTransport();
+   CursorAwareBrokerResponse firstResponse =
+       mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+     // Forward: page 1 -> page 2
+     Assert.assertTrue(cursor.hasNext());
+     CursorResultSetGroup secondPage = cursor.next();
+     Assert.assertNotNull(secondPage);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+     Assert.assertTrue(cursor.hasNext());
+     Assert.assertTrue(cursor.hasPrevious());
+
+     // Forward: page 2 -> page 3, after which no further page is offered
+     CursorResultSetGroup thirdPage = cursor.next();
+     Assert.assertNotNull(thirdPage);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 3);
+     Assert.assertFalse(cursor.hasNext());
+     Assert.assertTrue(cursor.hasPrevious());
+
+     // Backward: page 3 -> page 2
+     CursorResultSetGroup secondPageAgain = cursor.previous();
+     Assert.assertNotNull(secondPageAgain);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+     Assert.assertTrue(cursor.hasNext());
+     Assert.assertTrue(cursor.hasPrevious());
+
+     // Backward: page 2 -> page 1, where nothing precedes
+     CursorResultSetGroup firstPageAgain = cursor.previous();
+     Assert.assertNotNull(firstPageAgain);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+     Assert.assertTrue(cursor.hasNext());
+     Assert.assertFalse(cursor.hasPrevious());
+   }
+ }
+
+ /**
+  * Seeks to page 2 and back to page 1 by 1-based page number, checking the reported page number
+  * and the hasNext/hasPrevious flags after each seek.
+  */
+ @Test
+ public void testSeekToPage() throws Exception {
+   final String broker = "localhost:8000";
+   MockCursorTransport mockTransport = new MockCursorTransport();
+   CursorAwareBrokerResponse firstResponse =
+       mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+     // Jump to page 2: pages exist on both sides
+     Assert.assertNotNull(cursor.seekToPage(2));
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+     Assert.assertTrue(cursor.hasNext());
+     Assert.assertTrue(cursor.hasPrevious());
+
+     // Jump back to page 1: only a next page remains
+     Assert.assertNotNull(cursor.seekToPage(1));
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+     Assert.assertTrue(cursor.hasNext());
+     Assert.assertFalse(cursor.hasPrevious());
+   }
+ }
+
+ /**
+  * Exercises nextAsync(), previousAsync() and seekToPageAsync(), waiting on each future and
+  * checking the page number the cursor reports afterwards.
+  */
+ @Test
+ public void testAsyncNavigation() throws Exception {
+   final String broker = "localhost:8000";
+   MockCursorTransport mockTransport = new MockCursorTransport();
+   CursorAwareBrokerResponse firstResponse =
+       mockTransport.executeQueryWithCursor(broker, "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(mockTransport, broker, firstResponse, false)) {
+     // Async next: page 1 -> page 2
+     CursorResultSetGroup afterNext = cursor.nextAsync().get();
+     Assert.assertNotNull(afterNext);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+
+     // Async previous: page 2 -> page 1
+     CursorResultSetGroup afterPrevious = cursor.previousAsync().get();
+     Assert.assertNotNull(afterPrevious);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 1);
+
+     // Async seek straight to page 2
+     CursorResultSetGroup afterSeek = cursor.seekToPageAsync(2).get();
+     Assert.assertNotNull(afterSeek);
+     Assert.assertEquals(cursor.getCurrentPageNumber(), 2);
+   }
+ }
+
+ /**
+  * Synchronous navigation past either end must fail: previous() on the first page and next() on
+  * the last page are each expected to throw {@link IllegalStateException} with a fixed message.
+  */
+ @Test
+ public void testNavigationBoundaryConditions() throws Exception {
+   MockCursorTransport transport = new MockCursorTransport();
+   CursorAwareBrokerResponse initialResponse =
+       transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false)) {
+     // Going back from the first page is rejected
+     Assert.assertFalse(cursor.hasPrevious());
+     IllegalStateException noPrevious = Assert.expectThrows(IllegalStateException.class, cursor::previous);
+     Assert.assertEquals(noPrevious.getMessage(), "No previous page available");
+
+     // Navigate to the last page
+     cursor.next();
+     cursor.next();
+     Assert.assertFalse(cursor.hasNext());
+
+     // Going forward from the last page is rejected
+     IllegalStateException noNext = Assert.expectThrows(IllegalStateException.class, cursor::next);
+     Assert.assertEquals(noNext.getMessage(), "No next page available");
+   }
+ }
+
+ /**
+  * After close(), getCurrentPage(), hasNext() and next() are each expected to throw
+  * {@link IllegalStateException} with the message "Cursor is closed".
+  */
+ @Test
+ public void testClosedCursorOperations() throws Exception {
+   MockCursorTransport transport = new MockCursorTransport();
+   CursorAwareBrokerResponse initialResponse =
+       transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+   ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false);
+   cursor.close();
+
+   // Each operation is probed independently, so one failure does not mask the others' messages
+   IllegalStateException onGetCurrentPage =
+       Assert.expectThrows(IllegalStateException.class, cursor::getCurrentPage);
+   Assert.assertEquals(onGetCurrentPage.getMessage(), "Cursor is closed");
+
+   IllegalStateException onHasNext = Assert.expectThrows(IllegalStateException.class, cursor::hasNext);
+   Assert.assertEquals(onHasNext.getMessage(), "Cursor is closed");
+
+   IllegalStateException onNext = Assert.expectThrows(IllegalStateException.class, cursor::next);
+   Assert.assertEquals(onNext.getMessage(), "Cursor is closed");
+ }
+
+ /**
+  * seekToPage(-1) is expected to throw {@link IllegalArgumentException} with the message
+  * "Page number must be positive (1-based)".
+  */
+ @Test
+ public void testInvalidSeekPageNumber() throws Exception {
+   MockCursorTransport transport = new MockCursorTransport();
+   CursorAwareBrokerResponse initialResponse =
+       transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false)) {
+     IllegalArgumentException rejected =
+         Assert.expectThrows(IllegalArgumentException.class, () -> cursor.seekToPage(-1));
+     Assert.assertEquals(rejected.getMessage(), "Page number must be positive (1-based)");
+   }
+ }
+
+ /**
+  * Async counterpart of the boundary tests. The futures returned by previousAsync() on the first
+  * page, nextAsync() on the last page and seekToPageAsync(-1) are each expected to fail with an
+  * {@link ExecutionException} whose cause carries the same type and message as the sync call.
+  */
+ @Test
+ public void testAsyncNavigationBoundaryConditions() throws Exception {
+   MockCursorTransport transport = new MockCursorTransport();
+   CursorAwareBrokerResponse initialResponse =
+       transport.executeQueryWithCursor("localhost:8000", "SELECT * FROM test", 10);
+
+   try (ResultCursor cursor = new ResultCursorImpl(transport, "localhost:8000", initialResponse, false)) {
+     // Async previous on the first page: the future is obtained first, the failure surfaces on get()
+     CompletableFuture<CursorResultSetGroup> prevFuture = cursor.previousAsync();
+     ExecutionException prevFailure = Assert.expectThrows(ExecutionException.class, prevFuture::get);
+     Assert.assertTrue(prevFailure.getCause() instanceof IllegalStateException);
+     Assert.assertEquals(prevFailure.getCause().getMessage(), "No previous page available");
+
+     // Navigate to the last page
+     cursor.next();
+     cursor.next();
+
+     // Async next on the last page
+     CompletableFuture<CursorResultSetGroup> nextFuture = cursor.nextAsync();
+     ExecutionException nextFailure = Assert.expectThrows(ExecutionException.class, nextFuture::get);
+     Assert.assertTrue(nextFailure.getCause() instanceof IllegalStateException);
+     Assert.assertEquals(nextFailure.getCause().getMessage(), "No next page available");
+
+     // Async seek with an invalid page number
+     CompletableFuture<CursorResultSetGroup> seekFuture = cursor.seekToPageAsync(-1);
+     ExecutionException seekFailure = Assert.expectThrows(ExecutionException.class, seekFuture::get);
+     Assert.assertTrue(seekFailure.getCause() instanceof IllegalArgumentException);
+     Assert.assertEquals(seekFailure.getCause().getMessage(), "Page number must be positive (1-based)");
+   }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]