This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e49026c8d0 Consolidate Auth and Validation in SSE Request Handler (#15240) e49026c8d0 is described below commit e49026c8d0dc768e00a58bea4e49a6a80ce262b0 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Thu Mar 13 19:26:49 2025 +0530 Consolidate Auth and Validation in SSE Request Handler (#15240) In BaseSingleStageBrokerRequestHandler, authorization steps were in two different places. Now the checks are consolidated in a single call. One caveat is that for schema related compilation errors, the user should have read permissions else schema info is leaked. QPS validation checks have also been moved up as soon as table name is available. Functions for MSE have been reused as they are reusable and also with an eye on #10712. In #10712, SSE also has to support multiple physical tables. --- .../requesthandler/BaseBrokerRequestHandler.java | 68 ++++++++++++++++++ .../BaseSingleStageBrokerRequestHandler.java | 82 +++++++++------------- .../MultiStageBrokerRequestHandler.java | 65 ----------------- 3 files changed, 101 insertions(+), 114 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 3351fbf0c5..7268e9092d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.ws.rs.WebApplicationException; @@ -46,11 +47,15 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.metrics.BrokerQueryPhase; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.auth.Actions; +import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.TableAuthorizationResult; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; @@ -59,6 +64,7 @@ import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -317,4 +323,66 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected boolean isQueryCancellationEnabled() { return _queriesById != null; } + + protected void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPhase phase, long time) { + for (String tableName : tableNames) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + _brokerMetrics.addPhaseTiming(rawTableName, phase, time); + } + } + + /** + * Validates whether the requester has access to all the tables. + */ + protected TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, + RequestContext requestContext, HttpHeaders httpHeaders) { + final long startTimeNs = System.nanoTime(); + AccessControl accessControl = _accessControlFactory.create(); + + TableAuthorizationResult tableAuthorizationResult = accessControl.authorize(requesterIdentity, tableNames); + + Set<String> failedTables = tableNames.stream() + .filter(table -> !accessControl.hasAccess(httpHeaders, TargetType.TABLE, table, Actions.Table.QUERY)) + .collect(Collectors.toSet()); + + failedTables.addAll(tableAuthorizationResult.getFailedTables()); + + if (!failedTables.isEmpty()) { + tableAuthorizationResult = new TableAuthorizationResult(failedTables); + } else { + tableAuthorizationResult = TableAuthorizationResult.success(); + } + + if (!tableAuthorizationResult.hasAccess()) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId()); + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + } + + updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - startTimeNs); + + return tableAuthorizationResult; + } + + /** + * Returns true if the QPS quota of query tables, database or application has been exceeded. + */ + protected boolean hasExceededQPSQuota(@Nullable String database, Set<String> tableNames, + RequestContext requestContext) { + if (database != null && !_queryQuotaManager.acquireDatabase(database)) { + LOGGER.warn("Request {}: query exceeds quota for database: {}", requestContext.getRequestId(), database); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); + return true; + } + for (String tableName : tableNames) { + if (!_queryQuotaManager.acquire(tableName)) { + LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); + return true; + } + } + return false; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 7fc23b7fd2..926059892d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -79,8 +79,6 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.request.RequestUtils; -import org.apache.pinot.core.auth.Actions; -import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.ServerRouteInfo; @@ -368,25 +366,39 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ String rawTableName = compileResult._rawTableName; PinotQuery pinotQuery = compileResult._pinotQuery; PinotQuery serverPinotQuery = compileResult._serverPinotQuery; + String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName); + long compilationEndTimeNs = System.nanoTime(); // full request compile time = compilationTimeNs + parserTimeNs _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs()); - // Second-stage table-level access control - // TODO: Modify AccessControl interface to directly take PinotQuery - BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); - BrokerRequest serverBrokerRequest = - serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery); - AuthorizationResult authorizationResult = accessControl.authorize(requesterIdentity, serverBrokerRequest); - - _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION, - System.nanoTime() - compilationEndTimeNs); - + AuthorizationResult authorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); if (!authorizationResult.hasAccess()) { throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult); } + // Validate QPS + if (hasExceededQPSQuota(database, Set.of(tableName), requestContext)) { + String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query); + return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); + } + + // Validate the request + try { + validateRequest(serverPinotQuery, _queryResponseLimit); + } catch (Exception e) { + LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); + requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); + } + + BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); + BrokerRequest serverBrokerRequest = + serverPinotQuery == pinotQuery ? brokerRequest : CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery); + // Get the tables hit by the request String offlineTableName = null; String realtimeTableName = null; @@ -445,34 +457,6 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ handleApproximateFunctionOverride(serverPinotQuery); } - // Validate QPS quota - String database = DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName); - if (!_queryQuotaManager.acquireDatabase(database)) { - String errorMessage = - String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); - LOGGER.info(errorMessage); - requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); - } - if (!_queryQuotaManager.acquire(tableName)) { - String errorMessage = - String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); - LOGGER.info(errorMessage); - requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); - return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); - } - - // Validate the request - try { - validateRequest(serverPinotQuery, _queryResponseLimit); - } catch (Exception e) { - LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); - requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); - return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); - } - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1); _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length()); @@ -923,13 +907,6 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ String rawTableName = TableNameBuilder.extractRawTableName(tableName); requestContext.setTableName(rawTableName); - AuthorizationResult authorizationResult = - accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, Actions.Table.QUERY); - - if (!authorizationResult.hasAccess()) { - throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult); - } - try { Map<String, String> columnNameMap = _tableCache.getColumnNameMap(rawTableName); if (columnNameMap != null) { @@ -938,12 +915,19 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } catch (Exception e) { // Throw exceptions with column in-existence error. if (e instanceof BadQueryRequestException) { + if (tableName != null) { + // First check if table permissions are in place to not leak schema information. + AuthorizationResult authorizationResult = + hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, httpHeaders); + if (!authorizationResult.hasAccess()) { + throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult); + } + } LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, query, e.getMessage()); requestContext.setErrorCode(QueryErrorCode.UNKNOWN_COLUMN); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); - return new CompileResult( - new BrokerResponseNative(QueryErrorCode.UNKNOWN_COLUMN, e.getMessage())); + return new CompileResult(new BrokerResponseNative(QueryErrorCode.UNKNOWN_COLUMN, e.getMessage())); } LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, query, e.getMessage()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index a28dd5f68e..9ca2953dda 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -66,8 +66,6 @@ import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.common.utils.Timer; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.tls.TlsUtils; -import org.apache.pinot.core.auth.Actions; -import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.mailbox.MailboxService; @@ -89,7 +87,6 @@ import org.apache.pinot.spi.exception.QueryException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -464,68 +461,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } } - /** - * Validates whether the requester has access to all the tables. - */ - private TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, - RequestContext requestContext, HttpHeaders httpHeaders) { - final long startTimeNs = System.nanoTime(); - AccessControl accessControl = _accessControlFactory.create(); - - TableAuthorizationResult tableAuthorizationResult = accessControl.authorize(requesterIdentity, tableNames); - - Set<String> failedTables = tableNames.stream() - .filter(table -> !accessControl.hasAccess(httpHeaders, TargetType.TABLE, table, Actions.Table.QUERY)) - .collect(Collectors.toSet()); - - failedTables.addAll(tableAuthorizationResult.getFailedTables()); - - if (!failedTables.isEmpty()) { - tableAuthorizationResult = new TableAuthorizationResult(failedTables); - } else { - tableAuthorizationResult = TableAuthorizationResult.success(); - } - - if (!tableAuthorizationResult.hasAccess()) { - _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); - LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId()); - requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); - } - - updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - startTimeNs); - - return tableAuthorizationResult; - } - - /** - * Returns true if the QPS quota of query tables, database or application has been exceeded. - */ - private boolean hasExceededQPSQuota(@Nullable String database, Set<String> tableNames, - RequestContext requestContext) { - if (database != null && !_queryQuotaManager.acquireDatabase(database)) { - LOGGER.warn("Request {}: query exceeds quota for database: {}", requestContext.getRequestId(), database); - requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - return true; - } - for (String tableName : tableNames) { - if (!_queryQuotaManager.acquire(tableName)) { - LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); - requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); - return true; - } - } - return false; - } - - private void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPhase phase, long time) { - for (String tableName : tableNames) { - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - _brokerMetrics.addPhaseTiming(rawTableName, phase, time); - } - } - private BrokerResponse constructMultistageExplainPlan(String sql, String plan, Map<String, String> extraFields) { BrokerResponseNative brokerResponse = BrokerResponseNative.empty(); int totalFieldCount = extraFields.size() + 2; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org