vrajat commented on code in PR #15240: URL: https://github.com/apache/pinot/pull/15240#discussion_r1988631722
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -317,4 +323,66 @@ protected void onQueryFinish(long requestId) { protected boolean isQueryCancellationEnabled() { return _queriesById != null; } + + protected void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPhase phase, long time) { + for (String tableName : tableNames) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + _brokerMetrics.addPhaseTiming(rawTableName, phase, time); + } + } + + /** + * Validates whether the requester has access to all the tables. + */ + protected TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, + RequestContext requestContext, HttpHeaders httpHeaders) { + final long startTimeNs = System.nanoTime(); + AccessControl accessControl = _accessControlFactory.create(); + + TableAuthorizationResult tableAuthorizationResult = accessControl.authorize(requesterIdentity, tableNames); + + Set<String> failedTables = tableNames.stream() + .filter(table -> !accessControl.hasAccess(httpHeaders, TargetType.TABLE, table, Actions.Table.QUERY)) + .collect(Collectors.toSet()); + + failedTables.addAll(tableAuthorizationResult.getFailedTables()); + + if (!failedTables.isEmpty()) { + tableAuthorizationResult = new TableAuthorizationResult(failedTables); + } else { + tableAuthorizationResult = TableAuthorizationResult.success(); + } + + if (!tableAuthorizationResult.hasAccess()) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId()); Review Comment: This code is copied as is from `MultiStageBrokerRequestHandler`. I can remove it if required. ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java: ########## @@ -317,4 +323,66 @@ protected void onQueryFinish(long requestId) { protected boolean isQueryCancellationEnabled() { return _queriesById != null; } + + protected void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPhase phase, long time) { + for (String tableName : tableNames) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + _brokerMetrics.addPhaseTiming(rawTableName, phase, time); + } + } + + /** + * Validates whether the requester has access to all the tables. + */ + protected TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, + RequestContext requestContext, HttpHeaders httpHeaders) { + final long startTimeNs = System.nanoTime(); + AccessControl accessControl = _accessControlFactory.create(); + + TableAuthorizationResult tableAuthorizationResult = accessControl.authorize(requesterIdentity, tableNames); + + Set<String> failedTables = tableNames.stream() + .filter(table -> !accessControl.hasAccess(httpHeaders, TargetType.TABLE, table, Actions.Table.QUERY)) + .collect(Collectors.toSet()); + + failedTables.addAll(tableAuthorizationResult.getFailedTables()); + + if (!failedTables.isEmpty()) { + tableAuthorizationResult = new TableAuthorizationResult(failedTables); + } else { + tableAuthorizationResult = TableAuthorizationResult.success(); + } + + if (!tableAuthorizationResult.hasAccess()) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId()); + requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); + } + + updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - startTimeNs); + + return tableAuthorizationResult; + } + + /** + * Returns true if the QPS quota of query tables, database or application has been exceeded. + */ + protected boolean hasExceededQPSQuota(@Nullable String database, Set<String> tableNames, + RequestContext requestContext) { + if (database != null && !_queryQuotaManager.acquireDatabase(database)) { + LOGGER.warn("Request {}: query exceeds quota for database: {}", requestContext.getRequestId(), database); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); + return true; + } + for (String tableName : tableNames) { + if (!_queryQuotaManager.acquire(tableName)) { + LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName); + requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); Review Comment: This code is copied as is from `MultiStageBrokerRequestHandler`. I can remove it if required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org