This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8563e22716 Extract compileRequest from BaseSingleStageRequestHandler.doHandleRequest (#15073)
8563e22716 is described below

commit 8563e227168f96de8b0f5fe1fb77529d6725baa3
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Fri Mar 7 11:32:13 2025 +0530

    Extract compileRequest from BaseSingleStageRequestHandler.doHandleRequest (#15073)
---
 .../BaseSingleStageBrokerRequestHandler.java       | 320 ++++++++++++---------
 1 file changed, 192 insertions(+), 128 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index a1cfa18d51..3320636a8a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -311,139 +311,64 @@ public abstract class 
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
     }
   }
 
+  /**
+   * CompileResult holds the result of the compilation phase. Compilation may or may not be successful. If compilation
+   * is successful then all member variables other than BrokerResponse will be available. If compilation is not
+   * successful, then only the BrokerResponse is set. This is done to keep the current behaviour as is.
+   * It became hard to keep the current behaviour if we were to throw an exception from the compileRequest method.
+   * The only exception is that a BrokerResponse is returned for a literal-only query.
+   */
+  private static class CompileResult {
+    final PinotQuery _pinotQuery;
+    final PinotQuery _serverPinotQuery;
+    final Schema _schema;
+    final String _tableName;
+    final String _rawTableName;
+    final BrokerResponse _errorOrLiteralOnlyBrokerResponse;
+
+    public CompileResult(PinotQuery pinotQuery, PinotQuery serverPinotQuery, 
Schema schema, String tableName,
+        String rawTableName) {
+      _pinotQuery = pinotQuery;
+      _serverPinotQuery = serverPinotQuery;
+      _schema = schema;
+      _tableName = tableName;
+      _rawTableName = rawTableName;
+      _errorOrLiteralOnlyBrokerResponse = null;
+    }
+
+    public CompileResult(BrokerResponse errorOrLiteralOnlyBrokerResponse) {
+      _pinotQuery = null;
+      _serverPinotQuery = null;
+      _schema = null;
+      _tableName = null;
+      _rawTableName = null;
+      _errorOrLiteralOnlyBrokerResponse = errorOrLiteralOnlyBrokerResponse;
+    }
+  }
+
   protected BrokerResponse doHandleRequest(long requestId, String query, 
SqlNodeAndOptions sqlNodeAndOptions,
       JsonNode request, @Nullable RequesterIdentity requesterIdentity, 
RequestContext requestContext,
       @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
       throws Exception {
     // Compile the request into PinotQuery
     long compilationStartTimeNs = System.nanoTime();
-    PinotQuery pinotQuery;
-    try {
-      pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
-      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
-      // Check if the query is a v2 supported query
-      String database = 
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), 
httpHeaders);
-      if (ParserUtils.canCompileWithMultiStageEngine(query, database, 
_tableCache)) {
-        return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 new Exception(
-            "It seems that the query is only supported by the multi-stage 
query engine, please retry the query using "
-                + "the multi-stage query engine "
-                + 
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")));
-      } else {
-        return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
-      }
-    }
-
-    if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) {
-      pinotQuery.setLimit(_defaultQueryLimit);
-    }
-
-    if (isLiteralOnlyQuery(pinotQuery)) {
-      LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
-      try {
-        if (pinotQuery.isExplain()) {
-          // EXPLAIN PLAN results to show that query is evaluated exclusively 
by Broker.
-          return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
-        }
-        return processLiteralOnlyQuery(requestId, pinotQuery, requestContext);
-      } catch (Exception e) {
-        // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
-        LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
-            query, e.getMessage());
-      }
-    }
-
-    PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
-    DataSource dataSource = serverPinotQuery.getDataSource();
-    if (dataSource == null) {
-      LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
requestId, query);
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      return new BrokerResponseNative(
-          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found"));
-    }
-    if (dataSource.getJoin() != null) {
-      LOGGER.info("JOIN is not supported in request {}: {}", requestId, query);
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      return new BrokerResponseNative(
-          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"JOIN is not supported"));
-    }
-    if (dataSource.getTableName() == null) {
-      LOGGER.info("Table name not found in request {}: {}", requestId, query);
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      return new BrokerResponseNative(
-          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Table name not found"));
-    }
-
-    try {
-      handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, 
requestContext, httpHeaders,
-          accessControl);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while handling the subquery in request {}: 
{}, {}", requestId, query,
-          e.getMessage());
-      requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
-    }
-
-    boolean ignoreCase = _tableCache.isIgnoreCase();
-    String tableName;
-    try {
-      tableName =
-          
getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), 
httpHeaders, ignoreCase),
-              _tableCache);
-    } catch (DatabaseConflictException e) {
-      LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query);
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
-      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
-      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e));
-    }
-    dataSource.setTableName(tableName);
-    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    requestContext.setTableName(rawTableName);
-
-    AuthorizationResult authorizationResult =
-        accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, 
Actions.Table.QUERY);
-
-    if (!authorizationResult.hasAccess()) {
-      throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
-    }
-
-    try {
-      Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
-      if (columnNameMap != null) {
-        updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, 
columnNameMap);
-      }
-    } catch (Exception e) {
-      // Throw exceptions with column in-existence error.
-      if (e instanceof BadQueryRequestException) {
-        LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
-            e.getMessage());
-        requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
-        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
-        return new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e));
-      }
-      LOGGER.warn("Caught exception while updating column names in request {}: 
{}, {}", requestId, query,
-          e.getMessage());
-    }
-
-    if (_defaultHllLog2m > 0) {
-      handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
-    }
-    if (_enableQueryLimitOverride) {
-      handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
-    }
-    handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
-        getSegmentPartitionedColumns(_tableCache, tableName));
-    if (_enableDistinctCountBitmapOverride) {
-      handleDistinctCountBitmapOverride(serverPinotQuery);
-    }
-
-    Schema schema = _tableCache.getSchema(rawTableName);
-    if (schema != null) {
-      handleDistinctMultiValuedOverride(serverPinotQuery, schema);
-    }
-
+    CompileResult compileResult =
+          compileRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext, httpHeaders,
+              accessControl);
+
+    if (compileResult._errorOrLiteralOnlyBrokerResponse != null) {
+      /*
+       * If the compileRequest method sets the BrokerResponse field, then it 
is either an error response or
+       * a literal-only query. In either case, we can return the response 
directly.
+       */
+      return compileResult._errorOrLiteralOnlyBrokerResponse;
+    }
+
+    Schema schema = compileResult._schema;
+    String tableName = compileResult._tableName;
+    String rawTableName = compileResult._rawTableName;
+    PinotQuery pinotQuery = compileResult._pinotQuery;
+    PinotQuery serverPinotQuery = compileResult._serverPinotQuery;
     long compilationEndTimeNs = System.nanoTime();
     // full request compile time = compilationTimeNs + parserTimeNs
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
@@ -454,7 +379,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
     BrokerRequest serverBrokerRequest =
         serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
-    authorizationResult = accessControl.authorize(requesterIdentity, 
serverBrokerRequest);
+    AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity, serverBrokerRequest);
 
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
         System.nanoTime() - compilationEndTimeNs);
@@ -909,6 +834,145 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     return brokerResponse;
   }
 
+  private CompileResult compileRequest(long requestId, String query, 
SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, 
RequestContext requestContext,
+      @Nullable HttpHeaders httpHeaders, AccessControl accessControl) {
+    PinotQuery pinotQuery;
+    try {
+      pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
+    } catch (Exception e) {
+      LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
+      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+      // Check if the query is a v2 supported query
+      String database = 
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), 
httpHeaders);
+      if (ParserUtils.canCompileWithMultiStageEngine(query, database, 
_tableCache)) {
+        return new CompileResult(new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
+            new Exception(
+                "It seems that the query is only supported by the multi-stage 
query engine, please retry the query "
+                    + "using "
+                    + "the multi-stage query engine "
+                    + 
"(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"))));
+      } else {
+        return new CompileResult(
+            new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e)));
+      }
+    }
+
+    if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) {
+      pinotQuery.setLimit(_defaultQueryLimit);
+    }
+
+    if (isLiteralOnlyQuery(pinotQuery)) {
+      LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
+      try {
+        if (pinotQuery.isExplain()) {
+          // EXPLAIN PLAN results to show that query is evaluated exclusively 
by Broker.
+          return new 
CompileResult(BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT);
+        }
+        return new CompileResult(processLiteralOnlyQuery(requestId, 
pinotQuery, requestContext));
+      } catch (Exception e) {
+        // TODO: refine the exceptions here to early termination the queries 
won't requires to send to servers.
+        LOGGER.warn("Unable to execute literal request {}: {} at broker, 
fallback to server query. {}", requestId,
+            query, e.getMessage());
+      }
+    }
+
+    PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
+    DataSource dataSource = serverPinotQuery.getDataSource();
+    if (dataSource == null) {
+      LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
requestId, query);
+      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+      return new CompileResult(new BrokerResponseNative(
+          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found")));
+    }
+    if (dataSource.getJoin() != null) {
+      LOGGER.info("JOIN is not supported in request {}: {}", requestId, query);
+      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+      return new CompileResult(new BrokerResponseNative(
+          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"JOIN is not supported")));
+    }
+    if (dataSource.getTableName() == null) {
+      LOGGER.info("Table name not found in request {}: {}", requestId, query);
+      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+      return new CompileResult(new BrokerResponseNative(
+          QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Table name not found")));
+    }
+
+    try {
+      handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, 
requestContext, httpHeaders,
+          accessControl);
+    } catch (Exception e) {
+      LOGGER.info("Caught exception while handling the subquery in request {}: 
{}, {}", requestId, query,
+          e.getMessage());
+      requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+      return new CompileResult(
+          new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e)));
+    }
+
+    boolean ignoreCase = _tableCache.isIgnoreCase();
+    String tableName;
+    try {
+      tableName =
+          
getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), 
httpHeaders, ignoreCase),
+              _tableCache);
+    } catch (DatabaseConflictException e) {
+      LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 
1);
+      requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+      return new CompileResult(
+          new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
 e)));
+    }
+    dataSource.setTableName(tableName);
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    requestContext.setTableName(rawTableName);
+
+    AuthorizationResult authorizationResult =
+        accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, 
Actions.Table.QUERY);
+
+    if (!authorizationResult.hasAccess()) {
+      throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
+    }
+
+    try {
+      Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
+      if (columnNameMap != null) {
+        updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, 
columnNameMap);
+      }
+    } catch (Exception e) {
+      // Throw exceptions with column in-existence error.
+      if (e instanceof BadQueryRequestException) {
+        LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
+            e.getMessage());
+        requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
+        return new CompileResult(
+            new 
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
 e)));
+      }
+      LOGGER.warn("Caught exception while updating column names in request {}: 
{}, {}", requestId, query,
+          e.getMessage());
+    }
+
+    if (_defaultHllLog2m > 0) {
+      handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
+    }
+    if (_enableQueryLimitOverride) {
+      handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
+    }
+    handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
+        getSegmentPartitionedColumns(_tableCache, tableName));
+    if (_enableDistinctCountBitmapOverride) {
+      handleDistinctCountBitmapOverride(serverPinotQuery);
+    }
+
+    Schema schema = _tableCache.getSchema(rawTableName);
+    if (schema != null) {
+      handleDistinctMultiValuedOverride(serverPinotQuery, schema);
+    }
+
+    return new CompileResult(pinotQuery, serverPinotQuery, schema, tableName, 
rawTableName);
+  }
+
   private void throwAccessDeniedError(long requestId, String query, 
RequestContext requestContext, String tableName,
       AuthorizationResult authorizationResult) {
     _brokerMetrics.addMeteredTableValue(tableName, 
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to