Jackie-Jiang commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2684452398
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -98,45 +122,73 @@ protected DistinctResultsBlock getNextBlock() {
default:
throw new IllegalStateException("Unsupported data type: " +
dictionary.getValueType());
}
- return new DistinctResultsBlock(distinctTable, _queryContext);
+ DistinctResultsBlock resultsBlock = new
DistinctResultsBlock(distinctTable, _queryContext);
+ resultsBlock.setNumDocsScanned(_numDocsScanned);
+ if (_hitTimeLimit) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ } else if (_hitMaxRowsLimit) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+ }
+ return resultsBlock;
}
private IntDistinctTable createIntDistinctTable(DataSchema dataSchema,
Dictionary dictionary,
@Nullable OrderByExpressionContext orderByExpression) {
int limit = _queryContext.getLimit();
int dictLength = dictionary.length();
int numValuesToKeep = Math.min(limit, dictLength);
+ boolean requiresFullScan = orderByExpression != null &&
!dictionary.isSorted();
+ int rowsConsidered = requiresFullScan ? dictLength : numValuesToKeep;
+ int rowsToProcess = clampRows(rowsConsidered);
IntDistinctTable distinctTable =
new IntDistinctTable(dataSchema, limit,
_queryContext.isNullHandlingEnabled(), orderByExpression);
+ int rowsProcessed = 0;
if (orderByExpression == null) {
- for (int i = 0; i < numValuesToKeep; i++) {
+ for (int i = 0; i < rowsToProcess; i++) {
+ if (hasExceededTimeLimit()) {
Review Comment:
(MAJOR) This is a row-level operation, so we cannot check the time on every row.
Please measure the performance overhead of this. I'm fairly sure that, as
currently written, it will make the query much slower.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private final int _maxRowsAcrossSegments;
+ private boolean _rowBudgetReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows = null;
+ Long maxExecutionTimeMs = null;
+ if (queryContext.getQueryOptions() != null) {
Review Comment:
(minor) Query options are never `null`, so this check is unnecessary.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -53,14 +57,34 @@ public class DictionaryBasedDistinctOperator extends
BaseOperator<DistinctResult
private final QueryContext _queryContext;
private int _numDocsScanned;
+ private final int _maxRowsInDistinct;
Review Comment:
Do you think we need to apply the max-rows and time limits within a segment (at
the block level)? I feel that applying them at the segment level (during combine) is
probably good enough.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private final int _maxRowsAcrossSegments;
+ private boolean _rowBudgetReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows = null;
+ Long maxExecutionTimeMs = null;
+ if (queryContext.getQueryOptions() != null) {
+ maxRows =
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+ maxExecutionTimeMs =
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+ }
+ _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows,
Integer.MAX_VALUE);
+ _maxExecutionTimeNs = maxExecutionTimeMs != null ?
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+ : Long.MAX_VALUE;
+ _startTimeNs = System.nanoTime();
+ }
@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+ if (_timeLimitReached) {
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ }
+ return true;
+ }
+ if (hasExceededTimeLimit()) {
+ _timeLimitReached = true;
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ }
+ return true;
+ }
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT) {
+ _timeLimitReached = true;
+ return true;
+ }
+ if (_rowBudgetReached) {
Review Comment:
We should check the time limit last, and honor the other limits first.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private final int _maxRowsAcrossSegments;
+ private boolean _rowBudgetReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows = null;
+ Long maxExecutionTimeMs = null;
+ if (queryContext.getQueryOptions() != null) {
+ maxRows =
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+ maxExecutionTimeMs =
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+ }
+ _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows,
Integer.MAX_VALUE);
+ _maxExecutionTimeNs = maxExecutionTimeMs != null ?
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+ : Long.MAX_VALUE;
+ _startTimeNs = System.nanoTime();
+ }
@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+ if (_timeLimitReached) {
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
Review Comment:
Do we need to set the flag here?
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private final int _maxRowsAcrossSegments;
+ private boolean _rowBudgetReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows = null;
+ Long maxExecutionTimeMs = null;
+ if (queryContext.getQueryOptions() != null) {
+ maxRows =
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+ maxExecutionTimeMs =
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+ }
+ _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows,
Integer.MAX_VALUE);
+ _maxExecutionTimeNs = maxExecutionTimeMs != null ?
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+ : Long.MAX_VALUE;
+ _startTimeNs = System.nanoTime();
+ }
@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+ if (_timeLimitReached) {
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ }
+ return true;
+ }
+ if (hasExceededTimeLimit()) {
+ _timeLimitReached = true;
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ }
+ return true;
+ }
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT) {
+ _timeLimitReached = true;
+ return true;
+ }
+ if (_rowBudgetReached) {
+ return true;
+ }
+ if (_maxRowsAcrossSegments != Integer.MAX_VALUE
+ && resultsBlock.getNumDocsScanned() >= _maxRowsAcrossSegments) {
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+ }
+ _rowBudgetReached = true;
+ return true;
+ }
return resultsBlock.getDistinctTable().isSatisfied();
}
@Override
public void mergeResultsBlocks(DistinctResultsBlock mergedBlock,
DistinctResultsBlock blockToMerge) {
+ if (_rowBudgetReached || _timeLimitReached) {
Review Comment:
(MAJOR) `isQuerySatisfied()` can be applied to each individual segment
result (not the merged one), so we need to set the flag on the merged block.
E.g.:
- The first segment is within the budget.
- The second segment by itself goes over the budget.
- If we terminate here, the metadata won't be set on the merged result (the first
segment's result).
Please add a test to capture this scenario.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,91 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private final int _maxRowsAcrossSegments;
+ private boolean _rowBudgetReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows = null;
+ Long maxExecutionTimeMs = null;
+ if (queryContext.getQueryOptions() != null) {
+ maxRows =
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+ maxExecutionTimeMs =
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+ }
+ _maxRowsAcrossSegments = Objects.requireNonNullElse(maxRows,
Integer.MAX_VALUE);
+ _maxExecutionTimeNs = maxExecutionTimeMs != null ?
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+ : Long.MAX_VALUE;
+ _startTimeNs = System.nanoTime();
+ }
@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
+ if (_timeLimitReached) {
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ }
+ return true;
+ }
+ if (hasExceededTimeLimit()) {
+ _timeLimitReached = true;
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.NONE) {
+
resultsBlock.setEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT);
+ }
+ return true;
+ }
+ if (resultsBlock.getEarlyTerminationReason() ==
BaseResultsBlock.EarlyTerminationReason.TIME_LIMIT) {
Review Comment:
Will this ever be hit? The global time limit should always be hit first.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -53,14 +57,34 @@ public class DictionaryBasedDistinctOperator extends
BaseOperator<DistinctResult
private final QueryContext _queryContext;
private int _numDocsScanned;
+ private final int _maxRowsInDistinct;
+ private boolean _hitMaxRowsLimit;
+ private final long _maxExecutionTimeNs;
+ private final LongSupplier _timeSupplier;
Review Comment:
(minor) Will the lambda introduce overhead? Given that we are not really using it for
testing purposes, calling `System.nanoTime()` directly might be cheaper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]