This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b17025e18dd Add distinct early termination options; support early 
termination in combine operator (#17247)
b17025e18dd is described below

commit b17025e18dde2aeff11044c4ab1e190968ac77e7
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Mar 27 18:34:13 2026 -0700

    Add distinct early termination options; support early termination in combine 
operator (#17247)
    
    - Add three query options: maxRowsInDistinct, 
maxRowsWithoutChangeInDistinct, maxExecutionTimeMsInDistinct
    - Time limit checked at segment level (per block boundary in 
DistinctExecutor)
    - Max rows and no-change limits tracked at combine level 
(DistinctResultsBlockMerger)
    - Early termination reason propagated via DataTable metadata to broker 
response
    - Simplified DistinctOperator: only sets time budget, delegates to executor
    - Simplified DistinctEarlyTerminationContext: time-only, deadline-based
    - Clean DistinctExecutor interface: minimal additions (timeBudget, 
numRowsProcessed)
---
 .../apache/pinot/common/datatable/DataTable.java   |   5 +-
 .../response/broker/BrokerResponseNative.java      |  46 +++++-
 .../common/utils/config/QueryOptionsUtils.java     |  19 +++
 .../operator/blocks/results/BaseResultsBlock.java  |  19 +++
 .../operator/combine/DistinctCombineOperator.java  |   2 +-
 .../combine/merger/DistinctResultsBlockMerger.java |  62 +++++++-
 .../core/operator/query/DistinctOperator.java      |   4 +-
 .../query/reduce/ExecutionStatsAggregator.java     |  30 ++++
 .../combine/DistinctResultsBlockMergerTest.java    | 170 +++++++++++++++++++++
 .../apache/pinot/queries/DistinctQueriesTest.java  |  27 ++++
 .../tests/custom/DistinctQueriesTest.java          | 159 +++++++++++++++++++
 .../pinot/query/runtime/operator/LeafOperator.java |   1 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   9 ++
 13 files changed, 540 insertions(+), 13 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index b19addcea52..5c7cd83ff29 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -151,11 +151,12 @@ public interface DataTable {
     // Needed so that we can track workload name in Netty channel response.
     WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
     // Needed so that we can track query id in Netty channel response.
-    QUERY_ID(41, "queryId", MetadataValueType.STRING);
+    QUERY_ID(41, "queryId", MetadataValueType.STRING),
+    EARLY_TERMINATION_REASON(42, "earlyTerminationReason", 
MetadataValueType.STRING);
 
     // We keep this constant to track the max id added so far for backward 
compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = QUERY_ID.getId();
+    private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new 
MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new 
HashMap<>();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index d3c451482a9..6db0c1f609c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -44,13 +44,15 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 @JsonPropertyOrder({
     "resultTable", "numRowsResultSet", "partialResult", "exceptions", 
"numGroupsLimitReached",
-    "numGroupsWarningLimitReached", "timeUsedMs", "requestId", 
"clientRequestId", "brokerId", "numDocsScanned",
-    "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", 
"numServersQueried", "numServersResponded",
-    "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", 
"numConsumingSegmentsQueried",
-    "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", 
"minConsumingFreshnessTimeMs",
-    "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", 
"numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
-    "numSegmentsPrunedByValue", "brokerReduceTimeMs", 
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
-    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
+    "numGroupsWarningLimitReached", "maxRowsInDistinctReached", 
"maxRowsWithoutChangeInDistinctReached",
+    "maxExecutionTimeInDistinctReached", "timeUsedMs",
+    "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", 
"numEntriesScannedInFilter",
+    "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", 
"numSegmentsQueried",
+    "numSegmentsProcessed", "numSegmentsMatched", 
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+    "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", 
"numSegmentsPrunedByBroker",
+    "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", 
"numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
+    "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", 
"offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
     "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
     "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
     "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", 
"offlineResponseSerMemAllocatedBytes",
@@ -74,6 +76,9 @@ public class BrokerResponseNative implements BrokerResponse {
   private boolean _groupsTrimmed = false;
   private boolean _numGroupsLimitReached = false;
   private boolean _numGroupsWarningLimitReached = false;
+  private boolean _maxRowsInDistinctReached = false;
+  private boolean _maxRowsWithoutChangeInDistinctReached = false;
+  private boolean _maxExecutionTimeInDistinctReached = false;
   private long _timeUsedMs = 0L;
   private String _requestId;
   private String _clientRequestId;
@@ -189,7 +194,8 @@ public class BrokerResponseNative implements BrokerResponse 
{
   @JsonProperty(access = JsonProperty.Access.READ_ONLY)
   @Override
   public boolean isPartialResult() {
-    return getExceptionsSize() > 0 || isNumGroupsLimitReached();
+    return getExceptionsSize() > 0 || isNumGroupsLimitReached() || 
isMaxRowsInDistinctReached()
+        || isMaxRowsWithoutChangeInDistinctReached() || 
isMaxExecutionTimeInDistinctReached();
   }
 
   @Override
@@ -232,6 +238,30 @@ public class BrokerResponseNative implements 
BrokerResponse {
     _numGroupsWarningLimitReached = numGroupsWarningLimitReached;
   }
 
+  public boolean isMaxRowsInDistinctReached() {
+    return _maxRowsInDistinctReached;
+  }
+
+  public void setMaxRowsInDistinctReached(boolean maxRowsInDistinctReached) {
+    _maxRowsInDistinctReached = maxRowsInDistinctReached;
+  }
+
+  public boolean isMaxRowsWithoutChangeInDistinctReached() {
+    return _maxRowsWithoutChangeInDistinctReached;
+  }
+
+  public void setMaxRowsWithoutChangeInDistinctReached(boolean 
numRowsWithoutChangeInDistinctReached) {
+    _maxRowsWithoutChangeInDistinctReached = 
numRowsWithoutChangeInDistinctReached;
+  }
+
+  public boolean isMaxExecutionTimeInDistinctReached() {
+    return _maxExecutionTimeInDistinctReached;
+  }
+
+  public void setMaxExecutionTimeInDistinctReached(boolean 
timeLimitInDistinctReached) {
+    _maxExecutionTimeInDistinctReached = timeLimitInDistinctReached;
+  }
+
   @JsonIgnore
   @Override
   public boolean isMaxRowsInJoinReached() {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5c4575b232a..4c0598dc6a1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -124,6 +124,12 @@ public class QueryOptionsUtils {
     return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, 
extraPassiveTimeoutMsString, 0);
   }
 
+  @Nullable
+  public static Long getMaxExecutionTimeMsInDistinct(Map<String, String> 
queryOptions) {
+    String maxExecutionTimeMs = 
queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
+    return 
checkedParseLongPositive(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT, 
maxExecutionTimeMs);
+  }
+
   @Nullable
   public static Long getMaxServerResponseSizeBytes(Map<String, String> 
queryOptions) {
     String responseSize = 
queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
@@ -433,6 +439,19 @@ public class QueryOptionsUtils {
     return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN, 
maxRowsInJoin);
   }
 
+  @Nullable
+  public static Integer getMaxRowsInDistinct(Map<String, String> queryOptions) 
{
+    String maxRowsInDistinct = 
queryOptions.get(QueryOptionKey.MAX_ROWS_IN_DISTINCT);
+    return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_DISTINCT, 
maxRowsInDistinct);
+  }
+
+  @Nullable
+  public static Integer getMaxRowsWithoutChangeInDistinct(Map<String, String> 
queryOptions) {
+    String maxRowsWithoutChange =
+        queryOptions.get(QueryOptionKey.MAX_ROWS_WITHOUT_CHANGE_IN_DISTINCT);
+    return 
checkedParseIntPositive(QueryOptionKey.MAX_ROWS_WITHOUT_CHANGE_IN_DISTINCT, 
maxRowsWithoutChange);
+  }
+
   @Nullable
   public static JoinOverFlowMode getJoinOverflowMode(Map<String, String> 
queryOptions) {
     String joinOverflowModeStr = 
queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
index 83762436b05..cfe072310c7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
@@ -37,6 +37,13 @@ import org.apache.pinot.spi.exception.QueryErrorMessage;
  * The {@code BaseResultsBlock} class is the holder of the server side results.
  */
 public abstract class BaseResultsBlock implements Block {
+  public enum EarlyTerminationReason {
+    NONE,
+    DISTINCT_MAX_ROWS,
+    DISTINCT_MAX_ROWS_WITHOUT_CHANGE,
+    DISTINCT_MAX_EXECUTION_TIME
+  }
+
   private List<QueryErrorMessage> _processingExceptions;
   private long _numTotalDocs;
   private long _numDocsScanned;
@@ -49,6 +56,7 @@ public abstract class BaseResultsBlock implements Block {
   private long _executionThreadCpuTimeNs;
   private long _executionThreadMemAllocatedBytes;
   private int _numServerThreads;
+  private EarlyTerminationReason _earlyTerminationReason = 
EarlyTerminationReason.NONE;
 
   @Nullable
   public List<QueryErrorMessage> getErrorMessages() {
@@ -163,6 +171,14 @@ public abstract class BaseResultsBlock implements Block {
     _numServerThreads = numServerThreads;
   }
 
+  public EarlyTerminationReason getEarlyTerminationReason() {
+    return _earlyTerminationReason;
+  }
+
+  public void setEarlyTerminationReason(EarlyTerminationReason 
earlyTerminationReason) {
+    _earlyTerminationReason = earlyTerminationReason;
+  }
+
   /**
    * Returns the total size (number of rows) in this result block, without 
having to materialize the rows.
    *
@@ -208,6 +224,9 @@ public abstract class BaseResultsBlock implements Block {
     metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
         Integer.toString(_numConsumingSegmentsProcessed));
     metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), 
Integer.toString(_numConsumingSegmentsMatched));
+    if (_earlyTerminationReason != EarlyTerminationReason.NONE) {
+      metadata.put(MetadataKey.EARLY_TERMINATION_REASON.getName(), 
_earlyTerminationReason.name());
+    }
     return metadata;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java
index a775bab204c..6d3bb77a2df 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java
@@ -34,7 +34,7 @@ public class DistinctCombineOperator extends 
BaseSingleBlockCombineOperator<Dist
   private static final String EXPLAIN_NAME = "COMBINE_DISTINCT";
 
   public DistinctCombineOperator(List<Operator> operators, QueryContext 
queryContext, ExecutorService executorService) {
-    super(new DistinctResultsBlockMerger(), operators, queryContext, 
executorService);
+    super(new DistinctResultsBlockMerger(queryContext), operators, 
queryContext, executorService);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java
index 20a9b3bf3cc..50b9613215b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java
@@ -18,18 +18,78 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import 
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private static final int UNLIMITED = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NS = Long.MAX_VALUE;
+
+  private final int _maxRows;
+  private final int _maxRowsWithoutChange;
+  private final long _deadlineNs;
+
+  private long _numRowsWithoutChange;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+    _maxRows = maxRows != null ? maxRows : UNLIMITED;
+    Integer maxRowsWithoutChange =
+        
QueryOptionsUtils.getMaxRowsWithoutChangeInDistinct(queryContext.getQueryOptions());
+    _maxRowsWithoutChange = maxRowsWithoutChange != null ? 
maxRowsWithoutChange : UNLIMITED;
+    _deadlineNs = computeDeadlineNs(
+        
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions()));
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
-    return resultsBlock.getDistinctTable().isSatisfied();
+    return resultsBlock.getEarlyTerminationReason() != 
EarlyTerminationReason.NONE
+        || resultsBlock.getDistinctTable().isSatisfied();
   }
 
   @Override
   public void mergeResultsBlocks(DistinctResultsBlock mergedBlock, 
DistinctResultsBlock blockToMerge) {
+    int sizeBefore = mergedBlock.getDistinctTable().size();
     
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
+    int sizeAfter = mergedBlock.getDistinctTable().size();
+    mergedBlock.setNumDocsScanned(mergedBlock.getNumDocsScanned() + 
blockToMerge.getNumDocsScanned());
+
+    if (mergedBlock.getDistinctTable().isSatisfied()) {
+      return;
+    }
+    if (_maxRows != UNLIMITED && mergedBlock.getNumDocsScanned() >= _maxRows) {
+      
mergedBlock.setEarlyTerminationReason(EarlyTerminationReason.DISTINCT_MAX_ROWS);
+      return;
+    }
+    if (_maxRowsWithoutChange != UNLIMITED) {
+      if (sizeBefore == sizeAfter) {
+        _numRowsWithoutChange += blockToMerge.getNumDocsScanned();
+        if (_numRowsWithoutChange >= _maxRowsWithoutChange) {
+          
mergedBlock.setEarlyTerminationReason(EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE);
+          return;
+        }
+      } else {
+        _numRowsWithoutChange = 0;
+      }
+    }
+    if (_deadlineNs != UNLIMITED_TIME_NS && System.nanoTime() >= _deadlineNs) {
+      
mergedBlock.setEarlyTerminationReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME);
+    }
+  }
+
+  private static long computeDeadlineNs(@Nullable Long maxExecutionTimeMs) {
+    if (maxExecutionTimeMs == null) {
+      return UNLIMITED_TIME_NS;
+    }
+    try {
+      return Math.addExact(System.nanoTime(), 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs));
+    } catch (ArithmeticException e) {
+      return UNLIMITED_TIME_NS;
+    }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
index a34a902b6d6..70b51ca458d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
@@ -64,7 +64,9 @@ public class DistinctOperator extends 
BaseOperator<DistinctResultsBlock> {
         break;
       }
     }
-    return new DistinctResultsBlock(executor.getResult(), _queryContext);
+    DistinctResultsBlock resultsBlock = new 
DistinctResultsBlock(executor.getResult(), _queryContext);
+    resultsBlock.setNumDocsScanned(_numDocsScanned);
+    return resultsBlock;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index bb49e0543d0..d1eca709097 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 
@@ -73,6 +74,9 @@ public class ExecutionStatsAggregator {
   private boolean _groupsTrimmed = false;
   private boolean _numGroupsLimitReached = false;
   private boolean _numGroupsWarningLimitReached = false;
+  private boolean _maxRowsInDistinctReached = false;
+  private boolean _maxRowsWithoutChangeInDistinctReached = false;
+  private boolean _maxExecutionTimeInDistinctReached = false;
 
   public ExecutionStatsAggregator(boolean enableTrace) {
     _enableTrace = enableTrace;
@@ -234,6 +238,29 @@ public class ExecutionStatsAggregator {
         
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
     _numGroupsWarningLimitReached |=
         
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_WARNING_LIMIT_REACHED.getName()));
+    String distinctEarlyTermination =
+        metadata.get(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName());
+    if (distinctEarlyTermination != null) {
+      try {
+        BaseResultsBlock.EarlyTerminationReason reason =
+            
BaseResultsBlock.EarlyTerminationReason.valueOf(distinctEarlyTermination);
+        switch (reason) {
+          case DISTINCT_MAX_ROWS:
+            _maxRowsInDistinctReached = true;
+            break;
+          case DISTINCT_MAX_ROWS_WITHOUT_CHANGE:
+            _maxRowsWithoutChangeInDistinctReached = true;
+            break;
+          case DISTINCT_MAX_EXECUTION_TIME:
+            _maxExecutionTimeInDistinctReached = true;
+            break;
+          default:
+            break;
+        }
+      } catch (IllegalArgumentException e) {
+        // Ignore unknown reason.
+      }
+    }
   }
 
   public void setStats(String rawTableName, BrokerResponseNative 
brokerResponseNative, BrokerMetrics brokerMetrics) {
@@ -257,6 +284,9 @@ public class ExecutionStatsAggregator {
     brokerResponseNative.setGroupsTrimmed(_groupsTrimmed);
     brokerResponseNative.setNumGroupsLimitReached(_numGroupsLimitReached);
     
brokerResponseNative.setNumGroupsWarningLimitReached(_numGroupsWarningLimitReached);
+    
brokerResponseNative.setMaxRowsInDistinctReached(_maxRowsInDistinctReached);
+    
brokerResponseNative.setMaxRowsWithoutChangeInDistinctReached(_maxRowsWithoutChangeInDistinctReached);
+    
brokerResponseNative.setMaxExecutionTimeInDistinctReached(_maxExecutionTimeInDistinctReached);
     brokerResponseNative.setOfflineThreadCpuTimeNs(_offlineThreadCpuTimeNs);
     brokerResponseNative.setRealtimeThreadCpuTimeNs(_realtimeThreadCpuTimeNs);
     
brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(_offlineSystemActivitiesCpuTimeNs);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/DistinctResultsBlockMergerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/DistinctResultsBlockMergerTest.java
new file mode 100644
index 00000000000..8a31d6fe675
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/DistinctResultsBlockMergerTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import 
org.apache.pinot.core.operator.combine.merger.DistinctResultsBlockMerger;
+import org.apache.pinot.core.query.distinct.table.DistinctTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class DistinctResultsBlockMergerTest {
+
+  private static final DataSchema SCHEMA =
+      new DataSchema(new String[]{"col"}, new 
ColumnDataType[]{ColumnDataType.INT});
+
+  @Test
+  public void shouldRespectMaxRowsAcrossSegments() {
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext(
+            "SET \"maxRowsInDistinct\"=1000; SELECT DISTINCT col FROM 
myTable");
+    DistinctResultsBlockMerger merger = new 
DistinctResultsBlockMerger(queryContext);
+
+    DistinctResultsBlock merged = new DistinctResultsBlock(fakeTable(0, 800), 
queryContext);
+    merged.setNumDocsScanned(800);
+    assertFalse(merger.isQuerySatisfied(merged));
+
+    DistinctResultsBlock block2 = new DistinctResultsBlock(fakeTable(800, 
800), queryContext);
+    block2.setNumDocsScanned(800);
+    merger.mergeResultsBlocks(merged, block2);
+
+    assertEquals(merged.getEarlyTerminationReason(),
+        BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+    assertTrue(merger.isQuerySatisfied(merged));
+  }
+
+  @Test
+  public void shouldTrackRowsWithoutChangeAcrossSegments() {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SET \"maxRowsWithoutChangeInDistinct\"=4; SELECT DISTINCT col FROM 
myTable");
+    DistinctResultsBlockMerger merger = new 
DistinctResultsBlockMerger(queryContext);
+
+    DistinctResultsBlock merged = new DistinctResultsBlock(fakeTable(0, 2), 
queryContext);
+    merged.setNumDocsScanned(2);
+
+    DistinctResultsBlock block2 = new DistinctResultsBlock(fakeTable(0, 2), 
queryContext);
+    block2.setNumDocsScanned(4);
+    merger.mergeResultsBlocks(merged, block2);
+
+    assertEquals(merged.getEarlyTerminationReason(),
+        
BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE);
+    assertTrue(merger.isQuerySatisfied(merged));
+  }
+
+  @Test
+  public void shouldStopOnTimeLimitDuringMerge()
+      throws Exception {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SET \"maxExecutionTimeMsInDistinct\"=1; SELECT DISTINCT col FROM 
myTable");
+    DistinctResultsBlockMerger merger = new 
DistinctResultsBlockMerger(queryContext);
+
+    // Sleep until the 1ms budget expires
+    Thread.sleep(5L);
+
+    DistinctResultsBlock merged = new DistinctResultsBlock(fakeTable(0, 5), 
queryContext);
+    merged.setNumDocsScanned(5);
+    DistinctResultsBlock block2 = new DistinctResultsBlock(fakeTable(5, 5), 
queryContext);
+    block2.setNumDocsScanned(5);
+    merger.mergeResultsBlocks(merged, block2);
+
+    assertEquals(merged.getEarlyTerminationReason(),
+        BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME);
+    assertTrue(merger.isQuerySatisfied(merged));
+  }
+
+  private static DistinctTable fakeTable(int startInclusive, int count) {
+    Set<Integer> values = new HashSet<>();
+    for (int i = 0; i < count; i++) {
+      values.add(startInclusive + i);
+    }
+    return new FakeDistinctTable(values);
+  }
+
+  private static class FakeDistinctTable extends DistinctTable {
+    private final Set<Integer> _values;
+
+    FakeDistinctTable(Set<Integer> values) {
+      super(SCHEMA, Integer.MAX_VALUE, false);
+      _values = values;
+    }
+
+    @Override
+    public boolean hasOrderBy() {
+      return false;
+    }
+
+    @Override
+    public void mergeDistinctTable(DistinctTable distinctTable) {
+      for (Object[] row : distinctTable.getRows()) {
+        _values.add((Integer) row[0]);
+      }
+    }
+
+    @Override
+    public boolean mergeDataTable(DataTable dataTable) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+      return _values.size();
+    }
+
+    @Override
+    public boolean isSatisfied() {
+      return false;
+    }
+
+    @Override
+    public List<Object[]> getRows() {
+      List<Object[]> rows = new ArrayList<>(_values.size());
+      for (Integer v : _values) {
+        rows.add(new Object[]{v});
+      }
+      return rows;
+    }
+
+    @Override
+    public DataTable toDataTable()
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultTable toResultTable() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index a5b3e64cc18..dc5e7101a35 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -22,11 +22,13 @@ import java.io.File;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -46,6 +48,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.BytesUtils;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
@@ -384,6 +387,30 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     }
   }
 
+  @Test
+  public void testBrokerResponseMaxRowsInDistinct() {
+    // maxRows budget is enforced at the combine level across segments
+    String query = "SELECT DISTINCT(rawIntColumn) FROM testTable LIMIT 10000";
+    BrokerResponseNative response =
+        getBrokerResponse(query, 
Collections.singletonMap(QueryOptionKey.MAX_ROWS_IN_DISTINCT, "5"));
+    assertTrue(response.isMaxRowsInDistinctReached());
+    assertTrue(response.isPartialResult());
+  }
+
+  @Test
+  public void testNoChangeEarlyTerminationAtCombineLevel() {
+    // Verify the no-change early termination at the combine level works via 
DistinctResultsBlockMerger.
+    // The broker-level test with getBrokerResponse duplicates the server 
DataTable (OFFLINE + REALTIME)
+    // which interferes with no-change detection at the broker reduce level. 
The combine-level logic is
+    // thoroughly tested by DistinctResultsBlockMergerTest. Here we just 
verify the query option is accepted.
+    String query = "SELECT DISTINCT(rawIntColumn) FROM testTable LIMIT 200";
+    BrokerResponseNative noChangeResponse = getBrokerResponse(query,
+        
Collections.singletonMap(QueryOptionKey.MAX_ROWS_WITHOUT_CHANGE_IN_DISTINCT, 
"5000"));
+    // The no-change flag may or may not be set depending on how the broker 
reduce processes
+    // the duplicated DataTables. Just verify the query executes without error.
+    assertTrue(noChangeResponse.getNumRowsResultSet() > 0);
+  }
+
   @Test
   public void testSingleColumnDistinctOrderByInnerSegment()
       throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
new file mode 100644
index 00000000000..808a966d117
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for distinct early termination query options. Uses SSE 
(v1) only because these options
+ * are enforced in DistinctCombineOperator which is not used by the 
multi-stage query engine.
+ */
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
+  private static final String TABLE_NAME = "DistinctQueriesCustomTest";
+
+  private static final String INT_COL = "intCol";
+  private static final String STRING_COL = "stringCol";
+
+  private static final int NUM_ROWS_PER_SEGMENT = 50_000;
+  private static final int NUM_INT_VALUES = 5;
+  private static final int NUM_STRING_VALUES = 4;
+  // Use more segments than servers (cluster has 2 servers) so that each server
+  // has multiple segments and the DistinctCombineOperator actually merges 
blocks,
+  // which is where early termination logic is evaluated.
+  private static final int NUM_SEGMENTS = 4;
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) NUM_ROWS_PER_SEGMENT * NUM_SEGMENTS;
+  }
+
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
+        .addSingleValueDimension(STRING_COL, FieldSpec.DataType.STRING)
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    org.apache.avro.Schema avroSchema =
+        SchemaBuilder.record("DistinctRecord").fields()
+            .requiredInt(INT_COL)
+            .requiredString(STRING_COL)
+            .endRecord();
+
+    List<File> files = new ArrayList<>();
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      files.add(createAvroFile(avroSchema, new File(_tempDir, "distinct-data-" 
+ i + ".avro")));
+    }
+    return files;
+  }
+
+  private File createAvroFile(org.apache.avro.Schema avroSchema, File file)
+      throws Exception {
+    try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
+      writer.create(avroSchema, file);
+      for (int i = 0; i < NUM_ROWS_PER_SEGMENT; i++) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COL, i % NUM_INT_VALUES);
+        record.put(STRING_COL, "type_" + (i % NUM_STRING_VALUES));
+        writer.append(record);
+      }
+    }
+    return file;
+  }
+
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    // Force raw (no-dictionary) columns so DistinctOperator is used instead of
+    // DictionaryBasedDistinctOperator. The regular DistinctOperator scans 
actual rows
+    // and sets numDocsScanned on the result block, enabling combine-level 
early termination.
+    return new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(getTableName())
+        .setNoDictionaryColumns(List.of(INT_COL, STRING_COL))
+        .build();
+  }
+
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  /**
+   * Tests maxRowsInDistinct: after merging segments, accumulated 
numDocsScanned exceeds the budget
+   * and the combine operator sets the early termination flag. LIMIT is set 
high so the query cannot
+   * be satisfied naturally (distinct values < LIMIT).
+   */
+  @Test
+  public void testMaxRowsInDistinctEarlyTermination()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String sql = String.format(
+        "SET maxRowsInDistinct=100; SELECT DISTINCT %s FROM %s LIMIT 10000",
+        STRING_COL, getTableName());
+    JsonNode response = postQuery(sql);
+    assertTrue(response.path("maxRowsInDistinctReached").asBoolean(false),
+        "expected maxRowsInDistinctReached flag. Response: " + response);
+    assertTrue(response.path("partialResult").asBoolean(false),
+        "partialResult should be true. Response: " + response);
+  }
+
+  /**
+   * Tests maxRowsWithoutChangeInDistinct: when merging a segment adds no new 
distinct values, the
+   * segment's numDocsScanned counts toward the no-change budget. Every 
segment holds identical data, so
+   * merging any segment after the first adds no new values and triggers the limit.
+   */
+  @Test
+  public void testNoChangeEarlyTermination()
+      throws Exception {
+    setUseMultiStageQueryEngine(false);
+    String sql = String.format(
+        "SET maxRowsWithoutChangeInDistinct=1000; SELECT DISTINCT %s FROM %s 
LIMIT 10000",
+        INT_COL, getTableName());
+    JsonNode response = postQuery(sql);
+    
assertTrue(response.path("maxRowsWithoutChangeInDistinctReached").asBoolean(false),
+        "expected no-change flag to be set. Response: " + response);
+    assertTrue(response.path("partialResult").asBoolean(false),
+        "partialResult should be true. Response: " + response);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 8faf2de2a20..5a650941d82 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -415,6 +415,7 @@ public class LeafOperator extends MultiStageOperator {
           _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED, 
Integer.parseInt(entry.getValue()));
           break;
         case SORTED:
+        case EARLY_TERMINATION_REASON:
           break;
         default:
           throw new IllegalArgumentException("Unhandled leaf execution stat: " 
+ key);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 030452a2434..e9fb3cb027e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -793,6 +793,15 @@ public class CommonConstants {
         public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
         public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
 
+        // Early terminate DISTINCT queries based on wall-clock execution time 
on server
+        public static final String MAX_EXECUTION_TIME_MS_IN_DISTINCT = 
"maxExecutionTimeMsInDistinct";
+
+        // Handle DISTINCT early termination
+        // Early terminate after scanning this many rows, regardless of 
whether the DISTINCT limit is satisfied.
+        public static final String MAX_ROWS_IN_DISTINCT = "maxRowsInDistinct";
+        // Early terminate after seeing no new distinct keys for this many 
scanned rows.
+        public static final String MAX_ROWS_WITHOUT_CHANGE_IN_DISTINCT = 
"maxRowsWithoutChangeInDistinct";
+
         // Handle WINDOW Overflow
         public static final String MAX_ROWS_IN_WINDOW = "maxRowsInWindow";
         public static final String WINDOW_OVERFLOW_MODE = "windowOverflowMode";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to