Jackie-Jiang commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2684452398


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -98,45 +122,73 @@ protected DistinctResultsBlock getNextBlock() {
       default:
         throw new IllegalStateException("Unsupported data type: " + 
dictionary.getValueType());
     }
-    return new DistinctResultsBlock(distinctTable, _queryContext);
+    DistinctResultsBlock resultsBlock = new 
DistinctResultsBlock(distinctTable, _queryContext);
+    resultsBlock.setNumDocsScanned(_numDocsScanned);
+    if (_hitTimeLimit) {
+      
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+    } else if (_hitMaxRowsLimit) {
+      
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+    }
+    return resultsBlock;
   }
 
   private IntDistinctTable createIntDistinctTable(DataSchema dataSchema, 
Dictionary dictionary,
       @Nullable OrderByExpressionContext orderByExpression) {
     int limit = _queryContext.getLimit();
     int dictLength = dictionary.length();
     int numValuesToKeep = Math.min(limit, dictLength);
+    boolean requiresFullScan = orderByExpression != null && 
!dictionary.isSorted();
+    int rowsConsidered = requiresFullScan ? dictLength : numValuesToKeep;
+    int rowsToProcess = clampRows(rowsConsidered);
     IntDistinctTable distinctTable =
         new IntDistinctTable(dataSchema, limit, 
_queryContext.isNullHandlingEnabled(), orderByExpression);
+    int rowsProcessed = 0;
     if (orderByExpression == null) {
-      for (int i = 0; i < numValuesToKeep; i++) {
+      for (int i = 0; i < rowsToProcess; i++) {
+        if (hasExceededTimeLimit()) {

Review Comment:
   (MAJOR) This is a row-level operation. We cannot check the time on each row.
   Please test the performance overhead of this. I'm pretty sure that, as 
currently written, it will make the query much slower.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private final int _maxRowsAcrossSegments;
+  private boolean _rowBudgetReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = null;
+    Long maxExecutionTimeMs = null;
+    if (queryContext.getQueryOptions() != null) {

Review Comment:
   (minor) The query options are never `null`.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -53,14 +57,34 @@ public class DictionaryBasedDistinctOperator extends 
BaseOperator<DistinctResult
   private final QueryContext _queryContext;
 
   private int _numDocsScanned;
+  private final int _maxRowsInDistinct;

Review Comment:
   Do you think we need to apply the max-rows and time limits within a segment 
(at block level)? I feel applying them at segment level (during combine) is 
probably good enough.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private final int _maxRowsAcrossSegments;
+  private boolean _rowBudgetReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = null;
+    Long maxExecutionTimeMs = null;
+    if (queryContext.getQueryOptions() != null) {
+      maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+      maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+    }
+    _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows, 
Integer.MAX_VALUE);
+    _maxExecutionTimeNs = maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+        : Long.MAX_VALUE;
+    _startTimeNs = System.nanoTime();
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+    if (_timeLimitReached) {
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+      }
+      return true;
+    }
+    if (hasExceededTimeLimit()) {
+      _timeLimitReached = true;
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+      }
+      return true;
+    }
+    if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT) {
+      _timeLimitReached = true;
+      return true;
+    }
+    if (_rowBudgetReached) {

Review Comment:
   We should check the time limit at the end, and honor the other limits first.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private final int _maxRowsAcrossSegments;
+  private boolean _rowBudgetReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = null;
+    Long maxExecutionTimeMs = null;
+    if (queryContext.getQueryOptions() != null) {
+      maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+      maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+    }
+    _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows, 
Integer.MAX_VALUE);
+    _maxExecutionTimeNs = maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+        : Long.MAX_VALUE;
+    _startTimeNs = System.nanoTime();
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+    if (_timeLimitReached) {
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {

Review Comment:
   Do we need to set the flag here?



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private final int _maxRowsAcrossSegments;
+  private boolean _rowBudgetReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = null;
+    Long maxExecutionTimeMs = null;
+    if (queryContext.getQueryOptions() != null) {
+      maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+      maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+    }
+    _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows, 
Integer.MAX_VALUE);
+    _maxExecutionTimeNs = maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+        : Long.MAX_VALUE;
+    _startTimeNs = System.nanoTime();
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+    if (_timeLimitReached) {
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+      }
+      return true;
+    }
+    if (hasExceededTimeLimit()) {
+      _timeLimitReached = true;
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+      }
+      return true;
+    }
+    if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT) {
+      _timeLimitReached = true;
+      return true;
+    }
+    if (_rowBudgetReached) {
+      return true;
+    }
+    if (_maxRowsAcrossSegments != Integer.MAX_VALUE
+        && resultsBlock.getNumDocsScanned() >= _maxRowsAcrossSegments) {
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+      }
+      _rowBudgetReached = true;
+      return true;
+    }
     return resultsBlock.getDistinctTable().isSatisfied();
   }
 
   @Override
   public void mergeResultsBlocks(DistinctResultsBlock mergedBlock, 
DistinctResultsBlock blockToMerge) {
+    if (_rowBudgetReached || _timeLimitReached) {

Review Comment:
   (MAJOR) `isQuerySatisfied()` can be applied to each individual segment 
result (not merged). We need to set the flag on the merged block.
   
   E.g.
   - The first segment is within the budget.
   - The second segment by itself goes over the budget.
   - If we terminate here, the metadata won't be set on the merged result 
(the first segment's result).
   
   Please add a test to capture this scenario.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private final int _maxRowsAcrossSegments;
+  private boolean _rowBudgetReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = null;
+    Long maxExecutionTimeMs = null;
+    if (queryContext.getQueryOptions() != null) {
+      maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+      maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+    }
+    _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows, 
Integer.MAX_VALUE);
+    _maxExecutionTimeNs = maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+        : Long.MAX_VALUE;
+    _startTimeNs = System.nanoTime();
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+    if (_timeLimitReached) {
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+      }
+      return true;
+    }
+    if (hasExceededTimeLimit()) {
+      _timeLimitReached = true;
+      if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.NONE) {
+        
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+      }
+      return true;
+    }
+    if (resultsBlock.getEarlyTerminationReason() == 
BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT) {

Review Comment:
   Will this ever be hit? The global time limit should always be hit first.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -53,14 +57,34 @@ public class DictionaryBasedDistinctOperator extends 
BaseOperator<DistinctResult
   private final QueryContext _queryContext;
 
   private int _numDocsScanned;
+  private final int _maxRowsInDistinct;
+  private boolean _hitMaxRowsLimit;
+  private final long _maxExecutionTimeNs;
+  private final LongSupplier _timeSupplier;

Review Comment:
   (minor) Will the lambda introduce overhead? Given that we are not really 
using it for testing purposes, directly calling `System.nanoTime()` might be 
cheaper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to