This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 767e3cddd6 Set groupsTrimmed result flag when data is trimmed. (#16220) 767e3cddd6 is described below commit 767e3cddd6472bb12307af3e78f82d471be1559d Author: Bolek Ziobrowski <26925920+bziobrow...@users.noreply.github.com> AuthorDate: Wed Jul 2 14:19:16 2025 +0200 Set groupsTrimmed result flag when data is trimmed. (#16220) --- .../apache/pinot/broker/querylog/QueryLogger.java | 6 + .../requesthandler/BaseBrokerRequestHandler.java | 1 + .../BaseSingleStageBrokerRequestHandler.java | 5 +- .../MultiStageBrokerRequestHandler.java | 6 + .../pinot/broker/querylog/QueryLoggerTest.java | 1 + .../org/apache/pinot/client/ExecutionStats.java | 6 + .../apache/pinot/client/ExecutionStatsTest.java | 11 +- .../apache/pinot/common/datatable/DataTable.java | 5 +- .../apache/pinot/common/metrics/BrokerMeter.java | 4 + .../apache/pinot/common/metrics/ServerMeter.java | 6 + .../pinot/common/response/BrokerResponse.java | 5 + .../response/broker/BrokerResponseNative.java | 12 +- .../response/broker/BrokerResponseNativeV2.java | 12 +- .../response/broker/CursorResponseNative.java | 2 + .../apache/pinot/core/data/table/IndexedTable.java | 7 + .../blocks/results/GroupByResultsBlock.java | 12 + .../operator/combine/GroupByCombineOperator.java | 9 + .../operator/query/FilteredGroupByOperator.java | 4 + .../pinot/core/operator/query/GroupByOperator.java | 4 + .../query/reduce/ExecutionStatsAggregator.java | 3 + .../core/query/reduce/GroupByDataTableReducer.java | 4 + .../core/query/request/context/QueryContext.java | 47 ++ ...erSegmentAggregationSingleValueQueriesTest.java | 30 + .../GroupByEnableTrimOptionIntegrationTest.java | 2 +- .../tests/GroupByOptionsIntegrationTest.java | 39 +- .../tests/GroupByTrimmingIntegrationTest.java | 789 +++++++++++++++++++++ .../query/runtime/operator/AggregateOperator.java | 5 + .../pinot/query/runtime/operator/LeafOperator.java | 4 + 
.../query/runtime/operator/MultiStageOperator.java | 1 + .../operator/MultistageGroupByExecutor.java | 17 +- .../pinot/spi/trace/DefaultRequestContext.java | 11 + .../org/apache/pinot/spi/trace/RequestContext.java | 4 + 32 files changed, 1060 insertions(+), 14 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java index 5edcc24570..61359e2b3f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java @@ -245,6 +245,12 @@ public class QueryLogger { .append(params._response.getNumServersQueried()); } }, + GROUPS_TRIMMED("groupsTrimmed") { + @Override + void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) { + builder.append(params._response.isGroupsTrimmed()); + } + }, GROUP_LIMIT_REACHED("groupLimitReached") { @Override void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index e990fc3945..895102dc61 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -302,6 +302,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } statistics.setProcessingExceptions(processingExceptions); statistics.setNumExceptions(numExceptions); + statistics.setGroupsTrimmed(response.isGroupsTrimmed()); statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached()); statistics.setProcessingTimeMillis(response.getTimeUsedMs()); statistics.setNumDocsScanned(response.getNumDocsScanned()); diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index e3184f83d0..b30a3f7a9a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -811,6 +811,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED, 1); } + if (brokerResponse.isGroupsTrimmed()) { + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1); + } // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned // this is an attempt to return more faithful information based on other sources @@ -1904,7 +1907,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ * TODO: come up with other criteria for forcing a log and come up with better numbers */ private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) { - if (brokerResponse.isNumGroupsLimitReached()) { + if (brokerResponse.isNumGroupsLimitReached() || brokerResponse.isGroupsTrimmed()) { return true; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 78513ef01c..81bfe277d8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -595,6 +595,12 @@ public class MultiStageBrokerRequestHandler 
extends BaseBrokerRequestHandler { } } + if (brokerResponse.isGroupsTrimmed()) { + for (String table : tableNames) { + _brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1); + } + } + // Set total query processing time // TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java index 141ff3949e..456b99ec8a 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java @@ -104,6 +104,7 @@ public class QueryLoggerTest { + ":5/6/7/8/9/10/21," + "consumingFreshnessTimeMs=11," + "servers=12/13," + + "groupsTrimmed=false," + "groupLimitReached=false," + "groupWarningLimitReached=false," + "brokerReduceTimeMs=20," diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java index 61b0029714..b2d2ef7919 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java @@ -43,6 +43,7 @@ public class ExecutionStats { private static final String NUM_CONSUMING_SEGMENTS_QUERIED = "numConsumingSegmentsQueried"; private static final String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs"; private static final String TOTAL_DOCS = "totalDocs"; + private static final String GROUPS_TRIMMED = "groupsTrimmed"; private static final String NUM_GROUPS_LIMIT_REACHED = "numGroupsLimitReached"; private static final String NUM_GROUPS_WARNING_LIMIT_REACHED = "numGroupsWarningLimitReached"; 
private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs"; @@ -112,6 +113,10 @@ public class ExecutionStats { return _brokerResponse.has(NUM_GROUPS_LIMIT_REACHED) && _brokerResponse.get(NUM_GROUPS_LIMIT_REACHED).asBoolean(); } + public boolean isGroupsTrimmed() { + return _brokerResponse.has(GROUPS_TRIMMED) && _brokerResponse.get(GROUPS_TRIMMED).asBoolean(); + } + public boolean isNumGroupsWarningLimitReached() { return _brokerResponse.has(NUM_GROUPS_WARNING_LIMIT_REACHED) && _brokerResponse.get(NUM_GROUPS_WARNING_LIMIT_REACHED).asBoolean(); @@ -143,6 +148,7 @@ public class ExecutionStats { map.put(NUM_CONSUMING_SEGMENTS_QUERIED, getNumConsumingSegmentsQueried()); map.put(MIN_CONSUMING_FRESHNESS_TIME_MS, getMinConsumingFreshnessTimeMs() + "ms"); map.put(TOTAL_DOCS, getTotalDocs()); + map.put(GROUPS_TRIMMED, isGroupsTrimmed()); map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached()); map.put(NUM_GROUPS_WARNING_LIMIT_REACHED, isNumGroupsWarningLimitReached()); map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms"); diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java index 583a28e5ff..3757566c83 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java @@ -43,7 +43,7 @@ public class ExecutionStatsTest { + "\"numEntriesScannedInFilter\":10, \"numEntriesScannedPostFilter\":10, \"numSegmentsQueried\":10, " + "\"numSegmentsProcessed\":10, \"numSegmentsMatched\":10, \"numConsumingSegmentsQueried\":10, " + "\"minConsumingFreshnessTimeMs\":10, \"totalDocs\":10, \"numGroupsLimitReached\":true, " - + "\"timeUsedMs\":10}"; + + "\"timeUsedMs\":10, \"groupsTrimmed\": true}"; _mockBrokerResponse = JsonUtils.stringToJsonNode(json); _executionStatsUnderTest = 
new ExecutionStats(_mockBrokerResponse); } @@ -156,6 +156,15 @@ public class ExecutionStatsTest { assertTrue(result); } + @Test + public void testIsGroupsTrimmed() { + // Run the test + final boolean result = _executionStatsUnderTest.isGroupsTrimmed(); + + // Verify the results + assertTrue(result); + } + @Test public void testGetTimeUsedMs() { // Run the test diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java index bd39b76fd9..9f4040084f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java @@ -146,11 +146,12 @@ public interface DataTable { THREAD_MEM_ALLOCATED_BYTES(36, "threadMemAllocatedBytes", MetadataValueType.LONG), RESPONSE_SER_MEM_ALLOCATED_BYTES(37, "responseSerMemAllocatedBytes", MetadataValueType.LONG), // NOTE: for server after release 1.3.0 this flag is always set to true since servers now perform sorting - SORTED(38, "sorted", MetadataValueType.BOOLEAN); + SORTED(38, "sorted", MetadataValueType.BOOLEAN), + GROUPS_TRIMMED(39, "groupsTrimmed", MetadataValueType.STRING); // We keep this constant to track the max id added so far for backward compatibility. // Increase it when adding new keys, but NEVER DECREASE IT!!! 
- private static final int MAX_ID = 38; + private static final int MAX_ID = GROUPS_TRIMMED.getId(); private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new MetadataKey[MAX_ID + 1]; private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 9c0010b474..a87784a554 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -152,6 +152,10 @@ public class BrokerMeter implements AbstractMetrics.Meter { public static final BrokerMeter SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS = create( "SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS", "badResponses", false); + // This metric track the number of broker responses with trimmed groups (potential bad responses). + public static final BrokerMeter BROKER_RESPONSES_WITH_GROUPS_TRIMMED = create( + "BROKER_RESPONSES_WITH_GROUPS_TRIMMED", "badResponses", false); + // This metric track the number of broker responses with number of groups limit reached (potential bad responses). public static final BrokerMeter BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create( "BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 1c56532b55..8a73029ac2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -150,6 +150,12 @@ public enum ServerMeter implements AbstractMetrics.Meter { * But if a single query has 2 different join operators and each one reaches the limit, this will be increased by 2. 
*/ HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true), + /** + * Number of times group by results were trimmed. + * It is increased in one by each worker that reaches the limit within the stage. + * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 10. + */ + AGGREGATE_TIMES_GROUPS_TRIMMED("times", true), /** * Number of times the max number of groups has been reached. * It is increased in one by each worker that reaches the limit within the stage. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index dfecbdf808..0672b7d847 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -126,6 +126,11 @@ public interface BrokerResponse { return getExceptions().size(); } + /** + * Returns whether groups were trimmed (reduced in size after sorting). + */ + boolean isGroupsTrimmed(); + /** * Returns whether the number of groups limit has been reached. 
*/ diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 60a035ead6..23cbbdcf40 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -56,7 +56,7 @@ import org.apache.pinot.spi.utils.JsonUtils; "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "pools", "rlsFiltersApplied" + "pools", "rlsFiltersApplied", "groupsTrimmed" }) @JsonIgnoreProperties(ignoreUnknown = true) public class BrokerResponseNative implements BrokerResponse { @@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse { private ResultTable _resultTable; private int _numRowsResultSet = 0; private List<QueryProcessingException> _exceptions = new ArrayList<>(); + private boolean _groupsTrimmed = false; private boolean _numGroupsLimitReached = false; private boolean _numGroupsWarningLimitReached = false; private long _timeUsedMs = 0L; @@ -205,6 +206,15 @@ public class BrokerResponseNative implements BrokerResponse { _exceptions.add(exception); } + @Override + public boolean isGroupsTrimmed() { + return _groupsTrimmed; + } + + public void setGroupsTrimmed(boolean groupsTrimmed) { + _groupsTrimmed = groupsTrimmed; + } + @Override public boolean isNumGroupsLimitReached() { return _numGroupsLimitReached; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 
0926a811c0..d8e10f3745 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -51,7 +51,7 @@ import org.apache.pinot.common.response.ProcessingException; "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "pools", "rlsFiltersApplied" + "pools", "rlsFiltersApplied", "groupsTrimmed" }) public class BrokerResponseNativeV2 implements BrokerResponse { private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class); @@ -126,11 +126,20 @@ public class BrokerResponseNativeV2 implements BrokerResponse { addException(new QueryProcessingException(exception.getErrorCode(), exception.getMessage())); } + @Override + public boolean isGroupsTrimmed() { + return _brokerStats.getBoolean(StatKey.GROUPS_TRIMMED); + } + @Override public boolean isNumGroupsLimitReached() { return _brokerStats.getBoolean(StatKey.NUM_GROUPS_LIMIT_REACHED); } + public void mergeGroupsTrimmed(boolean groupsTrimmed) { + _brokerStats.merge(StatKey.GROUPS_TRIMMED, groupsTrimmed); + } + public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) { _brokerStats.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, numGroupsLimitReached); } @@ -444,6 +453,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse { NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT), NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT), NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT), + GROUPS_TRIMMED(StatMap.Type.BOOLEAN), NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN); diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java index 5a5d2516e8..e1ba761767 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java @@ -34,6 +34,7 @@ import org.apache.pinot.common.response.CursorResponse; "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tableQueries", + "groupsTrimmed", // Fields specific to CursorResponse "offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs", "submissionTimeMs", "expirationTimeMs", "brokerHost", "brokerPort", "bytesWritten" @@ -57,6 +58,7 @@ public class CursorResponseNative extends BrokerResponseNative implements Cursor setResultTable(response.getResultTable()); setNumRowsResultSet(response.getNumRowsResultSet()); setExceptions(response.getExceptions()); + setGroupsTrimmed(response.isGroupsTrimmed()); setNumGroupsLimitReached(response.isNumGroupsLimitReached()); setNumGroupsWarningLimitReached(response.isNumGroupsWarningLimitReached()); setTimeUsedMs(response.getTimeUsedMs()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java index d96854ccb5..9ce27567d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java @@ -270,6 +270,13 @@ public abstract class IndexedTable extends BaseTable { return _numResizes; } + public boolean isTrimmed() { + // single resize 
occurs on finish() if there's orderBy + // all other re-sizes are triggered by trim size and threshold + int min = _topRecords != null && _hasOrderBy ? 1 : 0; + return _numResizes > min; + } + public long getResizeTimeMs() { return TimeUnit.NANOSECONDS.toMillis(_resizeTimeNs); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java index 425bb2052c..48ff191c2d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java @@ -61,6 +61,7 @@ public class GroupByResultsBlock extends BaseResultsBlock { private final Table _table; private final QueryContext _queryContext; + private boolean _groupsTrimmed; private boolean _numGroupsLimitReached; private boolean _numGroupsWarningLimitReached; private int _numResizes; @@ -142,6 +143,14 @@ public class GroupByResultsBlock extends BaseResultsBlock { _numGroupsLimitReached = numGroupsLimitReached; } + public boolean isGroupsTrimmed() { + return _groupsTrimmed; + } + + public void setGroupsTrimmed(boolean groupsTrimmed) { + _groupsTrimmed = groupsTrimmed; + } + public boolean isNumGroupsWarningLimitReached() { return _numGroupsWarningLimitReached; } @@ -336,6 +345,9 @@ public class GroupByResultsBlock extends BaseResultsBlock { @Override public Map<String, String> getResultsMetadata() { Map<String, String> metadata = super.getResultsMetadata(); + if (_groupsTrimmed) { + metadata.put(MetadataKey.GROUPS_TRIMMED.getName(), "true"); + } if (_numGroupsLimitReached) { metadata.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true"); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java 
index fb34278b12..18e67b9899 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -66,6 +66,7 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group private final CountDownLatch _operatorLatch; private volatile IndexedTable _indexedTable; + private volatile boolean _groupsTrimmed; private volatile boolean _numGroupsLimitReached; private volatile boolean _numGroupsWarningLimitReached; @@ -120,6 +121,9 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group } } + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } // Set groups limit reached flag. if (resultsBlock.isNumGroupsLimitReached()) { _numGroupsLimitReached = true; @@ -222,6 +226,10 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group return new ExceptionResultsBlock(errMsg); } + if (_indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) { + _groupsTrimmed = true; + } + IndexedTable indexedTable = _indexedTable; if (_queryContext.isServerReturnFinalResult()) { indexedTable.finish(true, true); @@ -231,6 +239,7 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group indexedTable.finish(false); } GroupByResultsBlock mergedBlock = new GroupByResultsBlock(indexedTable, _queryContext); + mergedBlock.setGroupsTrimmed(_groupsTrimmed); mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached); mergedBlock.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached); mergedBlock.setNumResizes(indexedTable.getNumResizes()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java index c17e64d8de..1afda13ca6 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java @@ -195,7 +195,11 @@ public class FilteredGroupByOperator extends BaseOperator<GroupByResultsBlock> { TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext); Collection<IntermediateRecord> intermediateRecords = tableResizer.trimInSegmentResults(groupKeyGenerator, groupByResultHolders, trimSize); + + ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED, 1); + boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag only if it's not safe GroupByResultsBlock resultsBlock = new GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext); + resultsBlock.setGroupsTrimmed(unsafeTrim); resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached); resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached); return resultsBlock; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java index 33072d5fd4..d28de96107 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java @@ -143,7 +143,11 @@ public class GroupByOperator extends BaseOperator<GroupByResultsBlock> { if (groupByExecutor.getNumGroups() > trimSize) { TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext); Collection<IntermediateRecord> intermediateRecords = groupByExecutor.trimGroupByResult(trimSize, tableResizer); + + ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED, 1); + boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag only if it's not safe GroupByResultsBlock resultsBlock = new GroupByResultsBlock(_dataSchema, 
intermediateRecords, _queryContext); + resultsBlock.setGroupsTrimmed(unsafeTrim); resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached); resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached); return resultsBlock; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java index 02f08661e9..bb49e0543d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java @@ -70,6 +70,7 @@ public class ExecutionStatsAggregator { private long _numSegmentsPrunedByValue = 0L; private long _explainPlanNumEmptyFilterSegments = 0L; private long _explainPlanNumMatchAllFilterSegments = 0L; + private boolean _groupsTrimmed = false; private boolean _numGroupsLimitReached = false; private boolean _numGroupsWarningLimitReached = false; @@ -228,6 +229,7 @@ public class ExecutionStatsAggregator { _numTotalDocs += Long.parseLong(numTotalDocsString); } + _groupsTrimmed |= Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.GROUPS_TRIMMED.getName())); _numGroupsLimitReached |= Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName())); _numGroupsWarningLimitReached |= @@ -252,6 +254,7 @@ public class ExecutionStatsAggregator { brokerResponseNative.setNumSegmentsProcessed(_numSegmentsProcessed); brokerResponseNative.setNumSegmentsMatched(_numSegmentsMatched); brokerResponseNative.setTotalDocs(_numTotalDocs); + brokerResponseNative.setGroupsTrimmed(_groupsTrimmed); brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached); brokerResponseNative.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached); brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs); diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 2072e937de..c5647c1b27 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -145,6 +145,10 @@ public class GroupByDataTableReducer implements DataTableReducer { throws TimeoutException { // NOTE: This step will modify the data schema and also return final aggregate results. IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext); + if (indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) { + brokerResponseNative.setGroupsTrimmed(true); + } + if (brokerMetrics != null) { brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes()); brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index 484868f7ce..6c8076091e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.query.request.context; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -164,6 +165,48 @@ public class QueryContext { _explain = explain; } + private boolean isSameOrderAndGroupByColumns(QueryContext context) { + List<ExpressionContext> groupByKeys = context.getGroupByExpressions(); + List<OrderByExpressionContext> orderByKeys = 
context.getOrderByExpressions(); + + if (groupByKeys == null || groupByKeys.isEmpty()) { + return orderByKeys == null || orderByKeys.isEmpty(); + } else if (orderByKeys == null || orderByKeys.isEmpty()) { + return false; + } + + BitSet orderByKeysMatched = new BitSet(orderByKeys.size()); + + OUTER_GROUP: + for (ExpressionContext groupExp : groupByKeys) { + for (int i = 0; i < orderByKeys.size(); i++) { + OrderByExpressionContext orderExp = orderByKeys.get(i); + if (groupExp.equals(orderExp.getExpression())) { + orderByKeysMatched.set(i); + continue OUTER_GROUP; + } + } + + return false; + } + + OUTER_ORDER: + for (int i = 0, n = orderByKeys.size(); i < n; i++) { + if (orderByKeysMatched.get(i)) { + continue; + } + + for (ExpressionContext groupExp : groupByKeys) { + if (groupExp.equals(orderByKeys.get(i).getExpression())) { + continue OUTER_ORDER; + } + } + return false; + } + + return true; + } + /** * Returns the table name. * NOTE: on the broker side, table name might be {@code null} when subquery is available. 
@@ -520,6 +563,10 @@ public class QueryContext { return isIndexUseAllowed(dataSource.getColumnName(), indexType); } + public boolean isUnsafeTrim() { + return !isSameOrderAndGroupByColumns(this) || getHavingFilter() != null; + } + public static class Builder { private String _tableName; private QueryContext _subquery; diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java index 6fcb909374..b5fd08dda4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java @@ -774,6 +774,36 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal assertTrue(brokerResponse.isNumGroupsLimitReached()); } + @Test + public void testGroupsTrimmedAtSegmentLevel() { + String query = "SELECT COUNT(*) FROM testTable GROUP BY column1, column3 ORDER BY column1"; + + BrokerResponseNative brokerResponse = getBrokerResponse(query); + assertFalse(brokerResponse.isGroupsTrimmed()); + + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(); + planMaker.setMinSegmentGroupTrimSize(5); + brokerResponse = getBrokerResponse(query, planMaker); + + assertTrue(brokerResponse.isGroupsTrimmed()); + } + + @Test + public void testGroupsTrimmedAtServerLevel() { + String query = "SELECT COUNT(*) FROM testTable GROUP BY column1, column3 ORDER BY column1"; + + BrokerResponseNative brokerResponse = getBrokerResponse(query); + assertFalse(brokerResponse.isGroupsTrimmed()); + + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(); + planMaker.setMinServerGroupTrimSize(5); + // on server level, trimming occurs only when threshold is reached + planMaker.setGroupByTrimThreshold(100); + brokerResponse = getBrokerResponse(query, planMaker); + + 
assertTrue(brokerResponse.isGroupsTrimmed()); + } + @Test public void testDistinctSum() { String query = "select DISTINCTSUM(column1) as v1, DISTINCTSUM(column3) as v2 from testTable"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java index 947c2032e0..4f534e458c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java @@ -207,7 +207,7 @@ public class GroupByEnableTrimOptionIntegrationTest extends BaseClusterIntegrati JsonNode plan = postV2Query(option + " set explainAskingServers=true; explain plan for " + query); Assert.assertEquals(GroupByOptionsIntegrationTest.toResultStr(result), expectedResult); - Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan), expectedPlan); + Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan, true), expectedPlan); } private JsonNode postV2Query(String query) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java index 272f314482..e2251cece3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.commons.io.FileUtils; +import org.apache.pinot.client.ResultSetGroup; import 
org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; @@ -473,7 +474,7 @@ public class GroupByOptionsIntegrationTest extends BaseClusterIntegrationTestSet JsonNode plan = postV2Query(option + " set explainAskingServers=true; explain plan for " + query); Assert.assertEquals(toResultStr(result), expectedResult); - Assert.assertEquals(toExplainStr(plan), expectedPlan); + Assert.assertEquals(toExplainStr(plan, true), expectedPlan); } @Test @@ -550,6 +551,17 @@ public class GroupByOptionsIntegrationTest extends BaseClusterIntegrationTestSet getExtraQueryProperties()); } + public static @NotNull String toResultStr(ResultSetGroup resultSet) { + if (resultSet == null) { + return "null"; + } + JsonNode node = resultSet.getBrokerResponse().getResultTable(); + if (node == null) { + return toErrorString(resultSet.getBrokerResponse().getExceptions()); + } + return toString(node); + } + public static @NotNull String toResultStr(JsonNode mainNode) { if (mainNode == null) { return "null"; @@ -561,7 +573,7 @@ public class GroupByOptionsIntegrationTest extends BaseClusterIntegrationTestSet return toString(node); } - static @NotNull String toExplainStr(JsonNode mainNode) { + static @NotNull String toExplainStr(JsonNode mainNode, boolean isMSQE) { if (mainNode == null) { return "null"; } @@ -569,7 +581,11 @@ public class GroupByOptionsIntegrationTest extends BaseClusterIntegrationTestSet if (node == null) { return toErrorString(mainNode.get("exceptions")); } - return toExplainString(node); + return toExplainString(node, isMSQE); + } + + static @NotNull String toExplainStr(JsonNode mainNode) { + return toExplainStr(mainNode, false); } public static String toErrorString(JsonNode node) { @@ -613,8 +629,21 @@ public class GroupByOptionsIntegrationTest extends BaseClusterIntegrationTestSet return buf.toString(); } - public static String toExplainString(JsonNode node) { - return 
node.get("rows").get(0).get(1).textValue(); + private static String toExplainString(JsonNode node, boolean isMSQE) { + if (isMSQE) { + return node.get("rows").get(0).get(1).textValue(); + } else { + StringBuilder result = new StringBuilder(); + JsonNode rows = node.get("rows"); + for (int i = 0, n = rows.size(); i < n; i++) { + JsonNode row = rows.get(i); + result.append(row.get(0).textValue()) + .append(",\t").append(row.get(1).intValue()) + .append(",\t").append(row.get(2).intValue()) + .append('\n'); + } + return result.toString(); + } } @AfterClass diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java new file mode 100644 index 0000000000..51f42729a0 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java @@ -0,0 +1,789 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.client.Connection; +import org.apache.pinot.client.ResultSetGroup; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toExplainStr; +import static org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + + +// Tests that 'groupsTrimmed' flag is set when results trimming occurs at: +// SSQE - segment, inter-segment/server and broker levels +// MSQE - segment, inter-segment and intermediate levels +// Note: MSQE doesn't push collations depending on group by result into aggregation nodes +// so e.g. 
ORDER BY i*j doesn't trigger trimming even when hints are set +public class GroupByTrimmingIntegrationTest extends BaseClusterIntegrationTestSet { + + static final int FILES_NO = 4; + static final int RECORDS_NO = 1000; + static final String I_COL = "i"; + static final String J_COL = "j"; + static final int SERVERS_NO = 2; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + startZk(); + startController(); + startServers(SERVERS_NO); + startBroker(); + + Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME) + .addSingleValueDimension(I_COL, FieldSpec.DataType.INT) + .addSingleValueDimension(J_COL, FieldSpec.DataType.LONG) + .build(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + List<File> avroFiles = createAvroFile(_tempDir); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(DEFAULT_TABLE_NAME, _tarDir); + + // Wait for all documents loaded + TestUtils.waitForCondition(() -> getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L, + 60_000, + "Failed to load documents", true, Duration.ofMillis(60_000 / 10)); + + setUseMultiStageQueryEngine(true); + + Map<String, List<String>> map = getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE); + + // make sure segments are split between multiple servers + assertEquals(map.size(), SERVERS_NO); + } + + protected TableConfig createOfflineTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(getTableName()) + .setNumReplicas(getNumReplicas()) + .setBrokerTenant(getBrokerTenant()) + .build(); + } + + public static List<File> createAvroFile(File tempDir) + throws IOException { + + // create avro schema + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + 
avroSchema.setFields(ImmutableList.of( + new org.apache.avro.Schema.Field(I_COL, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null, null), + new org.apache.avro.Schema.Field(J_COL, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, null))); + + List<File> files = new ArrayList<>(); + for (int file = 0; file < FILES_NO; file++) { + File avroFile = new File(tempDir, "data_" + file + ".avro"); + try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { + fileWriter.create(avroSchema, avroFile); + + for (int docId = 0; docId < RECORDS_NO; docId++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(I_COL, docId % 100); + record.put(J_COL, docId); + fileWriter.append(record); + } + files.add(avroFile); + } + } + return files; + } + + // MSQE - multi stage query engine + @Test + public void testMSQEOrderByOnDependingOnAggregateResultIsNotPushedDown() + throws Exception { + setUseMultiStageQueryEngine(true); + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY i*j DESC LIMIT 5")); + + String options = "SET minSegmentGroupTrimSize=5; "; + String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, j, COUNT(*) " + + "FROM mytable GROUP BY i, j ORDER BY i*j DESC LIMIT 5 "; + + ResultSetGroup result = conn.execute(options + query); + assertTrimFlagNotSet(result); + + assertEquals("Execution Plan\n" + + "LogicalSort(sort0=[$3], dir0=[DESC], offset=[0], fetch=[5])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[3 DESC]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$3], dir0=[DESC], fetch=[5])\n" // <-- actual sort & limit + + " LogicalProject(i=[$0], j=[$1], EXPR$2=[$2], EXPR$3=[*($0, $1)])\n" + // <-- order by value is computed here, so trimming in upstream stages is not possible + + " 
PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[4000])\n", + toExplainStr(postQuery(options + " SET explainAskingServers=true; EXPLAIN PLAN FOR " + query), true)); + } + + @Test + public void testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(true); + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5")); + + String options = "SET minSegmentGroupTrimSize=5; "; + String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, j, COUNT(*) " + + "FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5 "; + + ResultSetGroup result = conn.execute(options + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n" + + "99,\t999,\t4\n" + + "98,\t998,\t4\n" + + "97,\t997,\t4\n" + + "96,\t996,\t4\n" + + "95,\t995,\t4", toResultStr(result)); + + assertEquals("Execution Plan\n" + + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n" + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], collations=[[1 DESC]]," + + " limit=[5])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" // 
<-- trimming happens here + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[4000])\n", + toExplainStr(postQuery(options + " SET explainAskingServers=true; EXPLAIN PLAN FOR " + query), true)); + } + + @Test + public void testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(true); + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet( + conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY COUNT(*) DESC LIMIT 5")); + + String options = "SET minSegmentGroupTrimSize=5; "; + String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, j, COUNT(*) " + + "FROM mytable GROUP BY i, j ORDER BY count(*) DESC LIMIT 5 "; + + ResultSetGroup result = conn.execute(options + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n" + + "77,\t377,\t4\n" + + "66,\t566,\t4\n" + + "39,\t339,\t4\n" + + "96,\t396,\t4\n" + + "25,\t25,\t4", toResultStr(result)); + + assertEquals("Execution Plan\n" + + "LogicalSort(sort0=[$2], dir0=[DESC], offset=[0], fetch=[5])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$2], dir0=[DESC], fetch=[5])\n" + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], collations=[[2 DESC]]," + + " limit=[5])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" //<-- trimming happens here + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[4000])\n", + toExplainStr(postQuery(options + " SET explainAskingServers=true; EXPLAIN PLAN FOR " + query), true)); + } + + @Test + 
public void testMSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(true); + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5")); + + String options = "SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 100; "; + String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, j, COUNT(*) " + + "FROM mytable " + + "GROUP BY i, j " + + "ORDER BY j DESC " + + "LIMIT 5 "; + ResultSetGroup result = conn.execute(options + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n" + + "99,\t999,\t4\n" + + "98,\t998,\t4\n" + + "97,\t997,\t4\n" + + "96,\t996,\t4\n" + + "95,\t995,\t4", toResultStr(result)); + + assertEquals("Execution Plan\n" + + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n" + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], collations=[[1 DESC]]," + + " limit=[5])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" // <-- trimming happens here + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[4000])\n", + toExplainStr(postQuery(options + "SET explainAskingServers=true; EXPLAIN PLAN FOR " + query), true)); + } + + @Test + public void testMSQEGroupsTrimmedAtIntermediateLevelWithOrderByOnSomeGroupByKeysIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(true); + + Connection conn = getPinotConnection(); + 
assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5")); + + // This case is tricky because intermediate results are hash-split among servers so one gets 50 rows on average. + // That's the reason both limit and trim size needs to be so small. + String query = "SELECT /*+ aggOptions(is_enable_group_trim='true',mse_min_group_trim_size='5') */ i, j, COUNT(*) " + + "FROM mytable " + + "GROUP BY i, j " + + "ORDER BY j DESC " + + "LIMIT 5 "; + ResultSetGroup result = conn.execute(query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n" + + "99,\t999,\t4\n" + + "98,\t998,\t4\n" + + "97,\t997,\t4\n" + + "96,\t996,\t4\n" + + "95,\t995,\t4", toResultStr(result)); + + assertEquals( + "Execution Plan\n" + + "LogicalSort(sort0=[$1], dir0=[DESC], offset=[0], fetch=[5])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$1], dir0=[DESC], fetch=[5])\n" + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], collations=[[1 DESC]]," + + " limit=[5])\n" // receives 50-row-big blocks, trimming kicks in only if limit is lower + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" // splits blocks via hash distribution + + " LeafStageCombineOperator(table=[mytable])\n" // no trimming happens 'below' + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[4000])\n", + toExplainStr(postQuery(" set explainAskingServers=true; EXPLAIN PLAN FOR " + query), true)); + } + + // SSQE segment level + @Test + public void testSSQEFilteredGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String 
query = "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM mytable GROUP BY i, j ORDER BY i + j DESC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"sum(i) FILTER(WHERE i > '0')\"[\"DOUBLE\"]\n" + + "99,\t999,\t396.0\n" + + "98,\t998,\t392.0\n" + + "97,\t997,\t388.0\n" + + "96,\t996,\t384.0\n" + + "95,\t995,\t380.0", toResultStr(result)); + + assertEquals( + "BROKER_REDUCE(sort:[plus(i,j) DESC],limit:5,postAggregations:filter(sum(i),greater_than(i,'0'))),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY_FILTERED(groupKeys:i, j, aggregations:sum(i)),\t3,\t2\n" // <-- trimming happens here + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_FULL_SCAN(operator:RANGE,predicate:i > '0'),\t6,\t5\n" + + "PROJECT(i, j),\t7,\t3\n" + + "DOC_ID_SET,\t8,\t7\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t9,\t8\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY i + j DESC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "99,\t999,\t4\n" + + "98,\t998,\t4\n" + + "97,\t997,\t4\n" + + "96,\t996,\t4\n" + + "95,\t995,\t4", toResultStr(result)); + + assertEquals("BROKER_REDUCE(sort:[plus(i,j) DESC],limit:5),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + 
"PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" //<-- trimming happens here + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtSegmentLevelWithOrderBySomeGroupByKeysIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + query); + assertTrimFlagSet(result); + + // With test data set result is stable, but in general, trimming data ordered by subset of + // group by keys can produce incomplete group aggregates due to lack of stability. + // That is because, for a given value of j, sorting treats all values of i the same, + // and segment data is usually unordered. 
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "99,\t999,\t4\n" + + "98,\t998,\t4\n" + + "97,\t997,\t4\n" + + "96,\t996,\t4\n" + + "95,\t995,\t4", toResultStr(result)); + + assertEquals( + "BROKER_REDUCE(sort:[j DESC],limit:5),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" // <- trimming happens here + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAllGroupByKeysAndHavingIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + + // trimming is safe on rows ordered by all group by keys (regardless of key order, direction or duplications) + // but not when HAVING clause is present + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + query); + assertTrimFlagSet(result); + + // Result is unexpectedly empty because segment-level trim keeps first 50 records ordered by i ASC, j ASC + // that are later filtered out at broker stage. 
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", toResultStr(result)); + + assertEquals("BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j ASC],limit:10),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" // <- trimming happens here + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAllGroupByKeysIsSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + + // trimming is safe on rows ordered by all group by keys (regardless of key order, direction or duplications) + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j ASC, i DESC, j ASC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + query); + assertTrimFlagNotSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "0,\t0,\t4\n" + + "1,\t1,\t4\n" + + "2,\t2,\t4\n" + + "3,\t3,\t4\n" + + "4,\t4,\t4", toResultStr(result)); + + assertEquals( + "BROKER_REDUCE(sort:[j ASC, i DESC],limit:5),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtSegmentLevelWithOrderByAggregateIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + + // trimming is NOT safe when ordering by an expression derived from an aggregate result + String query = 
"SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY count(*)*j ASC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + ResultSetGroup result = conn.execute("SET minSegmentGroupTrimSize=5; " + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "0,\t0,\t4\n" + + "1,\t1,\t4\n" + + "2,\t2,\t4\n" + + "3,\t3,\t4\n" + + "4,\t4,\t4", toResultStr(result)); + + assertEquals("BROKER_REDUCE(sort:[times(count(*),j) ASC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + // SSQE inter-segment level + + @Test + public void testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY i DESC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + // on server level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query); + assertTrimFlagSet(result); + // result's order is not stable due to concurrent operations on indexed table + + assertEquals("BROKER_REDUCE(sort:[i DESC],limit:5),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" // <-- trimming happens here + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR 
" + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysIsSafe() + throws Exception { + // for SSQE server level == inter-segment level + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY i, j LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + // on server level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 100; " + query); + assertTrimFlagNotSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "0,\t0,\t4\n" + + "0,\t100,\t4\n" + + "0,\t200,\t4\n" + + "0,\t300,\t4\n" + + "0,\t400,\t4", toResultStr(result)); + + assertEquals( + "BROKER_REDUCE(sort:[i ASC, j ASC],limit:5),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAggregateIsNotSafe() + throws Exception { + // for SSQE server level == inter-segment level + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY count(*)*j DESC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + // on server level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 100; " + query); + assertTrimFlagSet(result); + + // Result, though unstable due to concurrent operations on IndexedTable, is similar to the following + // (which is 
not correct): + //i[INT],j[LONG],count(*)[LONG] + //98,\t998,\t4 + //94,\t994,\t4 + //90,\t990,\t4 + //86,\t986,\t4 + //79,\t979,\t4 + + assertEquals("BROKER_REDUCE(sort:[times(count(*),j) DESC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysAndHavingIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + // on server level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query); + assertTrimFlagSet(result); + + // Result is unexpectedly empty because inter-segment-level trim keeps first 25 records ordered by i ASC, j ASC + // that are later filtered out at broker stage. 
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", toResultStr(result)); + + assertEquals("BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j ASC],limit:5),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + // SSQE broker level + + @Test + public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysIsSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY i, j LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + // on broker level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query); + assertTrimFlagNotSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "0,\t0,\t4\n" + + "0,\t100,\t4\n" + + "0,\t200,\t4\n" + + "0,\t300,\t4\n" + + "0,\t400,\t4", toResultStr(result)); + + assertEquals("BROKER_REDUCE(sort:[i ASC, j ASC],limit:5),\t1,\t0\n" //<-- trimming happens here + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtBrokerLevelOrderedBySomeGroupByKeysIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5"; 
+ + Connection conn = getPinotConnection(); + ResultSetGroup result1 = conn.execute(query); + assertTrimFlagNotSet(result1); + + // on broker level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query); + assertTrimFlagSet(result); + + assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]\n" + + "99,\t999,\t4\n" + + "98,\t998,\t4\n" + + "97,\t997,\t4\n" + + "96,\t996,\t4\n" + + "95,\t995,\t4", toResultStr(result)); + + assertEquals("BROKER_REDUCE(sort:[j DESC],limit:5),\t1,\t0\n" //<-- trimming happens here + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysAndHavingIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC LIMIT 5"; + + Connection conn = getPinotConnection(); + ResultSetGroup result1 = conn.execute(query); + assertTrimFlagNotSet(result1); + + // on broker level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query); + assertTrimFlagSet(result); + + // Result is unexpectedly empty because segment-level trim keeps first 50 records ordered by i ASC, j ASC + // that are later filtered out at broker stage. 
+ assertEquals("\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"count(*)\"[\"LONG\"]", toResultStr(result)); + + assertEquals( + "BROKER_REDUCE(havingFilter:i > '50',sort:[i ASC, j ASC],limit:5),\t1,\t0\n" //<-- trimming happens here + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + @Test + public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByAggregateIsNotSafe() + throws Exception { + setUseMultiStageQueryEngine(false); + String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY count(*)*j DESC LIMIT 5"; + + Connection conn = getPinotConnection(); + assertTrimFlagNotSet(conn.execute(query)); + + // on broker level, trimming occurs only when threshold is reached + ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET groupTrimThreshold = 50; " + query); + assertTrimFlagSet(result); + + // result is similar to the following, but unstable: + //i[INT],j[LONG],count(*)[LONG] + //99,999,4 + //98,998,4 + //97,997,4 + //96,996,4 + //82,982,4 + + assertEquals("BROKER_REDUCE(sort:[times(count(*),j) DESC],limit:5," //<-- trimming happens here + + "postAggregations:times(count(*),j)),\t1,\t0\n" + + "COMBINE_GROUP_BY,\t2,\t1\n" + + "PLAN_START(numSegmentsForThisPlan:4),\t-1,\t-1\n" + + "GROUP_BY(groupKeys:i, j, aggregations:count(*)),\t3,\t2\n" + + "PROJECT(i, j),\t4,\t3\n" + + "DOC_ID_SET,\t5,\t4\n" + + "FILTER_MATCH_ENTIRE_SEGMENT(docs:1000),\t6,\t5\n", + toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false)); + } + + private static void assertTrimFlagNotSet(ResultSetGroup result) { + assertFalse(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed()); + } + + private static void assertTrimFlagSet(ResultSetGroup result) { + 
assertTrue(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed()); + } + + @AfterClass + public void tearDown() + throws Exception { + dropOfflineTable(DEFAULT_TABLE_NAME); + + stopServer(); + stopBroker(); + stopController(); + stopZk(); + + FileUtils.deleteDirectory(_tempDir); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index 16115fa728..f9fb427dd1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -229,6 +229,10 @@ public class AggregateOperator extends MultiStageOperator { return _eosBlock; } else { MseBlock dataBlock = new RowHeapDataBlock(rows, _resultSchema, _aggFunctions); + if (_groupByExecutor.getRowsProcessed() > _groupTrimSize) { + _statMap.merge(StatKey.GROUPS_TRIMMED, true); + } + if (_groupByExecutor.isNumGroupsLimitReached()) { if (_errorOnNumGroupsLimit) { _input.earlyTerminate(); @@ -461,6 +465,7 @@ public class AggregateOperator extends MultiStageOperator { return true; } }, + GROUPS_TRIMMED(StatMap.Type.BOOLEAN), NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN); //@formatter:on diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 20c96fdcf1..fab26d033b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -240,6 +240,9 @@ public class LeafOperator extends MultiStageOperator { case TOTAL_DOCS: _statMap.merge(StatKey.TOTAL_DOCS, 
Long.parseLong(entry.getValue())); break; + case GROUPS_TRIMMED: + _statMap.merge(StatKey.GROUPS_TRIMMED, Boolean.parseBoolean(entry.getValue())); + break; case NUM_GROUPS_LIMIT_REACHED: _statMap.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, Boolean.parseBoolean(entry.getValue())); break; @@ -654,6 +657,7 @@ public class LeafOperator extends MultiStageOperator { NUM_SEGMENTS_PRUNED_INVALID(StatMap.Type.INT), NUM_SEGMENTS_PRUNED_BY_LIMIT(StatMap.Type.INT), NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT), + GROUPS_TRIMMED(StatMap.Type.BOOLEAN), NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN), NUM_RESIZES(StatMap.Type.INT, null), diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index d9a285d342..3b179ed4b8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -236,6 +236,7 @@ public abstract class MultiStageOperator public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) { @SuppressWarnings("unchecked") StatMap<AggregateOperator.StatKey> stats = (StatMap<AggregateOperator.StatKey>) map; + response.mergeGroupsTrimmed(stats.getBoolean(AggregateOperator.StatKey.GROUPS_TRIMMED)); response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED)); response.mergeNumGroupsWarningLimitReached( stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED)); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java index 9b978c4c43..7eafbd858e 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java @@ -60,6 +60,7 @@ public class MultistageGroupByExecutor { private final AggType _aggType; private final boolean _leafReturnFinalResult; private final DataSchema _resultSchema; + private int _rowsProcessed; private final int _numGroupsLimit; private final int _numGroupsWarningLimit; private final boolean _filteredAggregationsSkipEmptyGroups; @@ -206,12 +207,14 @@ public class MultistageGroupByExecutor { _groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions); int idx = 0; - while (idx++ < numGroups && groupKeyIterator.hasNext()) { + while (idx < numGroups && groupKeyIterator.hasNext()) { Object[] row = getRow(groupKeyIterator, numKeys, numFunctions, resultStoredTypes); sortedRows.add(row); + idx++; } while (groupKeyIterator.hasNext()) { + idx++; // TODO: allocate new array row only if row enters set Object[] row = getRow(groupKeyIterator, numKeys, numFunctions, resultStoredTypes); if (comparator.compare(sortedRows.peek(), row) < 0) { @@ -220,6 +223,8 @@ public class MultistageGroupByExecutor { } } + _rowsProcessed = idx; + int resultSize = sortedRows.size(); ArrayList<Object[]> result = new ArrayList<>(sortedRows.size()); for (int i = resultSize - 1; i >= 0; i--) { @@ -245,9 +250,13 @@ public class MultistageGroupByExecutor { _groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions); int idx = 0; - while (groupKeyIterator.hasNext() && idx++ < numGroups) { + while (groupKeyIterator.hasNext() && idx < numGroups) { Object[] row = getRow(groupKeyIterator, numKeys, numFunctions, resultStoredTypes); rows.add(row); + idx++; + } + if (groupKeyIterator.hasNext()) { + _rowsProcessed = idx + 1; } return rows; } @@ -292,6 +301,10 @@ public class MultistageGroupByExecutor { return _groupIdGenerator.getNumGroups(); } + public int getRowsProcessed() { + 
return _rowsProcessed; + } + public boolean isNumGroupsLimitReached() { return _groupIdGenerator.getNumGroups() == _numGroupsLimit; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java index 114752ae0e..3e928943d4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java @@ -62,6 +62,7 @@ public class DefaultRequestContext implements RequestScope { private long _realtimeTotalMemAllocatedBytes; private int _numServersQueried; private int _numServersResponded; + private boolean _groupsTrimmed; private boolean _isNumGroupsLimitReached; private int _numExceptions; private String _brokerId; @@ -340,6 +341,11 @@ public class DefaultRequestContext implements RequestScope { return _realtimeThreadCpuTimeNs; } + @Override + public boolean isGroupsTrimmed() { + return _groupsTrimmed; + } + @Override public boolean isNumGroupsLimitReached() { return _isNumGroupsLimitReached; @@ -480,6 +486,11 @@ public class DefaultRequestContext implements RequestScope { _numServersResponded = numServersResponded; } + @Override + public void setGroupsTrimmed(boolean groupsTrimmed) { + _groupsTrimmed = groupsTrimmed; + } + @Override public void setNumGroupsLimitReached(boolean numGroupsLimitReached) { _isNumGroupsLimitReached = numGroupsLimitReached; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java index e49f3e769f..fda3e4d4b5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java @@ -144,6 +144,8 @@ public interface RequestContext { long getRealtimeTotalMemAllocatedBytes(); void setRealtimeTotalMemAllocatedBytes(long realtimeTotalMemAllocatedBytes); 
+ boolean isGroupsTrimmed(); + boolean isNumGroupsLimitReached(); int getNumExceptions(); @@ -176,6 +178,8 @@ public interface RequestContext { void setNumServersResponded(int numServersResponded); + void setGroupsTrimmed(boolean groupsTrimmed); + void setNumGroupsLimitReached(boolean numGroupsLimitReached); void setNumExceptions(int numExceptions); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org