This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch test-pagination in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 1a76a93eea2d76973272356e1dee270d5d56b800 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Fri Nov 4 16:39:28 2022 -0700 Add sample code to show how pagination protocol works in broker code --- .../requesthandler/BaseBrokerRequestHandler.java | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 224f46c440..aa59089251 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -30,6 +30,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -284,6 +286,47 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); } + if (Boolean.parseBoolean(pinotQuery.getQueryOptions().get("pagination"))) { + String tableName = TableNameBuilder.extractRawTableName(pinotQuery.getDataSource().getTableName()); + // Step 1: Generate a pointer. + // TODO: a. add a method to generate a ID + // b. replace the dummyInstanceId with a real one. + int hash = ("dummyInstanceId" + requestId + System.currentTimeMillis()).hashCode(); + String pointer = tableName + "_" + hash; + + // Step 2: TODO invoke pagination query initialization API. + + // Step 3: Submit to query executor. + final SqlNodeAndOptions finalSqlNodeAndOptions = sqlNodeAndOptions; + CompletableFuture.supplyAsync(() -> { + try { + return handleRequest(requestId, query, pinotQuery, compilationStartTimeNs, finalSqlNodeAndOptions, request, + requesterIdentity, requestContext); + } catch (Exception e) { + throw new CompletionException(e); + } + }).thenApply(brokerResponseNative -> { + // Step 5: TODO invoke upload result API. + + return null; + }).exceptionally(exception -> { + // Step 6: TODO Handle exception. + System.out.println(); + return null; + }); + + // Step 4: TODO Put pointer only to the response and return. + return new BrokerResponseNative(); + } + + return handleRequest(requestId, query, pinotQuery, compilationStartTimeNs, sqlNodeAndOptions, request, requesterIdentity, + requestContext); + } + + private BrokerResponseNative handleRequest(long requestId, String query, PinotQuery pinotQuery, + long compilationStartTimeNs, @Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, + @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) + throws Exception { if (isLiteralOnlyQuery(pinotQuery)) { LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org