vrajat commented on code in PR #15240:
URL: https://github.com/apache/pinot/pull/15240#discussion_r1988631722


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -317,4 +323,66 @@ protected void onQueryFinish(long requestId) {
   protected boolean isQueryCancellationEnabled() {
     return _queriesById != null;
   }
+
+  protected void updatePhaseTimingForTables(Set<String> tableNames, 
BrokerQueryPhase phase, long time) {
+    for (String tableName : tableNames) {
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      _brokerMetrics.addPhaseTiming(rawTableName, phase, time);
+    }
+  }
+
+  /**
+   * Validates whether the requester has access to all the tables.
+   */
+  protected TableAuthorizationResult hasTableAccess(RequesterIdentity 
requesterIdentity, Set<String> tableNames,
+      RequestContext requestContext, HttpHeaders httpHeaders) {
+    final long startTimeNs = System.nanoTime();
+    AccessControl accessControl = _accessControlFactory.create();
+
+    TableAuthorizationResult tableAuthorizationResult = 
accessControl.authorize(requesterIdentity, tableNames);
+
+    Set<String> failedTables = tableNames.stream()
+        .filter(table -> !accessControl.hasAccess(httpHeaders, 
TargetType.TABLE, table, Actions.Table.QUERY))
+        .collect(Collectors.toSet());
+
+    failedTables.addAll(tableAuthorizationResult.getFailedTables());
+
+    if (!failedTables.isEmpty()) {
+      tableAuthorizationResult = new TableAuthorizationResult(failedTables);
+    } else {
+      tableAuthorizationResult = TableAuthorizationResult.success();
+    }
+
+    if (!tableAuthorizationResult.hasAccess()) {
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
+      LOGGER.warn("Access denied for requestId {}", 
requestContext.getRequestId());

Review Comment:
   This code was copied as-is from `MultiStageBrokerRequestHandler`. I can 
remove it if required.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -317,4 +323,66 @@ protected void onQueryFinish(long requestId) {
   protected boolean isQueryCancellationEnabled() {
     return _queriesById != null;
   }
+
+  protected void updatePhaseTimingForTables(Set<String> tableNames, 
BrokerQueryPhase phase, long time) {
+    for (String tableName : tableNames) {
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      _brokerMetrics.addPhaseTiming(rawTableName, phase, time);
+    }
+  }
+
+  /**
+   * Validates whether the requester has access to all the tables.
+   */
+  protected TableAuthorizationResult hasTableAccess(RequesterIdentity 
requesterIdentity, Set<String> tableNames,
+      RequestContext requestContext, HttpHeaders httpHeaders) {
+    final long startTimeNs = System.nanoTime();
+    AccessControl accessControl = _accessControlFactory.create();
+
+    TableAuthorizationResult tableAuthorizationResult = 
accessControl.authorize(requesterIdentity, tableNames);
+
+    Set<String> failedTables = tableNames.stream()
+        .filter(table -> !accessControl.hasAccess(httpHeaders, 
TargetType.TABLE, table, Actions.Table.QUERY))
+        .collect(Collectors.toSet());
+
+    failedTables.addAll(tableAuthorizationResult.getFailedTables());
+
+    if (!failedTables.isEmpty()) {
+      tableAuthorizationResult = new TableAuthorizationResult(failedTables);
+    } else {
+      tableAuthorizationResult = TableAuthorizationResult.success();
+    }
+
+    if (!tableAuthorizationResult.hasAccess()) {
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
+      LOGGER.warn("Access denied for requestId {}", 
requestContext.getRequestId());
+      requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+    }
+
+    updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, 
System.nanoTime() - startTimeNs);
+
+    return tableAuthorizationResult;
+  }
+
+  /**
+   * Returns true if the QPS quota of query tables, database or application 
has been exceeded.
+   */
+  protected boolean hasExceededQPSQuota(@Nullable String database, Set<String> 
tableNames,
+      RequestContext requestContext) {
+    if (database != null && !_queryQuotaManager.acquireDatabase(database)) {
+      LOGGER.warn("Request {}: query exceeds quota for database: {}", 
requestContext.getRequestId(), database);
+      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+      return true;
+    }
+    for (String tableName : tableNames) {
+      if (!_queryQuotaManager.acquire(tableName)) {
+        LOGGER.warn("Request {}: query exceeds quota for table: {}", 
requestContext.getRequestId(), tableName);
+        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);

Review Comment:
   This code was copied as-is from `MultiStageBrokerRequestHandler`. I can 
remove it if required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to