This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8563e22716 Extract compileRequest from BaseSingleStageRequestHandler.doHandleRequest (#15073) 8563e22716 is described below commit 8563e227168f96de8b0f5fe1fb77529d6725baa3 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Fri Mar 7 11:32:13 2025 +0530 Extract compileRequest from BaseSingleStageRequestHandler.doHandleRequest (#15073) --- .../BaseSingleStageBrokerRequestHandler.java | 320 ++++++++++++--------- 1 file changed, 192 insertions(+), 128 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index a1cfa18d51..3320636a8a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -311,139 +311,64 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } } + /** + * CompileResult holds the result of the compilation phase. Compilation may or may not be successful. If compilation + * is successful then all member variables other than BrokerResponse will be available. If compilation is not + * successful, then only the BrokerResponse is set. This is done to keep the current behaviour as is. + * It became hard to keep the current behaviour if we were to throw an exception from the compileRequest method. + * The only exception is that a BrokerResponse is returned for a literal-only query. + */ + private static class CompileResult { + final PinotQuery _pinotQuery; + final PinotQuery _serverPinotQuery; + final Schema _schema; + final String _tableName; + final String _rawTableName; + final BrokerResponse _errorOrLiteralOnlyBrokerResponse; + + public CompileResult(PinotQuery pinotQuery, PinotQuery serverPinotQuery, Schema schema, String tableName, + String rawTableName) { + _pinotQuery = pinotQuery; + _serverPinotQuery = serverPinotQuery; + _schema = schema; + _tableName = tableName; + _rawTableName = rawTableName; + _errorOrLiteralOnlyBrokerResponse = null; + } + + public CompileResult(BrokerResponse errorOrLiteralOnlyBrokerResponse) { + _pinotQuery = null; + _serverPinotQuery = null; + _schema = null; + _tableName = null; + _rawTableName = null; + _errorOrLiteralOnlyBrokerResponse = errorOrLiteralOnlyBrokerResponse; + } + } + protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception { // Compile the request into PinotQuery long compilationStartTimeNs = System.nanoTime(); - PinotQuery pinotQuery; - try { - pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); - } catch (Exception e) { - LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); - requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); - // Check if the query is a v2 supported query - String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); - if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { - return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception( - "It seems that the query is only supported by the multi-stage query engine, please retry the query using " - + "the multi-stage query engine " - + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)"))); - } else { - return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); - } - } - - if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) { - pinotQuery.setLimit(_defaultQueryLimit); - } - - if (isLiteralOnlyQuery(pinotQuery)) { - LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); - try { - if (pinotQuery.isExplain()) { - // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. - return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; - } - return processLiteralOnlyQuery(requestId, pinotQuery, requestContext); - } catch (Exception e) { - // TODO: refine the exceptions here to early termination the queries won't requires to send to servers. - LOGGER.warn("Unable to execute literal request {}: {} at broker, fallback to server query. {}", requestId, - query, e.getMessage()); - } - } - - PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); - DataSource dataSource = serverPinotQuery.getDataSource(); - if (dataSource == null) { - LOGGER.info("Data source (FROM clause) not found in request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Data source (FROM clause) not found")); - } - if (dataSource.getJoin() != null) { - LOGGER.info("JOIN is not supported in request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "JOIN is not supported")); - } - if (dataSource.getTableName() == null) { - LOGGER.info("Table name not found in request {}: {}", requestId, query); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Table name not found")); - } - - try { - handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, requestContext, httpHeaders, - accessControl); - } catch (Exception e) { - LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, - e.getMessage()); - requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); - } - - boolean ignoreCase = _tableCache.isIgnoreCase(); - String tableName; - try { - tableName = - getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase), - _tableCache); - } catch (DatabaseConflictException e) { - LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query); - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); - } - dataSource.setTableName(tableName); - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - requestContext.setTableName(rawTableName); - - AuthorizationResult authorizationResult = - accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, Actions.Table.QUERY); - - if (!authorizationResult.hasAccess()) { - throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult); - } - - try { - Map<String, String> columnNameMap = _tableCache.getColumnNameMap(rawTableName); - if (columnNameMap != null) { - updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, columnNameMap); - } - } catch (Exception e) { - // Throw exceptions with column in-existence error. - if (e instanceof BadQueryRequestException) { - LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, query, - e.getMessage()); - requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); - return new BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR, e)); - } - LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, query, - e.getMessage()); - } - - if (_defaultHllLog2m > 0) { - handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m); - } - if (_enableQueryLimitOverride) { - handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit); - } - handleSegmentPartitionedDistinctCountOverride(serverPinotQuery, - getSegmentPartitionedColumns(_tableCache, tableName)); - if (_enableDistinctCountBitmapOverride) { - handleDistinctCountBitmapOverride(serverPinotQuery); - } - - Schema schema = _tableCache.getSchema(rawTableName); - if (schema != null) { - handleDistinctMultiValuedOverride(serverPinotQuery, schema); - } - + CompileResult compileResult = + compileRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders, + accessControl); + + if (compileResult._errorOrLiteralOnlyBrokerResponse != null) { + /* + * If the compileRequest method sets the BrokerResponse field, then it is either an error response or + * a literal-only query. In either case, we can return the response directly. + */ + return compileResult._errorOrLiteralOnlyBrokerResponse; + } + + Schema schema = compileResult._schema; + String tableName = compileResult._tableName; + String rawTableName = compileResult._rawTableName; + PinotQuery pinotQuery = compileResult._pinotQuery; + PinotQuery serverPinotQuery = compileResult._serverPinotQuery; long compilationEndTimeNs = System.nanoTime(); // full request compile time = compilationTimeNs + parserTimeNs _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, @@ -454,7 +379,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); BrokerRequest serverBrokerRequest = serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery); - authorizationResult = accessControl.authorize(requesterIdentity, serverBrokerRequest); + AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity, serverBrokerRequest); _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs); @@ -909,6 +834,145 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ return brokerResponse; } + private CompileResult compileRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) { + PinotQuery pinotQuery; + try { + pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); + } catch (Exception e) { + LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + // Check if the query is a v2 supported query + String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); + if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { + return new CompileResult(new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, + new Exception( + "It seems that the query is only supported by the multi-stage query engine, please retry the query " + + "using " + + "the multi-stage query engine " + + "(https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)")))); + } else { + return new CompileResult( + new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e))); + } + } + + if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) { + pinotQuery.setLimit(_defaultQueryLimit); + } + + if (isLiteralOnlyQuery(pinotQuery)) { + LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); + try { + if (pinotQuery.isExplain()) { + // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. + return new CompileResult(BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT); + } + return new CompileResult(processLiteralOnlyQuery(requestId, pinotQuery, requestContext)); + } catch (Exception e) { + // TODO: refine the exceptions here to early termination the queries won't requires to send to servers. + LOGGER.warn("Unable to execute literal request {}: {} at broker, fallback to server query. {}", requestId, + query, e.getMessage()); + } + } + + PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); + DataSource dataSource = serverPinotQuery.getDataSource(); + if (dataSource == null) { + LOGGER.info("Data source (FROM clause) not found in request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + return new CompileResult(new BrokerResponseNative( + QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Data source (FROM clause) not found"))); + } + if (dataSource.getJoin() != null) { + LOGGER.info("JOIN is not supported in request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + return new CompileResult(new BrokerResponseNative( + QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "JOIN is not supported"))); + } + if (dataSource.getTableName() == null) { + LOGGER.info("Table name not found in request {}: {}", requestId, query); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + return new CompileResult(new BrokerResponseNative( + QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, "Table name not found"))); + } + + try { + handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, requestContext, httpHeaders, + accessControl); + } catch (Exception e) { + LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, + e.getMessage()); + requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); + return new CompileResult( + new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e))); + } + + boolean ignoreCase = _tableCache.isIgnoreCase(); + String tableName; + try { + tableName = + getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase), + _tableCache); + } catch (DatabaseConflictException e) { + LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + return new CompileResult( + new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e))); + } + dataSource.setTableName(tableName); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + requestContext.setTableName(rawTableName); + + AuthorizationResult authorizationResult = + accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, Actions.Table.QUERY); + + if (!authorizationResult.hasAccess()) { + throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult); + } + + try { + Map<String, String> columnNameMap = _tableCache.getColumnNameMap(rawTableName); + if (columnNameMap != null) { + updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, columnNameMap); + } + } catch (Exception e) { + // Throw exceptions with column in-existence error. + if (e instanceof BadQueryRequestException) { + LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, query, + e.getMessage()); + requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); + return new CompileResult( + new BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR, e))); + } + LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, query, + e.getMessage()); + } + + if (_defaultHllLog2m > 0) { + handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m); + } + if (_enableQueryLimitOverride) { + handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit); + } + handleSegmentPartitionedDistinctCountOverride(serverPinotQuery, + getSegmentPartitionedColumns(_tableCache, tableName)); + if (_enableDistinctCountBitmapOverride) { + handleDistinctCountBitmapOverride(serverPinotQuery); + } + + Schema schema = _tableCache.getSchema(rawTableName); + if (schema != null) { + handleDistinctMultiValuedOverride(serverPinotQuery, schema); + } + + return new CompileResult(pinotQuery, serverPinotQuery, schema, tableName, rawTableName); + } + private void throwAccessDeniedError(long requestId, String query, RequestContext requestContext, String tableName, AuthorizationResult authorizationResult) { _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org