This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3ecbb61dfb Introduce rlsFiltersApplied in the broker response (#16208) 3ecbb61dfb is described below commit 3ecbb61dfb201aa51e099a0668550d66d2898de6 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Tue Jul 1 18:19:14 2025 +0530 Introduce rlsFiltersApplied in the broker response (#16208) --- .../org/apache/pinot/broker/querylog/QueryLogger.java | 6 ++++++ .../BaseSingleStageBrokerRequestHandler.java | 6 ++++++ .../requesthandler/MultiStageBrokerRequestHandler.java | 18 +++++++++++++----- .../apache/pinot/broker/querylog/QueryLoggerTest.java | 2 ++ .../org/apache/pinot/common/metrics/BrokerMeter.java | 2 ++ .../apache/pinot/common/response/BrokerResponse.java | 12 ++++++++++++ .../common/response/broker/BrokerResponseNative.java | 13 ++++++++++++- .../common/response/broker/BrokerResponseNativeV2.java | 13 ++++++++++++- 8 files changed, 65 insertions(+), 7 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java index 6cb4d55555..5edcc24570 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java @@ -334,6 +334,12 @@ public class QueryLogger { void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) { builder.append(params._response.getPools()); } + }, + RLS_FILTERS_APPLIED("rlsFiltersApplied") { + @Override + void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) { + builder.append(params._response.getRLSFiltersApplied()); + } }; public final String _entryName; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 
73c67caef1..e3184f83d0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -404,6 +405,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ TableRouteProvider routeProvider; + AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false); if (logicalTableConfig != null) { Set<String> physicalTableNames = logicalTableConfig.getPhysicalTableConfigMap().keySet(); AuthorizationResult authorizationResult = @@ -453,6 +455,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ pinotQuery.setQueryOptions(queryOptions); try { CalciteSqlParser.queryRewrite(pinotQuery, RlsFiltersRewriter.class); + rlsFiltersApplied.set(true); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RLS_FILTERS_APPLIED, 1); } catch (Exception e) { LOGGER.error( "Unable to apply RLS filter: {}. 
Row-level security filtering will be disabled for this query.", @@ -841,6 +845,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ BrokerMetrics.getTagForPreferredPool(sqlNodeAndOptions.getOptions()), String.valueOf(pool)); } + brokerResponse.setRLSFiltersApplied(rlsFiltersApplied.get()); + // Log query and stats _queryLogger.log( new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index b288a8e95e..bd42a81d77 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -38,6 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.ws.rs.WebApplicationException; @@ -299,13 +300,14 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { try (QueryEnvironment.CompiledQuery compiledQuery = compileQuery(requestId, query, sqlNodeAndOptions, httpHeaders, queryTimer)) { - - checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery); + AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false); + checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery, rlsFiltersApplied); if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) { return explain(compiledQuery, requestId, requestContext, queryTimer); } else { - return query(compiledQuery, requestId, requesterIdentity, requestContext, httpHeaders, queryTimer); + return 
query(compiledQuery, requestId, requesterIdentity, requestContext, httpHeaders, queryTimer, + rlsFiltersApplied.get()); } } } @@ -336,7 +338,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } private void checkAuthorization(RequesterIdentity requesterIdentity, RequestContext requestContext, - HttpHeaders httpHeaders, QueryEnvironment.CompiledQuery compiledQuery) { + HttpHeaders httpHeaders, QueryEnvironment.CompiledQuery compiledQuery, AtomicBoolean rlsFiltersApplied) { Set<String> tables = compiledQuery.getTableNames(); if (tables != null && !tables.isEmpty()) { TableAuthorizationResult tableAuthorizationResult = @@ -349,10 +351,12 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { for (String tableName : tables) { accessControl.getRowColFilters(requesterIdentity, tableName).getRLSFilters() .ifPresent(rowFilters -> { + rlsFiltersApplied.set(true); String combinedFilters = rowFilters.stream().map(filter -> "( " + filter + " )").collect(Collectors.joining(" AND ")); String key = RlsUtils.buildRlsFilterKey(tableName); compiledQuery.getOptions().put(key, combinedFilters); + _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.RLS_FILTERS_APPLIED, 1); }); } } @@ -444,7 +448,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestId, - RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, Timer timer) + RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, Timer timer, + boolean rlsFiltersApplied) throws QueryException, WebApplicationException { QueryEnvironment.QueryPlannerResult queryPlanResult = callAsync(requestId, query.getTextQuery(), () -> query.planQuery(requestId), timer); @@ -585,6 +590,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { brokerResponse.setResultTable(null); 
} + // set if rls (row level security) filters have been applied on the query + brokerResponse.setRLSFiltersApplied(rlsFiltersApplied); + // Log query and stats _queryLogger.log( new QueryLogger.QueryLogParams(requestContext, tableNames.toString(), brokerResponse, diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java index 124aade345..141ff3949e 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java @@ -116,6 +116,7 @@ public class QueryLoggerTest { + "offlineMemAllocatedBytes(total/thread/resSer):0/0/0," + "realtimeMemAllocatedBytes(total/thread/resSer):0/0/0," + "pools=[]," + + "rlsFiltersApplied=true," + "query=SELECT * FROM foo"); //@formatter:on } @@ -278,6 +279,7 @@ public class QueryLoggerTest { response.setRealtimeSystemActivitiesCpuTimeNs(18); response.setRealtimeResponseSerializationCpuTimeNs(19); response.setBrokerReduceTimeMs(20); + response.setRLSFiltersApplied(true); RequesterIdentity identity = new RequesterIdentity() { @Override diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 5ff80bf02b..9c0010b474 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -256,6 +256,8 @@ public class BrokerMeter implements AbstractMetrics.Meter { public static final BrokerMeter GRPC_TRANSPORT_TERMINATED = create( "GRPC_TRANSPORT_TERMINATED", "grpcTransport", true); + public static final BrokerMeter RLS_FILTERS_APPLIED = create("RLS_FILTERS_APPLIED", "queries", false); + private static final Map<QueryErrorCode, BrokerMeter> QUERY_ERROR_CODE_METER_MAP; // Iterate through all query error 
codes from QueryErrorCode.getAllValues() and create a metric for each diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index 6678573468..dfecbdf808 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -406,4 +406,16 @@ public interface BrokerResponse { * @return */ Set<Integer> getPools(); + + /** + * Set whether RLS (row level security) filters were applied to the query. + * @param rlsFiltersApplied true if RLS filters were applied, false otherwise + */ + void setRLSFiltersApplied(boolean rlsFiltersApplied); + + /** + * Get whether RLS (row level security) filters were applied to the query. + * @return true if RLS filters were applied, false otherwise + */ + boolean getRLSFiltersApplied(); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 3d9cf7c79a..9af536f5ab 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -56,7 +56,7 @@ import org.apache.pinot.spi.utils.JsonUtils; "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "pools" + "pools", "rlsFiltersApplied" }) @JsonIgnoreProperties(ignoreUnknown = true) public class BrokerResponseNative implements BrokerResponse { @@ -115,6 +115,7 @@ public class BrokerResponseNative implements 
BrokerResponse { private Set<String> _tablesQueried = Set.of(); private Set<Integer> _pools = Set.of(); + private boolean _rlsFiltersApplied = false; public BrokerResponseNative() { } @@ -579,4 +580,14 @@ public class BrokerResponseNative implements BrokerResponse { public Set<Integer> getPools() { return _pools; } + + @Override + public void setRLSFiltersApplied(boolean rlsFiltersApplied) { + _rlsFiltersApplied = rlsFiltersApplied; + } + + @Override + public boolean getRLSFiltersApplied() { + return _rlsFiltersApplied; + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index d9b8bcddc5..557ed032a9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -51,7 +51,7 @@ import org.apache.pinot.common.response.ProcessingException; "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "pools" + "pools", "rlsFiltersApplied" }) public class BrokerResponseNativeV2 implements BrokerResponse { private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class); @@ -83,6 +83,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse { private Set<String> _tablesQueried = Set.of(); private Set<Integer> _pools = Set.of(); + private boolean _rlsFiltersApplied = false; @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @@ -405,6 +406,16 @@ public class BrokerResponseNativeV2 implements BrokerResponse { return _pools; } + @Override + public void setRLSFiltersApplied(boolean 
rlsFiltersApplied) { + _rlsFiltersApplied = rlsFiltersApplied; + } + + @Override + public boolean getRLSFiltersApplied() { + return _rlsFiltersApplied; + } + public void addBrokerStats(StatMap<StatKey> brokerStats) { _brokerStats.merge(brokerStats); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org