This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch test-pagination
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 1a76a93eea2d76973272356e1dee270d5d56b800
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Nov 4 16:39:28 2022 -0700

    Add sample code to show how pagination protocol works in broker code
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 224f46c440..aa59089251 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -30,6 +30,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -284,6 +286,47 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
     }
 
+    if (Boolean.parseBoolean(pinotQuery.getQueryOptions().get("pagination"))) {
+      String tableName = 
TableNameBuilder.extractRawTableName(pinotQuery.getDataSource().getTableName());
+      // Step 1: Generate a pointer.
+      // TODO: a. add a method to generate an ID
+      //       b. replace the dummyInstanceId with a real one.
+      int hash = ("dummyInstanceId" + requestId + 
System.currentTimeMillis()).hashCode();
+      String pointer = tableName + "_" + hash;
+
+      // Step 2: TODO invoke pagination query initialization API.
+
+      // Step 3: Submit to query executor.
+      final SqlNodeAndOptions finalSqlNodeAndOptions = sqlNodeAndOptions;
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          return handleRequest(requestId, query, pinotQuery, 
compilationStartTimeNs, finalSqlNodeAndOptions, request,
+              requesterIdentity, requestContext);
+        } catch (Exception e) {
+          throw new CompletionException(e);
+        }
+      }).thenApply(brokerResponseNative -> {
+        // Step 5: TODO invoke upload result API.
+
+        return null;
+      }).exceptionally(exception -> {
+        // Step 6: TODO Handle exception.
+        System.out.println();
+        return null;
+      });
+
+      // Step 4: TODO Put only the pointer in the response and return.
+      return new BrokerResponseNative();
+    }
+
+    return handleRequest(requestId, query, pinotQuery, compilationStartTimeNs, 
sqlNodeAndOptions, request, requesterIdentity,
+        requestContext);
+  }
+
+  private BrokerResponseNative handleRequest(long requestId, String query, 
PinotQuery pinotQuery,
+      long compilationStartTimeNs, @Nullable SqlNodeAndOptions 
sqlNodeAndOptions, JsonNode request,
+      @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
+      throws Exception {
     if (isLiteralOnlyQuery(pinotQuery)) {
       LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to