This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 767e3cddd6 Set groupsTrimmed result flag when data is trimmed. (#16220)
767e3cddd6 is described below

commit 767e3cddd6472bb12307af3e78f82d471be1559d
Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com>
AuthorDate: Wed Jul 2 14:19:16 2025 +0200

    Set groupsTrimmed result flag when data is trimmed. (#16220)
---
 .../apache/pinot/broker/querylog/QueryLogger.java  |   6 +
 .../requesthandler/BaseBrokerRequestHandler.java   |   1 +
 .../BaseSingleStageBrokerRequestHandler.java       |   5 +-
 .../MultiStageBrokerRequestHandler.java            |   6 +
 .../pinot/broker/querylog/QueryLoggerTest.java     |   1 +
 .../org/apache/pinot/client/ExecutionStats.java    |   6 +
 .../apache/pinot/client/ExecutionStatsTest.java    |  11 +-
 .../apache/pinot/common/datatable/DataTable.java   |   5 +-
 .../apache/pinot/common/metrics/BrokerMeter.java   |   4 +
 .../apache/pinot/common/metrics/ServerMeter.java   |   6 +
 .../pinot/common/response/BrokerResponse.java      |   5 +
 .../response/broker/BrokerResponseNative.java      |  12 +-
 .../response/broker/BrokerResponseNativeV2.java    |  12 +-
 .../response/broker/CursorResponseNative.java      |   2 +
 .../apache/pinot/core/data/table/IndexedTable.java |   7 +
 .../blocks/results/GroupByResultsBlock.java        |  12 +
 .../operator/combine/GroupByCombineOperator.java   |   9 +
 .../operator/query/FilteredGroupByOperator.java    |   4 +
 .../pinot/core/operator/query/GroupByOperator.java |   4 +
 .../query/reduce/ExecutionStatsAggregator.java     |   3 +
 .../core/query/reduce/GroupByDataTableReducer.java |   4 +
 .../core/query/request/context/QueryContext.java   |  47 ++
 ...erSegmentAggregationSingleValueQueriesTest.java |  30 +
 .../GroupByEnableTrimOptionIntegrationTest.java    |   2 +-
 .../tests/GroupByOptionsIntegrationTest.java       |  39 +-
 .../tests/GroupByTrimmingIntegrationTest.java      | 789 +++++++++++++++++++++
 .../query/runtime/operator/AggregateOperator.java  |   5 +
 .../pinot/query/runtime/operator/LeafOperator.java |   4 +
 .../query/runtime/operator/MultiStageOperator.java |   1 +
 .../operator/MultistageGroupByExecutor.java        |  17 +-
 .../pinot/spi/trace/DefaultRequestContext.java     |  11 +
 .../org/apache/pinot/spi/trace/RequestContext.java |   4 +
 32 files changed, 1060 insertions(+), 14 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
index 5edcc24570..61359e2b3f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
@@ -245,6 +245,12 @@ public class QueryLogger {
             .append(params._response.getNumServersQueried());
       }
     },
+    GROUPS_TRIMMED("groupsTrimmed") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams 
params) {
+        builder.append(params._response.isGroupsTrimmed());
+      }
+    },
     GROUP_LIMIT_REACHED("groupLimitReached") {
       @Override
       void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams 
params) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index e990fc3945..895102dc61 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -302,6 +302,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
     statistics.setProcessingExceptions(processingExceptions);
     statistics.setNumExceptions(numExceptions);
+    statistics.setGroupsTrimmed(response.isGroupsTrimmed());
     statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached());
     statistics.setProcessingTimeMillis(response.getTimeUsedMs());
     statistics.setNumDocsScanned(response.getNumDocsScanned());
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index e3184f83d0..b30a3f7a9a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -811,6 +811,9 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED,
           1);
     }
+    if (brokerResponse.isGroupsTrimmed()) {
+      _brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1);
+    }
 
     // server returns STRING as default dataType for all columns in (some) 
scenarios where no rows are returned
     // this is an attempt to return more faithful information based on other 
sources
@@ -1904,7 +1907,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
    * TODO: come up with other criteria for forcing a log and come up with 
better numbers
    */
   private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
-    if (brokerResponse.isNumGroupsLimitReached()) {
+    if (brokerResponse.isNumGroupsLimitReached() || 
brokerResponse.isGroupsTrimmed()) {
       return true;
     }
 
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 78513ef01c..81bfe277d8 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -595,6 +595,12 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         }
       }
 
+      if (brokerResponse.isGroupsTrimmed()) {
+        for (String table : tableNames) {
+          _brokerMetrics.addMeteredTableValue(table, 
BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1);
+        }
+      }
+
       // Set total query processing time
       // TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
       long totalTimeMs = System.currentTimeMillis() - 
requestContext.getRequestArrivalTimeMillis();
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
index 141ff3949e..456b99ec8a 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
@@ -104,6 +104,7 @@ public class QueryLoggerTest {
         + ":5/6/7/8/9/10/21,"
         + "consumingFreshnessTimeMs=11,"
         + "servers=12/13,"
+        + "groupsTrimmed=false,"
         + "groupLimitReached=false,"
         + "groupWarningLimitReached=false,"
         + "brokerReduceTimeMs=20,"
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
index 61b0029714..b2d2ef7919 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
@@ -43,6 +43,7 @@ public class ExecutionStats {
   private static final String NUM_CONSUMING_SEGMENTS_QUERIED = 
"numConsumingSegmentsQueried";
   private static final String MIN_CONSUMING_FRESHNESS_TIME_MS = 
"minConsumingFreshnessTimeMs";
   private static final String TOTAL_DOCS = "totalDocs";
+  private static final String GROUPS_TRIMMED = "groupsTrimmed";
   private static final String NUM_GROUPS_LIMIT_REACHED = 
"numGroupsLimitReached";
   private static final String NUM_GROUPS_WARNING_LIMIT_REACHED = 
"numGroupsWarningLimitReached";
   private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs";
@@ -112,6 +113,10 @@ public class ExecutionStats {
     return _brokerResponse.has(NUM_GROUPS_LIMIT_REACHED) && 
_brokerResponse.get(NUM_GROUPS_LIMIT_REACHED).asBoolean();
   }
 
+  public boolean isGroupsTrimmed() {
+    return _brokerResponse.has(GROUPS_TRIMMED) && 
_brokerResponse.get(GROUPS_TRIMMED).asBoolean();
+  }
+
   public boolean isNumGroupsWarningLimitReached() {
     return _brokerResponse.has(NUM_GROUPS_WARNING_LIMIT_REACHED)
         && _brokerResponse.get(NUM_GROUPS_WARNING_LIMIT_REACHED).asBoolean();
@@ -143,6 +148,7 @@ public class ExecutionStats {
     map.put(NUM_CONSUMING_SEGMENTS_QUERIED, getNumConsumingSegmentsQueried());
     map.put(MIN_CONSUMING_FRESHNESS_TIME_MS, getMinConsumingFreshnessTimeMs() 
+ "ms");
     map.put(TOTAL_DOCS, getTotalDocs());
+    map.put(GROUPS_TRIMMED, isGroupsTrimmed());
     map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached());
     map.put(NUM_GROUPS_WARNING_LIMIT_REACHED, 
isNumGroupsWarningLimitReached());
     map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms");
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
index 583a28e5ff..3757566c83 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
@@ -43,7 +43,7 @@ public class ExecutionStatsTest {
             + "\"numEntriesScannedInFilter\":10, 
\"numEntriesScannedPostFilter\":10, \"numSegmentsQueried\":10, "
             + "\"numSegmentsProcessed\":10, \"numSegmentsMatched\":10, 
\"numConsumingSegmentsQueried\":10, "
             + "\"minConsumingFreshnessTimeMs\":10, \"totalDocs\":10, 
\"numGroupsLimitReached\":true, "
-            + "\"timeUsedMs\":10}";
+            + "\"timeUsedMs\":10, \"groupsTrimmed\": true}";
     _mockBrokerResponse = JsonUtils.stringToJsonNode(json);
     _executionStatsUnderTest = new ExecutionStats(_mockBrokerResponse);
   }
@@ -156,6 +156,15 @@ public class ExecutionStatsTest {
     assertTrue(result);
   }
 
+  @Test
+  public void testIsGroupsTrimmed() {
+    // Run the test
+    final boolean result = _executionStatsUnderTest.isGroupsTrimmed();
+
+    // Verify the results
+    assertTrue(result);
+  }
+
   @Test
   public void testGetTimeUsedMs() {
     // Run the test
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index bd39b76fd9..9f4040084f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -146,11 +146,12 @@ public interface DataTable {
     THREAD_MEM_ALLOCATED_BYTES(36, "threadMemAllocatedBytes", 
MetadataValueType.LONG),
     RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes", 
MetadataValueType.LONG),
     // NOTE: for server after release 1.3.0 this flag is always set to true 
since servers now perform sorting
-    SORTED(38, "sorted", MetadataValueType.BOOLEAN);
+    SORTED(38, "sorted", MetadataValueType.BOOLEAN),
+    GROUPS_TRIMMED(39, "groupsTrimmed", MetadataValueType.STRING);
 
     // We keep this constant to track the max id added so far for backward 
compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = 38;
+    private static final int MAX_ID = GROUPS_TRIMMED.getId();
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new 
MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new 
HashMap<>();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 9c0010b474..a87784a554 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -152,6 +152,10 @@ public class BrokerMeter implements AbstractMetrics.Meter {
   public static final BrokerMeter 
SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS = create(
       "SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS", "badResponses", 
false);
 
+  // This metric tracks the number of broker responses with trimmed groups 
(potential bad responses).
+  public static final BrokerMeter BROKER_RESPONSES_WITH_GROUPS_TRIMMED = 
create(
+      "BROKER_RESPONSES_WITH_GROUPS_TRIMMED", "badResponses", false);
+
   // This metric track the number of broker responses with number of groups 
limit reached (potential bad responses).
   public static final BrokerMeter 
BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create(
       "BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 1c56532b55..8a73029ac2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -150,6 +150,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
    * But if a single query has 2 different join operators and each one reaches 
the limit, this will be increased by 2.
    */
   HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true),
+  /**
+   * Number of times group by results were trimmed.
+   * It is increased by one for each worker that reaches the limit within the 
stage.
+   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 10.
+   */
+  AGGREGATE_TIMES_GROUPS_TRIMMED("times", true),
   /**
    * Number of times the max number of groups has been reached.
    * It is increased in one by each worker that reaches the limit within the 
stage.
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index dfecbdf808..0672b7d847 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -126,6 +126,11 @@ public interface BrokerResponse {
     return getExceptions().size();
   }
 
+  /**
+   * Returns whether groups were trimmed (reduced in size after sorting).
+   */
+  boolean isGroupsTrimmed();
+
   /**
    * Returns whether the number of groups limit has been reached.
    */
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 60a035ead6..23cbbdcf40 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -56,7 +56,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
     "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", 
"offlineResponseSerMemAllocatedBytes",
     "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", 
"realtimeTotalMemAllocatedBytes",
-    "pools", "rlsFiltersApplied"
+    "pools", "rlsFiltersApplied", "groupsTrimmed"
 })
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BrokerResponseNative implements BrokerResponse {
@@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse {
   private ResultTable _resultTable;
   private int _numRowsResultSet = 0;
   private List<QueryProcessingException> _exceptions = new ArrayList<>();
+  private boolean _groupsTrimmed = false;
   private boolean _numGroupsLimitReached = false;
   private boolean _numGroupsWarningLimitReached = false;
   private long _timeUsedMs = 0L;
@@ -205,6 +206,15 @@ public class BrokerResponseNative implements 
BrokerResponse {
     _exceptions.add(exception);
   }
 
+  @Override
+  public boolean isGroupsTrimmed() {
+    return _groupsTrimmed;
+  }
+
+  public void setGroupsTrimmed(boolean groupsTrimmed) {
+    _groupsTrimmed = groupsTrimmed;
+  }
+
   @Override
   public boolean isNumGroupsLimitReached() {
     return _numGroupsLimitReached;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 0926a811c0..d8e10f3745 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -51,7 +51,7 @@ import org.apache.pinot.common.response.ProcessingException;
     "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
     "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", 
"offlineResponseSerMemAllocatedBytes",
     "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", 
"realtimeTotalMemAllocatedBytes",
-    "pools", "rlsFiltersApplied"
+    "pools", "rlsFiltersApplied", "groupsTrimmed"
 })
 public class BrokerResponseNativeV2 implements BrokerResponse {
   private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -126,11 +126,20 @@ public class BrokerResponseNativeV2 implements 
BrokerResponse {
     addException(new QueryProcessingException(exception.getErrorCode(), 
exception.getMessage()));
   }
 
+  @Override
+  public boolean isGroupsTrimmed() {
+    return _brokerStats.getBoolean(StatKey.GROUPS_TRIMMED);
+  }
+
   @Override
   public boolean isNumGroupsLimitReached() {
     return _brokerStats.getBoolean(StatKey.NUM_GROUPS_LIMIT_REACHED);
   }
 
+  public void mergeGroupsTrimmed(boolean groupsTrimmed) {
+    _brokerStats.merge(StatKey.GROUPS_TRIMMED, groupsTrimmed);
+  }
+
   public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) {
     _brokerStats.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, 
numGroupsLimitReached);
   }
@@ -444,6 +453,7 @@ public class BrokerResponseNativeV2 implements 
BrokerResponse {
     NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT),
     NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT),
     NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
+    GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
index 5a5d2516e8..e1ba761767 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.response.CursorResponse;
     "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
     "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
     "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tableQueries",
+    "groupsTrimmed",
     // Fields specific to CursorResponse
     "offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs", 
"submissionTimeMs", "expirationTimeMs",
     "brokerHost", "brokerPort", "bytesWritten"
@@ -57,6 +58,7 @@ public class CursorResponseNative extends 
BrokerResponseNative implements Cursor
     setResultTable(response.getResultTable());
     setNumRowsResultSet(response.getNumRowsResultSet());
     setExceptions(response.getExceptions());
+    setGroupsTrimmed(response.isGroupsTrimmed());
     setNumGroupsLimitReached(response.isNumGroupsLimitReached());
     setNumGroupsWarningLimitReached(response.isNumGroupsWarningLimitReached());
     setTimeUsedMs(response.getTimeUsedMs());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index d96854ccb5..9ce27567d0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -270,6 +270,13 @@ public abstract class IndexedTable extends BaseTable {
     return _numResizes;
   }
 
+  public boolean isTrimmed() {
+    // single resize occurs on finish() if there's orderBy
+    // all other re-sizes are triggered by trim size and threshold
+    int min = _topRecords != null && _hasOrderBy ? 1 : 0;
+    return _numResizes > min;
+  }
+
   public long getResizeTimeMs() {
     return TimeUnit.NANOSECONDS.toMillis(_resizeTimeNs);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 425bb2052c..48ff191c2d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -61,6 +61,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
   private final Table _table;
   private final QueryContext _queryContext;
 
+  private boolean _groupsTrimmed;
   private boolean _numGroupsLimitReached;
   private boolean _numGroupsWarningLimitReached;
   private int _numResizes;
@@ -142,6 +143,14 @@ public class GroupByResultsBlock extends BaseResultsBlock {
     _numGroupsLimitReached = numGroupsLimitReached;
   }
 
+  public boolean isGroupsTrimmed() {
+    return _groupsTrimmed;
+  }
+
+  public void setGroupsTrimmed(boolean groupsTrimmed) {
+    _groupsTrimmed = groupsTrimmed;
+  }
+
   public boolean isNumGroupsWarningLimitReached() {
     return _numGroupsWarningLimitReached;
   }
@@ -336,6 +345,9 @@ public class GroupByResultsBlock extends BaseResultsBlock {
   @Override
   public Map<String, String> getResultsMetadata() {
     Map<String, String> metadata = super.getResultsMetadata();
+    if (_groupsTrimmed) {
+      metadata.put(MetadataKey.GROUPS_TRIMMED.getName(), "true");
+    }
     if (_numGroupsLimitReached) {
       metadata.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index fb34278b12..18e67b9899 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -66,6 +66,7 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
   private final CountDownLatch _operatorLatch;
 
   private volatile IndexedTable _indexedTable;
+  private volatile boolean _groupsTrimmed;
   private volatile boolean _numGroupsLimitReached;
   private volatile boolean _numGroupsWarningLimitReached;
 
@@ -120,6 +121,9 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
           }
         }
 
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
         // Set groups limit reached flag.
         if (resultsBlock.isNumGroupsLimitReached()) {
           _numGroupsLimitReached = true;
@@ -222,6 +226,10 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
       return new ExceptionResultsBlock(errMsg);
     }
 
+    if (_indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) {
+      _groupsTrimmed = true;
+    }
+
     IndexedTable indexedTable = _indexedTable;
     if (_queryContext.isServerReturnFinalResult()) {
       indexedTable.finish(true, true);
@@ -231,6 +239,7 @@ public class GroupByCombineOperator extends 
BaseSingleBlockCombineOperator<Group
       indexedTable.finish(false);
     }
     GroupByResultsBlock mergedBlock = new GroupByResultsBlock(indexedTable, 
_queryContext);
+    mergedBlock.setGroupsTrimmed(_groupsTrimmed);
     mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
     mergedBlock.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached);
     mergedBlock.setNumResizes(indexedTable.getNumResizes());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
index c17e64d8de..1afda13ca6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java
@@ -195,7 +195,11 @@ public class FilteredGroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
         TableResizer tableResizer = new TableResizer(_dataSchema, 
_queryContext);
         Collection<IntermediateRecord> intermediateRecords =
             tableResizer.trimInSegmentResults(groupKeyGenerator, 
groupByResultHolders, trimSize);
+
+        
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED,
 1);
+        boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag 
only if it's not safe
         GroupByResultsBlock resultsBlock = new 
GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext);
+        resultsBlock.setGroupsTrimmed(unsafeTrim);
         resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached);
         
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
         return resultsBlock;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
index 33072d5fd4..d28de96107 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java
@@ -143,7 +143,11 @@ public class GroupByOperator extends 
BaseOperator<GroupByResultsBlock> {
       if (groupByExecutor.getNumGroups() > trimSize) {
         TableResizer tableResizer = new TableResizer(_dataSchema, 
_queryContext);
         Collection<IntermediateRecord> intermediateRecords = 
groupByExecutor.trimGroupByResult(trimSize, tableResizer);
+
+        
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED,
 1);
+        boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag 
only if it's not safe
         GroupByResultsBlock resultsBlock = new 
GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext);
+        resultsBlock.setGroupsTrimmed(unsafeTrim);
         resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached);
         
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
         return resultsBlock;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 02f08661e9..bb49e0543d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -70,6 +70,7 @@ public class ExecutionStatsAggregator {
   private long _numSegmentsPrunedByValue = 0L;
   private long _explainPlanNumEmptyFilterSegments = 0L;
   private long _explainPlanNumMatchAllFilterSegments = 0L;
+  private boolean _groupsTrimmed = false;
   private boolean _numGroupsLimitReached = false;
   private boolean _numGroupsWarningLimitReached = false;
 
@@ -228,6 +229,7 @@ public class ExecutionStatsAggregator {
       _numTotalDocs += Long.parseLong(numTotalDocsString);
     }
 
+    _groupsTrimmed |= 
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.GROUPS_TRIMMED.getName()));
     _numGroupsLimitReached |=
         
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
     _numGroupsWarningLimitReached |=
@@ -252,6 +254,7 @@ public class ExecutionStatsAggregator {
     brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed);
     brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched);
     brokerResponseNative.setTotalDocs(_numTotalDocs);
+    brokerResponseNative.setGroupsTrimmed(_groupsTrimmed);
     brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
     
brokerResponseNative.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached);
     brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 2072e937de..c5647c1b27 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -145,6 +145,10 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
       throws TimeoutException {
     // NOTE: This step will modify the data schema and also return final 
aggregate results.
     IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, 
reducerContext);
+    if (indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) {
+      brokerResponseNative.setGroupsTrimmed(true);
+    }
+
     if (brokerMetrics != null) {
       brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
       brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 484868f7ce..6c8076091e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.request.context;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -164,6 +165,48 @@ public class QueryContext {
     _explain = explain;
   }
 
+  private boolean isSameOrderAndGroupByColumns(QueryContext context) {
+    List<ExpressionContext> groupByKeys = context.getGroupByExpressions();
+    List<OrderByExpressionContext> orderByKeys = 
context.getOrderByExpressions();
+
+    if (groupByKeys == null || groupByKeys.isEmpty()) {
+      return orderByKeys == null || orderByKeys.isEmpty();
+    } else if (orderByKeys == null || orderByKeys.isEmpty()) {
+      return false;
+    }
+
+    BitSet orderByKeysMatched = new BitSet(orderByKeys.size());
+
+    OUTER_GROUP:
+    for (ExpressionContext groupExp : groupByKeys) {
+      for (int i = 0; i < orderByKeys.size(); i++) {
+        OrderByExpressionContext orderExp = orderByKeys.get(i);
+        if (groupExp.equals(orderExp.getExpression())) {
+          orderByKeysMatched.set(i);
+          continue OUTER_GROUP;
+        }
+      }
+
+      return false;
+    }
+
+    OUTER_ORDER:
+    for (int i = 0, n = orderByKeys.size(); i < n; i++) {
+      if (orderByKeysMatched.get(i)) {
+        continue;
+      }
+
+      for (ExpressionContext groupExp : groupByKeys) {
+        if (groupExp.equals(orderByKeys.get(i).getExpression())) {
+          continue OUTER_ORDER;
+        }
+      }
+      return false;
+    }
+
+    return true;
+  }
+
   /**
    * Returns the table name.
    * NOTE: on the broker side, table name might be {@code null} when subquery 
is available.
@@ -520,6 +563,10 @@ public class QueryContext {
     return isIndexUseAllowed(dataSource.getColumnName(), indexType);
   }
 
+  public boolean isUnsafeTrim() {
+    return !isSameOrderAndGroupByColumns(this) || getHavingFilter() != null;
+  }
+
   public static class Builder {
     private String _tableName;
     private QueryContext _subquery;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index 6fcb909374..b5fd08dda4 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -774,6 +774,36 @@ public class InterSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     assertTrue(brokerResponse.isNumGroupsLimitReached());
   }
 
+  @Test
+  public void testGroupsTrimmedAtSegmentLevel() {
+    String query = "SELECT COUNT(*) FROM testTable GROUP BY column1, column3 
ORDER BY column1";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    assertFalse(brokerResponse.isGroupsTrimmed());
+
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMinSegmentGroupTrimSize(5);
+    brokerResponse = getBrokerResponse(query, planMaker);
+
+    assertTrue(brokerResponse.isGroupsTrimmed());
+  }
+
+  @Test
+  public void testGroupsTrimmedAtServerLevel() {
+    String query = "SELECT COUNT(*) FROM testTable GROUP BY column1, column3 
ORDER BY column1";
+
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    assertFalse(brokerResponse.isGroupsTrimmed());
+
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMinServerGroupTrimSize(5);
+    // on server level, trimming occurs only when threshold is reached
+    planMaker.setGroupByTrimThreshold(100);
+    brokerResponse = getBrokerResponse(query, planMaker);
+
+    assertTrue(brokerResponse.isGroupsTrimmed());
+  }
+
   @Test
   public void testDistinctSum() {
     String query = "select DISTINCTSUM(column1) as v1, DISTINCTSUM(column3) as 
v2 from testTable";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
index 947c2032e0..4f534e458c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
@@ -207,7 +207,7 @@ public class GroupByEnableTrimOptionIntegrationTest extends 
BaseClusterIntegrati
     JsonNode plan = postV2Query(option + " set explainAskingServers=true; 
explain plan for " + query);
 
     Assert.assertEquals(GroupByOptionsIntegrationTest.toResultStr(result), 
expectedResult);
-    Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan), 
expectedPlan);
+    Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan, 
true), expectedPlan);
   }
 
   private JsonNode postV2Query(String query)
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
index 272f314482..e2251cece3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -473,7 +474,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     JsonNode plan = postV2Query(option + " set explainAskingServers=true; 
explain plan for " + query);
 
     Assert.assertEquals(toResultStr(result), expectedResult);
-    Assert.assertEquals(toExplainStr(plan), expectedPlan);
+    Assert.assertEquals(toExplainStr(plan, true), expectedPlan);
   }
 
   @Test
@@ -550,6 +551,17 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
         getExtraQueryProperties());
   }
 
+  public static @NotNull String toResultStr(ResultSetGroup resultSet) {
+    if (resultSet == null) {
+      return "null";
+    }
+    JsonNode node = resultSet.getBrokerResponse().getResultTable();
+    if (node == null) {
+      return toErrorString(resultSet.getBrokerResponse().getExceptions());
+    }
+    return toString(node);
+  }
+
   public static @NotNull String toResultStr(JsonNode mainNode) {
     if (mainNode == null) {
       return "null";
@@ -561,7 +573,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     return toString(node);
   }
 
-  static @NotNull String toExplainStr(JsonNode mainNode) {
+  static @NotNull String toExplainStr(JsonNode mainNode, boolean isMSQE) {
     if (mainNode == null) {
       return "null";
     }
@@ -569,7 +581,11 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     if (node == null) {
       return toErrorString(mainNode.get("exceptions"));
     }
-    return toExplainString(node);
+    return toExplainString(node, isMSQE);
+  }
+
+  static @NotNull String toExplainStr(JsonNode mainNode) {
+    return toExplainStr(mainNode, false);
   }
 
   public static String toErrorString(JsonNode node) {
@@ -613,8 +629,21 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     return buf.toString();
   }
 
-  public static String toExplainString(JsonNode node) {
-    return node.get("rows").get(0).get(1).textValue();
+  private static String toExplainString(JsonNode node, boolean isMSQE) {
+    if (isMSQE) {
+      return node.get("rows").get(0).get(1).textValue();
+    } else {
+      StringBuilder result = new StringBuilder();
+      JsonNode rows = node.get("rows");
+      for (int i = 0, n = rows.size(); i < n; i++) {
+        JsonNode row = rows.get(i);
+        result.append(row.get(0).textValue())
+            .append(",\t").append(row.get(1).intValue())
+            .append(",\t").append(row.get(2).intValue())
+            .append('\n');
+      }
+      return result.toString();
+    }
   }
 
   @AfterClass
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
new file mode 100644
index 0000000000..51f42729a0
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
@@ -0,0 +1,789 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toExplainStr;
+import static 
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+
+// Tests that 'groupsTrimmed' flag is set when results trimming occurs at:
+// SSQE - segment, inter-segment/server and broker levels
+// MSQE - segment, inter-segment and intermediate levels
+// Note: MSQE doesn't push collations depending on group by result into 
aggregation nodes
+// so e.g. ORDER BY i*j doesn't trigger trimming even when hints are set
+public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSet {
+
+  static final int FILES_NO = 4;
+  static final int RECORDS_NO = 1000;
+  static final String I_COL = "i";
+  static final String J_COL = "j";
+  static final int SERVERS_NO = 2;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    startZk();
+    startController();
+    startServers(SERVERS_NO);
+    startBroker();
+
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+        .addSingleValueDimension(I_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(J_COL, FieldSpec.DataType.LONG)
+        .build();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    List<File> avroFiles = createAvroFile(_tempDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
+    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+
+    // Wait for all documents loaded
+    TestUtils.waitForCondition(() -> 
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
+        60_000,
+        "Failed to load documents", true, Duration.ofMillis(60_000 / 10));
+
+    setUseMultiStageQueryEngine(true);
+
+    Map<String, List<String>> map = 
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+
+    // make sure segments are split between multiple servers
+    assertEquals(map.size(), SERVERS_NO);
+  }
+
+  protected TableConfig createOfflineTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(getTableName())
+        .setNumReplicas(getNumReplicas())
+        .setBrokerTenant(getBrokerTenant())
+        .build();
+  }
+
+  public static List<File> createAvroFile(File tempDir)
+      throws IOException {
+
+    // create avro schema
+    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+        new org.apache.avro.Schema.Field(I_COL,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), 
null, null),
+        new org.apache.avro.Schema.Field(J_COL,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), 
null, null)));
+
+    List<File> files = new ArrayList<>();
+    for (int file = 0; file < FILES_NO; file++) {
+      File avroFile = new File(tempDir, "data_" + file + ".avro");
+      try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        fileWriter.create(avroSchema, avroFile);
+
+        for (int docId = 0; docId < RECORDS_NO; docId++) {
+          GenericData.Record record = new GenericData.Record(avroSchema);
+          record.put(I_COL, docId % 100);
+          record.put(J_COL, docId);
+          fileWriter.append(record);
+        }
+        files.add(avroFile);
+      }
+    }
+    return files;
+  }
+
+  // MSQE - multi stage query engine
+  @Test
+  public void testMSQEOrderByOnDependingOnAggregateResultIsNotPushedDown()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY i*j DESC LIMIT 5"));
+
+    String options = "SET minSegmentGroupTrimSize=5; ";
+    String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
+        + "FROM mytable GROUP BY i, j ORDER BY i*j DESC LIMIT 5 ";
+
+    ResultSetGroup result = conn.execute(options + query);
+    assertTrimFlagNotSet(result);
+
+    assertEquals("Execution Plan\n"
+            + "LogicalSort(sort0=[$3], dir0=[DESC], offset=[0], fetch=[5])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], collation=[[3 
DESC]], isSortOnSender=[false], "
+            + "isSortOnReceiver=[true])\n"
+            + "    LogicalSort(sort0=[$3], dir0=[DESC], fetch=[5])\n" // <-- 
actual sort & limit
+            + "      LogicalProject(i=[$0], j=[$1], EXPR$2=[$2], EXPR$3=[*($0, 
$1)])\n"
+            // <-- order by value is computed here, so trimming in upstream 
stages is not possible
+            + "        PinotLogicalAggregate(group=[{0, 1}], 
agg#0=[COUNT($2)], aggType=[FINAL])\n"
+            + "          PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+            + "            LeafStageCombineOperator(table=[mytable])\n"
+            + "              StreamingInstanceResponse\n"
+            + "                CombineGroupBy\n"
+            + "                  GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
+            + "                    Project(columns=[[i, j]])\n"
+            + "                      DocIdSet(maxDocs=[40000])\n"
+            + "                        
FilterMatchEntireSegment(numDocs=[4000])\n",
+        toExplainStr(postQuery(options + " SET explainAskingServers=true; 
EXPLAIN PLAN FOR " + query), true));
+  }
+
+  @Test
+  public void 
testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+
+    String options = "SET minSegmentGroupTrimSize=5; ";
+    String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
+        + "FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5 ";
+
+    ResultSetGroup result = conn.execute(options + query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+        + "99,\t999,\t4\n"
+        + "98,\t998,\t4\n"
+        + "97,\t997,\t4\n"
+        + "96,\t996,\t4\n"
+        + "95,\t995,\t4", toResultStr(result));
+
+    assertEquals("Execution Plan\n"
+            + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], "
+            + "isSortOnReceiver=[true])\n"
+            + "    LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n"
+            + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[1 DESC]],"
+            + " limit=[5])\n"
+            + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "            StreamingInstanceResponse\n"
+            + "              CombineGroupBy\n"
+            + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n" // <-- trimming happens here
+            + "                  Project(columns=[[i, j]])\n"
+            + "                    DocIdSet(maxDocs=[40000])\n"
+            + "                      
FilterMatchEntireSegment(numDocs=[4000])\n",
+        toExplainStr(postQuery(options + " SET explainAskingServers=true; 
EXPLAIN PLAN FOR " + query), true));
+  }
+
+  @Test
+  public void 
testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(
+        conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER 
BY COUNT(*) DESC LIMIT 5"));
+
+    String options = "SET minSegmentGroupTrimSize=5; ";
+    String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
+        + "FROM mytable GROUP BY i, j ORDER BY count(*) DESC LIMIT 5 ";
+
+    ResultSetGroup result = conn.execute(options + query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+        + "77,\t377,\t4\n"
+        + "66,\t566,\t4\n"
+        + "39,\t339,\t4\n"
+        + "96,\t396,\t4\n"
+        + "25,\t25,\t4", toResultStr(result));
+
+    assertEquals("Execution Plan\n"
+            + "LogicalSort(sort0=[$2], dir0=[DESC], offset=[0], fetch=[5])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], collation=[[2 
DESC]], isSortOnSender=[false], "
+            + "isSortOnReceiver=[true])\n"
+            + "    LogicalSort(sort0=[$2], dir0=[DESC], fetch=[5])\n"
+            + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[2 DESC]],"
+            + " limit=[5])\n"
+            + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "            StreamingInstanceResponse\n"
+            + "              CombineGroupBy\n"
+            + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n" //<-- trimming happens here
+            + "                  Project(columns=[[i, j]])\n"
+            + "                    DocIdSet(maxDocs=[40000])\n"
+            + "                      
FilterMatchEntireSegment(numDocs=[4000])\n",
+        toExplainStr(postQuery(options + " SET explainAskingServers=true; 
EXPLAIN PLAN FOR " + query), true));
+  }
+
+  @Test
+  public void 
testMSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+
+    String options = "SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 
100; ";
+    String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
+        + "FROM mytable "
+        + "GROUP BY i, j "
+        + "ORDER BY j DESC "
+        + "LIMIT 5 ";
+    ResultSetGroup result = conn.execute(options + query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+        + "99,\t999,\t4\n"
+        + "98,\t998,\t4\n"
+        + "97,\t997,\t4\n"
+        + "96,\t996,\t4\n"
+        + "95,\t995,\t4", toResultStr(result));
+
+    assertEquals("Execution Plan\n"
+            + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], "
+            + "isSortOnReceiver=[true])\n"
+            + "    LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n"
+            + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[1 DESC]],"
+            + " limit=[5])\n"
+            + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "            StreamingInstanceResponse\n"
+            + "              CombineGroupBy\n" // <-- trimming happens here
+            + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
+            + "                  Project(columns=[[i, j]])\n"
+            + "                    DocIdSet(maxDocs=[40000])\n"
+            + "                      
FilterMatchEntireSegment(numDocs=[4000])\n",
+        toExplainStr(postQuery(options + "SET explainAskingServers=true; 
EXPLAIN PLAN FOR " + query), true));
+  }
+
+  @Test
+  public void 
testMSQEGroupsTrimmedAtIntermediateLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+
+    // This case is tricky because intermediate results are hash-split among 
servers so one gets 50 rows on average.
+    // That's the reason both limit and trim size needs to be so small.
+    String query = "SELECT /*+ 
aggOptions(is_enable_group_trim='true',mse_min_group_trim_size='5') */ i, j, 
COUNT(*) "
+        + "FROM mytable "
+        + "GROUP BY i, j "
+        + "ORDER BY j DESC "
+        + "LIMIT 5 ";
+    ResultSetGroup result = conn.execute(query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n"
+        + "99,\t999,\t4\n"
+        + "98,\t998,\t4\n"
+        + "97,\t997,\t4\n"
+        + "96,\t996,\t4\n"
+        + "95,\t995,\t4", toResultStr(result));
+
+    assertEquals(
+        "Execution Plan\n"
+            + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], "
+            + "isSortOnReceiver=[true])\n"
+            + "    LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n"
+            + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[1 DESC]],"
+            + " limit=[5])\n" // receives 50-row-big blocks, trimming kicks in 
only if limit is lower
+            + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n" // 
splits blocks via hash distribution
+            + "          LeafStageCombineOperator(table=[mytable])\n" // no 
trimming happens 'below'
+            + "            StreamingInstanceResponse\n"
+            + "              CombineGroupBy\n"
+            + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
+            + "                  Project(columns=[[i, j]])\n"
+            + "                    DocIdSet(maxDocs=[40000])\n"
+            + "                      
FilterMatchEntireSegment(numDocs=[4000])\n",
+        toExplainStr(postQuery(" set explainAskingServers=true; EXPLAIN PLAN 
FOR " + query), true));
+  }
+
+  // SSQE segment level
+  @Test
+  public void 
testSSQEFilteredGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM mytable 
GROUP BY i, j ORDER BY i + j DESC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + 
query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"sum(i) FILTER(WHERE i > 
'0')\"[\"DOUBLE\"]\n"
+        + "99,\t999,\t396.0\n"
+        + "98,\t998,\t392.0\n"
+        + "97,\t997,\t388.0\n"
+        + "96,\t996,\t384.0\n"
+        + "95,\t995,\t380.0", toResultStr(result));
+
+    assertEquals(
+        "BROKER_REDUCE(sort:[plus(i,j) 
DESC],limit:5,postAggregations:filter(sum(i),greater_than(i,'0'))),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY_FILTERED(groupKeys:i, j, 
aggregations:sum(i)),\t3,\t2\n" // <-- trimming happens here
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_FULL_SCAN(operator:RANGE,predicate:i > '0'),\t6,\t5\n"
+            + "PROJECT(i, j),\t7,\t3\n"
+            + "DOC_ID_SET,\t8,\t7\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t9,\t8\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i + j DESC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + 
query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "99,\t999,\t4\n"
+        + "98,\t998,\t4\n"
+        + "97,\t997,\t4\n"
+        + "96,\t996,\t4\n"
+        + "95,\t995,\t4", toResultStr(result));
+
+    assertEquals("BROKER_REDUCE(sort:[plus(i,j) DESC],limit:5),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" 
//<-- trimming happens here
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderBySomeGroupByKeysIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
j DESC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + 
query);
+    assertTrimFlagSet(result);
+
+    // With test data set result is stable, but in general, trimming data 
ordered by subset of
+    // group by keys can produce incomplete group aggregates due to lack of 
stability.
+    // That is because, for a given value of j, sorting treats all values of i 
the same,
+    // and segment data is usually unordered.
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "99,\t999,\t4\n"
+        + "98,\t998,\t4\n"
+        + "97,\t997,\t4\n"
+        + "96,\t996,\t4\n"
+        + "95,\t995,\t4", toResultStr(result));
+
+    assertEquals(
+        "BROKER_REDUCE(sort:[j DESC],limit:5),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" // 
<- trimming happens here
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAllGroupByKeysAndHavingIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+
+    // trimming is safe on rows ordered by all group by keys (regardless of 
key order, direction or duplications)
+    // but not when HAVING clause is present
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i 
> 50  ORDER BY i ASC, j ASC";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + 
query);
+    assertTrimFlagSet(result);
+
+    // Result is unexpectedly empty because segment-level trim keeps first 50
records ordered by i ASC, j ASC
+    // that are later filtered out at broker stage.
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", 
toResultStr(result));
+
+    assertEquals("BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j 
ASC],limit:10),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" // 
<- trimming happens here
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAllGroupByKeysIsSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+
+    // trimming is safe on rows ordered by all group by keys (regardless of 
key order, direction or duplications)
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
j ASC, i DESC, j ASC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + 
query);
+    assertTrimFlagNotSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "0,\t0,\t4\n"
+        + "1,\t1,\t4\n"
+        + "2,\t2,\t4\n"
+        + "3,\t3,\t4\n"
+        + "4,\t4,\t4", toResultStr(result));
+
+    assertEquals(
+        "BROKER_REDUCE(sort:[j ASC, i DESC],limit:5),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAggregateIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+
+    // trimming is NOT safe on rows ordered by an aggregate expression, because
segment-level aggregates are only partial when the trim occurs
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
count(*)*j ASC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + 
query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "0,\t0,\t4\n"
+        + "1,\t1,\t4\n"
+        + "2,\t2,\t4\n"
+        + "3,\t3,\t4\n"
+        + "4,\t4,\t4", toResultStr(result));
+
+    assertEquals("BROKER_REDUCE(sort:[times(count(*),j) 
ASC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  // SSQE inter-segment level
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i DESC  LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    // on server level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
+    assertTrimFlagSet(result);
+    // result's order is not stable due to concurrent operations on indexed 
table
+
+    assertEquals("BROKER_REDUCE(sort:[i DESC],limit:5),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n" // <-- trimming happens here
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysIsSafe()
+      throws Exception {
+    // for SSQE server level == inter-segment level
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i, j  LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    // on server level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET 
groupTrimThreshold = 100; " + query);
+    assertTrimFlagNotSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "0,\t0,\t4\n"
+        + "0,\t100,\t4\n"
+        + "0,\t200,\t4\n"
+        + "0,\t300,\t4\n"
+        + "0,\t400,\t4", toResultStr(result));
+
+    assertEquals(
+        "BROKER_REDUCE(sort:[i ASC, j ASC],limit:5),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAggregateIsNotSafe()
+      throws Exception {
+    // for SSQE server level == inter-segment level
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
count(*)*j DESC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    // on server level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET 
groupTrimThreshold = 100; " + query);
+    assertTrimFlagSet(result);
+
+    // Result, though unstable due to concurrent operations on IndexedTable, 
is similar to the following
+    // (which is not correct):
+    //i[INT],j[LONG],count(*)[LONG]
+    //98,\t998,\t4
+    //94,\t994,\t4
+    //90,\t990,\t4
+    //86,\t986,\t4
+    //79,\t979,\t4
+
+    assertEquals("BROKER_REDUCE(sort:[times(count(*),j) 
DESC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysAndHavingIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i 
> 50  ORDER BY i ASC, j ASC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    // on server level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
+    assertTrimFlagSet(result);
+
+    // Result is unexpectedly empty because inter-segment-level trim keeps
first 25 records ordered by i ASC, j ASC
+    // that are later filtered out at broker stage.
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", 
toResultStr(result));
+
+    assertEquals("BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j 
ASC],limit:5),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  // SSQE broker level
+
+  @Test
+  public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysIsSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i, j LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    // on broker level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
+    assertTrimFlagNotSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "0,\t0,\t4\n"
+        + "0,\t100,\t4\n"
+        + "0,\t200,\t4\n"
+        + "0,\t300,\t4\n"
+        + "0,\t400,\t4", toResultStr(result));
+
+    assertEquals("BROKER_REDUCE(sort:[i ASC, j ASC],limit:5),\t1,\t0\n" //<-- 
trimming happens here
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtBrokerLevelOrderedBySomeGroupByKeysIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
j DESC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    ResultSetGroup result1 = conn.execute(query);
+    assertTrimFlagNotSet(result1);
+
+    // on broker level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
+    assertTrimFlagSet(result);
+
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n"
+        + "99,\t999,\t4\n"
+        + "98,\t998,\t4\n"
+        + "97,\t997,\t4\n"
+        + "96,\t996,\t4\n"
+        + "95,\t995,\t4", toResultStr(result));
+
+    assertEquals("BROKER_REDUCE(sort:[j DESC],limit:5),\t1,\t0\n" //<-- 
trimming happens here
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysAndHavingIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i 
> 50  ORDER BY i ASC, j ASC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    ResultSetGroup result1 = conn.execute(query);
+    assertTrimFlagNotSet(result1);
+
+    // on broker level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
+    assertTrimFlagSet(result);
+
+    // Result is unexpectedly empty because broker-level trim keeps first 50
records ordered by i ASC, j ASC
+    // that are later filtered out at broker stage.
+    assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", 
toResultStr(result));
+
+    assertEquals(
+        "BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j 
ASC],limit:5),\t1,\t0\n" //<-- trimming happens here
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  @Test
+  public void 
testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByAggregateIsNotSafe()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
count(*)*j DESC LIMIT 5";
+
+    Connection conn = getPinotConnection();
+    assertTrimFlagNotSet(conn.execute(query));
+
+    // on broker level, trimming occurs only when threshold is reached
+    ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
+    assertTrimFlagSet(result);
+
+    // result is similar to the following, but unstable:
+    //i[INT],j[LONG],count(*)[LONG]
+    //99,999,4
+    //98,998,4
+    //97,997,4
+    //96,996,4
+    //82,982,4
+
+    assertEquals("BROKER_REDUCE(sort:[times(count(*),j) DESC],limit:5," //<-- 
trimming happens here
+            + "postAggregations:times(count(*),j)),\t1,\t0\n"
+            + "COMBINE_GROUP_BY,\t2,\t1\n"
+            + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n"
+            + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n"
+            + "PROJECT(i, j),\t4,\t3\n"
+            + "DOC_ID_SET,\t5,\t4\n"
+            + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n",
+        toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false));
+  }
+
+  private static void assertTrimFlagNotSet(ResultSetGroup result) {
+    
assertFalse(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed());
+  }
+
+  private static void assertTrimFlagSet(ResultSetGroup result) {
+    
assertTrue(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropOfflineTable(DEFAULT_TABLE_NAME);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 16115fa728..f9fb427dd1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -229,6 +229,10 @@ public class AggregateOperator extends MultiStageOperator {
         return _eosBlock;
       } else {
         MseBlock dataBlock = new RowHeapDataBlock(rows, _resultSchema, 
_aggFunctions);
+        if (_groupByExecutor.getRowsProcessed() > _groupTrimSize) {
+          _statMap.merge(StatKey.GROUPS_TRIMMED, true);
+        }
+
         if (_groupByExecutor.isNumGroupsLimitReached()) {
           if (_errorOnNumGroupsLimit) {
             _input.earlyTerminate();
@@ -461,6 +465,7 @@ public class AggregateOperator extends MultiStageOperator {
         return true;
       }
     },
+    GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
     //@formatter:on
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 20c96fdcf1..fab26d033b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -240,6 +240,9 @@ public class LeafOperator extends MultiStageOperator {
           case TOTAL_DOCS:
             _statMap.merge(StatKey.TOTAL_DOCS, 
Long.parseLong(entry.getValue()));
             break;
+          case GROUPS_TRIMMED:
+            _statMap.merge(StatKey.GROUPS_TRIMMED, 
Boolean.parseBoolean(entry.getValue()));
+            break;
           case NUM_GROUPS_LIMIT_REACHED:
             _statMap.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, 
Boolean.parseBoolean(entry.getValue()));
             break;
@@ -654,6 +657,7 @@ public class LeafOperator extends MultiStageOperator {
     NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT),
     NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT),
     NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
+    GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
     NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
     NUM_RESIZES(StatMap.Type.INT, null),
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index d9a285d342..3b179ed4b8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -236,6 +236,7 @@ public abstract class MultiStageOperator
       public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
         @SuppressWarnings("unchecked")
         StatMap<AggregateOperator.StatKey> stats = 
(StatMap<AggregateOperator.StatKey>) map;
+        
response.mergeGroupsTrimmed(stats.getBoolean(AggregateOperator.StatKey.GROUPS_TRIMMED));
         
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
         response.mergeNumGroupsWarningLimitReached(
             
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED));
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index 9b978c4c43..7eafbd858e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -60,6 +60,7 @@ public class MultistageGroupByExecutor {
   private final AggType _aggType;
   private final boolean _leafReturnFinalResult;
   private final DataSchema _resultSchema;
+  private int _rowsProcessed;
   private final int _numGroupsLimit;
   private final int _numGroupsWarningLimit;
   private final boolean _filteredAggregationsSkipEmptyGroups;
@@ -206,12 +207,14 @@ public class MultistageGroupByExecutor {
         _groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions);
 
     int idx = 0;
-    while (idx++ < numGroups && groupKeyIterator.hasNext()) {
+    while (idx < numGroups && groupKeyIterator.hasNext()) {
       Object[] row = getRow(groupKeyIterator, numKeys, numFunctions, 
resultStoredTypes);
       sortedRows.add(row);
+      idx++;
     }
 
     while (groupKeyIterator.hasNext()) {
+      idx++;
       // TODO: allocate new array row only if row enters set
       Object[] row = getRow(groupKeyIterator, numKeys, numFunctions, 
resultStoredTypes);
       if (comparator.compare(sortedRows.peek(), row) < 0) {
@@ -220,6 +223,8 @@ public class MultistageGroupByExecutor {
       }
     }
 
+    _rowsProcessed = idx;
+
     int resultSize = sortedRows.size();
     ArrayList<Object[]> result = new ArrayList<>(sortedRows.size());
     for (int i = resultSize - 1; i >= 0; i--) {
@@ -245,9 +250,13 @@ public class MultistageGroupByExecutor {
         _groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions);
 
     int idx = 0;
-    while (groupKeyIterator.hasNext() && idx++ < numGroups) {
+    while (groupKeyIterator.hasNext() && idx < numGroups) {
       Object[] row = getRow(groupKeyIterator, numKeys, numFunctions, 
resultStoredTypes);
       rows.add(row);
+      idx++;
+    }
+    if (groupKeyIterator.hasNext()) {
+      _rowsProcessed = idx + 1;
     }
     return rows;
   }
@@ -292,6 +301,10 @@ public class MultistageGroupByExecutor {
     return _groupIdGenerator.getNumGroups();
   }
 
+  public int getRowsProcessed() {
+    return _rowsProcessed;
+  }
+
   public boolean isNumGroupsLimitReached() {
     return _groupIdGenerator.getNumGroups() == _numGroupsLimit;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 114752ae0e..3e928943d4 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -62,6 +62,7 @@ public class DefaultRequestContext implements RequestScope {
   private long _realtimeTotalMemAllocatedBytes;
   private int _numServersQueried;
   private int _numServersResponded;
+  private boolean _groupsTrimmed;
   private boolean _isNumGroupsLimitReached;
   private int _numExceptions;
   private String _brokerId;
@@ -340,6 +341,11 @@ public class DefaultRequestContext implements RequestScope 
{
     return _realtimeThreadCpuTimeNs;
   }
 
+  @Override
+  public boolean isGroupsTrimmed() {
+    return _groupsTrimmed;
+  }
+
   @Override
   public boolean isNumGroupsLimitReached() {
     return _isNumGroupsLimitReached;
@@ -480,6 +486,11 @@ public class DefaultRequestContext implements RequestScope 
{
     _numServersResponded = numServersResponded;
   }
 
+  @Override
+  public void setGroupsTrimmed(boolean groupsTrimmed) {
+    _groupsTrimmed = groupsTrimmed;
+  }
+
   @Override
   public void setNumGroupsLimitReached(boolean numGroupsLimitReached) {
     _isNumGroupsLimitReached = numGroupsLimitReached;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index e49f3e769f..fda3e4d4b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -144,6 +144,8 @@ public interface RequestContext {
   long getRealtimeTotalMemAllocatedBytes();
   void setRealtimeTotalMemAllocatedBytes(long realtimeTotalMemAllocatedBytes);
 
+  boolean isGroupsTrimmed();
+
   boolean isNumGroupsLimitReached();
 
   int getNumExceptions();
@@ -176,6 +178,8 @@ public interface RequestContext {
 
   void setNumServersResponded(int numServersResponded);
 
+  void setGroupsTrimmed(boolean groupsTrimmed);
+
   void setNumGroupsLimitReached(boolean numGroupsLimitReached);
 
   void setNumExceptions(int numExceptions);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to