This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ecbb61dfb Introduce rlsFiltersApplied in the broker response (#16208)
3ecbb61dfb is described below

commit 3ecbb61dfb201aa51e099a0668550d66d2898de6
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Tue Jul 1 18:19:14 2025 +0530

    Introduce rlsFiltersApplied in the broker response (#16208)
---
 .../org/apache/pinot/broker/querylog/QueryLogger.java  |  6 ++++++
 .../BaseSingleStageBrokerRequestHandler.java           |  6 ++++++
 .../requesthandler/MultiStageBrokerRequestHandler.java | 18 +++++++++++++-----
 .../apache/pinot/broker/querylog/QueryLoggerTest.java  |  2 ++
 .../org/apache/pinot/common/metrics/BrokerMeter.java   |  2 ++
 .../apache/pinot/common/response/BrokerResponse.java   | 12 ++++++++++++
 .../common/response/broker/BrokerResponseNative.java   | 13 ++++++++++++-
 .../common/response/broker/BrokerResponseNativeV2.java | 13 ++++++++++++-
 8 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
index 6cb4d55555..5edcc24570 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
@@ -334,6 +334,12 @@ public class QueryLogger {
       void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
           builder.append(params._response.getPools());
       }
+    },
+    RLS_FILTERS_APPLIED("rlsFiltersApplied") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getRLSFiltersApplied());
+      }
     };
 
     public final String _entryName;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 73c67caef1..e3184f83d0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -404,6 +405,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
 
     TableRouteProvider routeProvider;
 
+    AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
     if (logicalTableConfig != null) {
       Set<String> physicalTableNames = logicalTableConfig.getPhysicalTableConfigMap().keySet();
       AuthorizationResult authorizationResult =
@@ -453,6 +455,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
           pinotQuery.setQueryOptions(queryOptions);
           try {
             CalciteSqlParser.queryRewrite(pinotQuery, RlsFiltersRewriter.class);
+            rlsFiltersApplied.set(true);
+            _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RLS_FILTERS_APPLIED, 1);
           } catch (Exception e) {
             LOGGER.error(
                 "Unable to apply RLS filter: {}. Row-level security filtering will be disabled for this query.",
@@ -841,6 +845,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
           BrokerMetrics.getTagForPreferredPool(sqlNodeAndOptions.getOptions()), String.valueOf(pool));
     }
 
+    brokerResponse.setRLSFiltersApplied(rlsFiltersApplied.get());
+
     // Log query and stats
     _queryLogger.log(
         new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse,
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index b288a8e95e..bd42a81d77 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.ws.rs.WebApplicationException;
@@ -299,13 +300,14 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     try (QueryEnvironment.CompiledQuery compiledQuery =
         compileQuery(requestId, query, sqlNodeAndOptions, httpHeaders, queryTimer)) {
-
-      checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery);
+      AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
+      checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery, rlsFiltersApplied);
 
       if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) {
         return explain(compiledQuery, requestId, requestContext, queryTimer);
       } else {
-        return query(compiledQuery, requestId, requesterIdentity, requestContext, httpHeaders, queryTimer);
+        return query(compiledQuery, requestId, requesterIdentity, requestContext, httpHeaders, queryTimer,
+            rlsFiltersApplied.get());
       }
     }
   }
@@ -336,7 +338,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   }
 
   private void checkAuthorization(RequesterIdentity requesterIdentity, RequestContext requestContext,
-      HttpHeaders httpHeaders, QueryEnvironment.CompiledQuery compiledQuery) {
+      HttpHeaders httpHeaders, QueryEnvironment.CompiledQuery compiledQuery, AtomicBoolean rlsFiltersApplied) {
     Set<String> tables = compiledQuery.getTableNames();
     if (tables != null && !tables.isEmpty()) {
       TableAuthorizationResult tableAuthorizationResult =
@@ -349,10 +351,12 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         for (String tableName : tables) {
           accessControl.getRowColFilters(requesterIdentity, tableName).getRLSFilters()
               .ifPresent(rowFilters -> {
+                rlsFiltersApplied.set(true);
                 String combinedFilters =
                     rowFilters.stream().map(filter -> "( " + filter + " )").collect(Collectors.joining(" AND "));
                 String key = RlsUtils.buildRlsFilterKey(tableName);
                 compiledQuery.getOptions().put(key, combinedFilters);
+                _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.RLS_FILTERS_APPLIED, 1);
               });
         }
       }
@@ -444,7 +448,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   }
 
   private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestId,
-      RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, Timer timer)
+      RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, Timer timer,
+      boolean rlsFiltersApplied)
       throws QueryException, WebApplicationException {
     QueryEnvironment.QueryPlannerResult queryPlanResult = callAsync(requestId, query.getTextQuery(),
         () -> query.planQuery(requestId), timer);
@@ -585,6 +590,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         brokerResponse.setResultTable(null);
       }
 
+      // set if rls (row level security) filters have been applied on the query
+      brokerResponse.setRLSFiltersApplied(rlsFiltersApplied);
+
       // Log query and stats
       _queryLogger.log(
           new QueryLogger.QueryLogParams(requestContext, tableNames.toString(), brokerResponse,
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
index 124aade345..141ff3949e 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
@@ -116,6 +116,7 @@ public class QueryLoggerTest {
         + "offlineMemAllocatedBytes(total/thread/resSer):0/0/0,"
         + "realtimeMemAllocatedBytes(total/thread/resSer):0/0/0,"
         + "pools=[],"
+        + "rlsFiltersApplied=true,"
         + "query=SELECT * FROM foo");
     //@formatter:on
   }
@@ -278,6 +279,7 @@ public class QueryLoggerTest {
     response.setRealtimeSystemActivitiesCpuTimeNs(18);
     response.setRealtimeResponseSerializationCpuTimeNs(19);
     response.setBrokerReduceTimeMs(20);
+    response.setRLSFiltersApplied(true);
 
     RequesterIdentity identity = new RequesterIdentity() {
       @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 5ff80bf02b..9c0010b474 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -256,6 +256,8 @@ public class BrokerMeter implements AbstractMetrics.Meter {
   public static final BrokerMeter GRPC_TRANSPORT_TERMINATED = create(
       "GRPC_TRANSPORT_TERMINATED", "grpcTransport", true);
 
+  public static final BrokerMeter RLS_FILTERS_APPLIED = create("RLS_FILTERS_APPLIED", "queries", false);
+
   private static final Map<QueryErrorCode, BrokerMeter> QUERY_ERROR_CODE_METER_MAP;
 
   // Iterate through all query error codes from QueryErrorCode.getAllValues() and create a metric for each
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 6678573468..dfecbdf808 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -406,4 +406,16 @@ public interface BrokerResponse {
    * @return
    */
   Set<Integer> getPools();
+
+  /**
+   * Set whether RLS (row level security) filters were applied to the query.
+   * @param rlsFiltersApplied true if RLS filters were applied, false otherwise
+   */
+  void setRLSFiltersApplied(boolean rlsFiltersApplied);
+
+  /**
+   * Get whether RLS (row level security) filters were applied to the query.
+   * @return true if RLS filters were applied, false otherwise
+   */
+  boolean getRLSFiltersApplied();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 3d9cf7c79a..9af536f5ab 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -56,7 +56,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
     "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
     "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes",
-    "pools"
+    "pools", "rlsFiltersApplied"
 })
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BrokerResponseNative implements BrokerResponse {
@@ -115,6 +115,7 @@ public class BrokerResponseNative implements BrokerResponse {
   private Set<String> _tablesQueried = Set.of();
 
   private Set<Integer> _pools = Set.of();
+  private boolean _rlsFiltersApplied = false;
 
   public BrokerResponseNative() {
   }
@@ -579,4 +580,14 @@ public class BrokerResponseNative implements BrokerResponse {
   public Set<Integer> getPools() {
     return _pools;
   }
+
+  @Override
+  public void setRLSFiltersApplied(boolean rlsFiltersApplied) {
+    _rlsFiltersApplied = rlsFiltersApplied;
+  }
+
+  @Override
+  public boolean getRLSFiltersApplied() {
+    return _rlsFiltersApplied;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index d9b8bcddc5..557ed032a9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -51,7 +51,7 @@ import org.apache.pinot.common.response.ProcessingException;
     "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
     "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes",
     "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes",
-    "pools"
+    "pools", "rlsFiltersApplied"
 })
 public class BrokerResponseNativeV2 implements BrokerResponse {
   private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -83,6 +83,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
   private Set<String> _tablesQueried = Set.of();
 
   private Set<Integer> _pools = Set.of();
+  private boolean _rlsFiltersApplied = false;
 
   @JsonInclude(JsonInclude.Include.NON_NULL)
   @Nullable
@@ -405,6 +406,16 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
     return _pools;
   }
 
+  @Override
+  public void setRLSFiltersApplied(boolean rlsFiltersApplied) {
+    _rlsFiltersApplied = rlsFiltersApplied;
+  }
+
+  @Override
+  public boolean getRLSFiltersApplied() {
+    return _rlsFiltersApplied;
+  }
+
   public void addBrokerStats(StatMap<StatKey> brokerStats) {
     _brokerStats.merge(brokerStats);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to