This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e49026c8d0 Consolidate Auth and Validation in SSE Request Handler 
(#15240)
e49026c8d0 is described below

commit e49026c8d0dc768e00a58bea4e49a6a80ce262b0
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Thu Mar 13 19:26:49 2025 +0530

    Consolidate Auth and Validation in SSE Request Handler (#15240)
    
    In BaseSingleStageBrokerRequestHandler, authorization steps were in two 
different places. Now the checks are consolidated in a single call. One caveat 
is that for schema related compilation errors, the user should have read 
permissions else schema info is leaked.
    
    QPS validation checks have also been moved up as soon as table name is 
available.
    
    Functions for MSE have been reused as they are reusable and also with an 
eye on #10712. In #10712, SSE also has to support multiple physical tables.
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 68 ++++++++++++++++++
 .../BaseSingleStageBrokerRequestHandler.java       | 82 +++++++++-------------
 .../MultiStageBrokerRequestHandler.java            | 65 -----------------
 3 files changed, 101 insertions(+), 114 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 3351fbf0c5..7268e9092d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import javax.ws.rs.WebApplicationException;
@@ -46,11 +47,15 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerQueryPhase;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.spi.auth.AuthorizationResult;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
 import 
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
@@ -59,6 +64,7 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -317,4 +323,66 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
   protected boolean isQueryCancellationEnabled() {
     return _queriesById != null;
   }
+
+  protected void updatePhaseTimingForTables(Set<String> tableNames, 
BrokerQueryPhase phase, long time) {
+    for (String tableName : tableNames) {
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      _brokerMetrics.addPhaseTiming(rawTableName, phase, time);
+    }
+  }
+
+  /**
+   * Validates whether the requester has access to all the tables.
+   */
+  protected TableAuthorizationResult hasTableAccess(RequesterIdentity 
requesterIdentity, Set<String> tableNames,
+      RequestContext requestContext, HttpHeaders httpHeaders) {
+    final long startTimeNs = System.nanoTime();
+    AccessControl accessControl = _accessControlFactory.create();
+
+    TableAuthorizationResult tableAuthorizationResult = 
accessControl.authorize(requesterIdentity, tableNames);
+
+    Set<String> failedTables = tableNames.stream()
+        .filter(table -> !accessControl.hasAccess(httpHeaders, 
TargetType.TABLE, table, Actions.Table.QUERY))
+        .collect(Collectors.toSet());
+
+    failedTables.addAll(tableAuthorizationResult.getFailedTables());
+
+    if (!failedTables.isEmpty()) {
+      tableAuthorizationResult = new TableAuthorizationResult(failedTables);
+    } else {
+      tableAuthorizationResult = TableAuthorizationResult.success();
+    }
+
+    if (!tableAuthorizationResult.hasAccess()) {
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
+      LOGGER.warn("Access denied for requestId {}", 
requestContext.getRequestId());
+      requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
+    }
+
+    updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, 
System.nanoTime() - startTimeNs);
+
+    return tableAuthorizationResult;
+  }
+
+  /**
+   * Returns true if the QPS quota of query tables, database or application 
has been exceeded.
+   */
+  protected boolean hasExceededQPSQuota(@Nullable String database, Set<String> 
tableNames,
+      RequestContext requestContext) {
+    if (database != null && !_queryQuotaManager.acquireDatabase(database)) {
+      LOGGER.warn("Request {}: query exceeds quota for database: {}", 
requestContext.getRequestId(), database);
+      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+      return true;
+    }
+    for (String tableName : tableNames) {
+      if (!_queryQuotaManager.acquire(tableName)) {
+        LOGGER.warn("Request {}: query exceeds quota for table: {}", 
requestContext.getRequestId(), tableName);
+        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
+        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 7fc23b7fd2..926059892d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -79,8 +79,6 @@ import 
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.auth.Actions;
-import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.ServerRouteInfo;
@@ -368,25 +366,39 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     String rawTableName = compileResult._rawTableName;
     PinotQuery pinotQuery = compileResult._pinotQuery;
     PinotQuery serverPinotQuery = compileResult._serverPinotQuery;
+    String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
+
     long compilationEndTimeNs = System.nanoTime();
     // full request compile time = compilationTimeNs + parserTimeNs
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
         (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
 
-    // Second-stage table-level access control
-    // TODO: Modify AccessControl interface to directly take PinotQuery
-    BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
-    BrokerRequest serverBrokerRequest =
-        serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
-    AuthorizationResult authorizationResult = 
accessControl.authorize(requesterIdentity, serverBrokerRequest);
-
-    _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
-        System.nanoTime() - compilationEndTimeNs);
-
+    AuthorizationResult authorizationResult =
+        hasTableAccess(requesterIdentity, Set.of(tableName), requestContext, 
httpHeaders);
     if (!authorizationResult.hasAccess()) {
       throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
     }
 
+    // Validate QPS
+    if (hasExceededQPSQuota(database, Set.of(tableName), requestContext)) {
+      String errorMessage = String.format("Request %d: %s exceeds query 
quota.", requestId, query);
+      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
+    }
+
+    // Validate the request
+    try {
+      validateRequest(serverPinotQuery, _queryResponseLimit);
+    } catch (Exception e) {
+      LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
+      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
+      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
+      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
+    }
+
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.convertToBrokerRequest(pinotQuery);
+    BrokerRequest serverBrokerRequest =
+        serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
+
     // Get the tables hit by the request
     String offlineTableName = null;
     String realtimeTableName = null;
@@ -445,34 +457,6 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       handleApproximateFunctionOverride(serverPinotQuery);
     }
 
-    // Validate QPS quota
-    String database = 
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableName);
-    if (!_queryQuotaManager.acquireDatabase(database)) {
-      String errorMessage =
-          String.format("Request %d: %s exceeds query quota for database: %s", 
requestId, query, database);
-      LOGGER.info(errorMessage);
-      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
-      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
-    }
-    if (!_queryQuotaManager.acquire(tableName)) {
-      String errorMessage =
-          String.format("Request %d: %s exceeds query quota for table: %s", 
requestId, query, tableName);
-      LOGGER.info(errorMessage);
-      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
-      return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, 
errorMessage);
-    }
-
-    // Validate the request
-    try {
-      validateRequest(serverPinotQuery, _queryResponseLimit);
-    } catch (Exception e) {
-      LOGGER.info("Caught exception while validating request {}: {}, {}", 
requestId, query, e.getMessage());
-      requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION);
-      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
-      return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, 
e.getMessage());
-    }
-
     _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
     _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1);
     _brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.REQUEST_SIZE, query.length());
@@ -923,13 +907,6 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     requestContext.setTableName(rawTableName);
 
-    AuthorizationResult authorizationResult =
-        accessControl.authorize(httpHeaders, TargetType.TABLE, tableName, 
Actions.Table.QUERY);
-
-    if (!authorizationResult.hasAccess()) {
-      throwAccessDeniedError(requestId, query, requestContext, tableName, 
authorizationResult);
-    }
-
     try {
       Map<String, String> columnNameMap = 
_tableCache.getColumnNameMap(rawTableName);
       if (columnNameMap != null) {
@@ -938,12 +915,19 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     } catch (Exception e) {
       // Throw exceptions with column in-existence error.
       if (e instanceof BadQueryRequestException) {
+        if (tableName != null) {
+          // First check if table permissions are in place to not leak schema 
information.
+          AuthorizationResult authorizationResult =
+              hasTableAccess(requesterIdentity, Set.of(tableName), 
requestContext, httpHeaders);
+          if (!authorizationResult.hasAccess()) {
+            throwAccessDeniedError(requestId, query, requestContext, 
tableName, authorizationResult);
+          }
+        }
         LOGGER.info("Caught exception while checking column names in request 
{}: {}, {}", requestId, query,
             e.getMessage());
         requestContext.setErrorCode(QueryErrorCode.UNKNOWN_COLUMN);
         _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
-        return new CompileResult(
-            new BrokerResponseNative(QueryErrorCode.UNKNOWN_COLUMN, 
e.getMessage()));
+        return new CompileResult(new 
BrokerResponseNative(QueryErrorCode.UNKNOWN_COLUMN, e.getMessage()));
       }
       LOGGER.warn("Caught exception while updating column names in request {}: 
{}, {}", requestId, query,
           e.getMessage());
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index a28dd5f68e..9ca2953dda 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -66,8 +66,6 @@ import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.Timer;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.tls.TlsUtils;
-import org.apache.pinot.core.auth.Actions;
-import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -89,7 +87,6 @@ import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -464,68 +461,6 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     }
   }
 
-  /**
-   * Validates whether the requester has access to all the tables.
-   */
-  private TableAuthorizationResult hasTableAccess(RequesterIdentity 
requesterIdentity, Set<String> tableNames,
-      RequestContext requestContext, HttpHeaders httpHeaders) {
-    final long startTimeNs = System.nanoTime();
-    AccessControl accessControl = _accessControlFactory.create();
-
-    TableAuthorizationResult tableAuthorizationResult = 
accessControl.authorize(requesterIdentity, tableNames);
-
-    Set<String> failedTables = tableNames.stream()
-        .filter(table -> !accessControl.hasAccess(httpHeaders, 
TargetType.TABLE, table, Actions.Table.QUERY))
-        .collect(Collectors.toSet());
-
-    failedTables.addAll(tableAuthorizationResult.getFailedTables());
-
-    if (!failedTables.isEmpty()) {
-      tableAuthorizationResult = new TableAuthorizationResult(failedTables);
-    } else {
-      tableAuthorizationResult = TableAuthorizationResult.success();
-    }
-
-    if (!tableAuthorizationResult.hasAccess()) {
-      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
-      LOGGER.warn("Access denied for requestId {}", 
requestContext.getRequestId());
-      requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED);
-    }
-
-    updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, 
System.nanoTime() - startTimeNs);
-
-    return tableAuthorizationResult;
-  }
-
-  /**
-   * Returns true if the QPS quota of query tables, database or application 
has been exceeded.
-   */
-  private boolean hasExceededQPSQuota(@Nullable String database, Set<String> 
tableNames,
-      RequestContext requestContext) {
-    if (database != null && !_queryQuotaManager.acquireDatabase(database)) {
-      LOGGER.warn("Request {}: query exceeds quota for database: {}", 
requestContext.getRequestId(), database);
-      requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
-      return true;
-    }
-    for (String tableName : tableNames) {
-      if (!_queryQuotaManager.acquire(tableName)) {
-        LOGGER.warn("Request {}: query exceeds quota for table: {}", 
requestContext.getRequestId(), tableName);
-        requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS);
-        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-        _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void updatePhaseTimingForTables(Set<String> tableNames, 
BrokerQueryPhase phase, long time) {
-    for (String tableName : tableNames) {
-      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-      _brokerMetrics.addPhaseTiming(rawTableName, phase, time);
-    }
-  }
-
   private BrokerResponse constructMultistageExplainPlan(String sql, String 
plan, Map<String, String> extraFields) {
     BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
     int totalFieldCount = extraFields.size() + 2;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to