This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ceb3ef2b2b8 Fix early termination on MSE operator (#16696)
ceb3ef2b2b8 is described below
commit ceb3ef2b2b88920341c9f3c6f3bbb1bf72b4847c
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Aug 28 12:39:08 2025 -0700
Fix early termination on MSE operator (#16696)
---
.../tests/GroupByOptionsIntegrationTest.java | 2 +-
.../runtime/executor/OpChainSchedulerService.java | 9 ++--
.../query/runtime/operator/AggregateOperator.java | 5 ++-
.../query/runtime/operator/AsofJoinOperator.java | 9 ++--
.../operator/BaseMailboxReceiveOperator.java | 4 +-
.../query/runtime/operator/HashJoinOperator.java | 23 +++++-----
.../runtime/operator/MailboxSendOperator.java | 4 +-
.../query/runtime/operator/MultiStageOperator.java | 51 ++++++++++------------
.../runtime/operator/NonEquiJoinOperator.java | 7 ++-
.../runtime/operator/WindowAggregateOperator.java | 7 ++-
.../plan/pipeline/PipelineBreakerOperator.java | 35 +++++----------
.../queries/PerQueryCPUMemAccountantTest.java | 13 ++++--
12 files changed, 79 insertions(+), 90 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
index 55c8a0ea02d..e2a5308720e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
@@ -531,7 +531,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
String errorMessage = toResultStr(result);
- Assert.assertTrue(errorMessage.contains("NUM_GROUPS_LIMIT has been reached at "), errorMessage);
+ Assert.assertTrue(errorMessage.contains("NUM_GROUPS_LIMIT has been reached"), errorMessage);
}
// for debug only
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 807e0d038a5..f26d02315f1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -116,6 +116,7 @@ public class OpChainSchedulerService {
private void registerInternal(OpChain operatorChain) {
OpChainId opChainId = operatorChain.getId();
+ MultiStageOperator rootOperator = operatorChain.getRoot();
Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
@@ -127,11 +128,11 @@ public class OpChainSchedulerService {
Tracing.ThreadAccountantOps.setupWorker(opChainId.getStageId(),
ThreadExecutionContext.TaskType.MSE,
operatorChain.getParentContext());
LOGGER.trace("({}): Executing", operatorChain);
- MseBlock result = operatorChain.getRoot().nextBlock();
+ MseBlock result = rootOperator.nextBlock();
while (result.isData()) {
- result = operatorChain.getRoot().nextBlock();
+ result = rootOperator.nextBlock();
}
- MultiStageQueryStats stats =
operatorChain.getRoot().calculateStats();
+ MultiStageQueryStats stats = rootOperator.calculateStats();
if (result.isError()) {
errorBlock = (ErrorMseBlock) result;
LOGGER.error("({}): Completed erroneously {} {}", operatorChain,
stats, errorBlock.getErrorMessages());
@@ -154,7 +155,7 @@ public class OpChainSchedulerService {
}
}
});
- _opChainCache.put(opChainId, operatorChain.getRoot());
+ _opChainCache.put(opChainId, rootOperator);
_submittedOpChainMap.put(opChainId, scheduledFuture);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index f9fb427dd1b..da39a78fea6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -53,6 +53,7 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
import org.apache.pinot.query.runtime.operator.utils.SortUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -235,8 +236,8 @@ public class AggregateOperator extends MultiStageOperator {
if (_groupByExecutor.isNumGroupsLimitReached()) {
if (_errorOnNumGroupsLimit) {
- _input.earlyTerminate();
- throw new RuntimeException("NUM_GROUPS_LIMIT has been reached at " + _operatorId);
+ throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+ "NUM_GROUPS_LIMIT has been reached at: " + _operatorId);
} else {
_statMap.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, true);
_input.earlyTerminate();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
index 73d229a2304..cb88d64bdf5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
@@ -32,7 +32,6 @@ import
org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.trace.Tracing;
public class AsofJoinOperator extends BaseJoinOperator {
@@ -103,7 +102,7 @@ public class AsofJoinOperator extends BaseJoinOperator {
if (matchKey == null) {
// Rows with null match keys cannot be matched with any right rows
if (needUnmatchedLeftRows()) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(leftRow, null));
}
continue;
@@ -112,18 +111,18 @@ public class AsofJoinOperator extends BaseJoinOperator {
NavigableMap<Comparable<?>, Object[]> rightRows =
_rightTable.get(hashKey);
if (rightRows == null) {
if (needUnmatchedLeftRows()) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(leftRow, null));
}
} else {
Object[] rightRow = closestMatch(matchKey, rightRows);
if (rightRow == null) {
if (needUnmatchedLeftRows()) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(leftRow, null));
}
} else {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(leftRow, rightRow));
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 71bee7bfa45..f6cf8e578c7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -132,12 +132,12 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
}
@Override
- protected void sampleAndCheckInterruption() {
+ protected long getDeadlineMs() {
// mailbox receive operator uses passive deadline instead of the active
one because it is not an active operator
// as it just waits for data from the mailbox.
// This way if timeout is reached, it will be less probable to hit the
timeout here, on the stage waiting for data,
// than in the operator that is actively processing the data, which will
produce a more meaningful error message.
- sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+ return _context.getPassiveDeadlineMs();
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index dfec8c7e44c..091ffa46e77 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -39,7 +39,6 @@ import
org.apache.pinot.query.runtime.operator.join.LongLookupTable;
import org.apache.pinot.query.runtime.operator.join.LookupTable;
import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.trace.Tracing;
/**
@@ -127,7 +126,7 @@ public class HashJoinOperator extends BaseJoinOperator {
}
continue;
}
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(_rightTable.size());
+ sampleAndCheckInterruptionPeriodically(_rightTable.size());
_rightTable.addRow(key, row);
}
}
@@ -215,7 +214,7 @@ public class HashJoinOperator extends BaseJoinOperator {
break;
}
// defer copying of the content until row matches
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(resultRowView.toArray());
if (_matchedRightRows != null) {
_matchedRightRows.put(key, BIT_SET_PLACEHOLDER);
@@ -235,7 +234,7 @@ public class HashJoinOperator extends BaseJoinOperator {
List<Object[]> rows = new ArrayList<>(leftRows.size());
for (Object[] leftRow : leftRows) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
Object key = _leftKeySelector.getKey(leftRow);
// Skip rows with null join keys - they should not participate in
equi-joins per SQL standard
if (handleNullKey(key, leftRow, rows)) {
@@ -255,7 +254,7 @@ public class HashJoinOperator extends BaseJoinOperator {
maxRowsLimitReached = true;
break;
}
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(resultRowView.toArray());
hasMatchForLeftRow = true;
if (_matchedRightRows != null) {
@@ -280,7 +279,7 @@ public class HashJoinOperator extends BaseJoinOperator {
if (isMaxRowsLimitReached(rows.size())) {
return;
}
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(leftRow, null));
}
}
@@ -293,7 +292,7 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
if (_rightTable.containsKey(key)) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(leftRow);
}
}
@@ -309,7 +308,7 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
if (!_rightTable.containsKey(key)) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(leftRow);
}
}
@@ -326,7 +325,7 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Map.Entry<Object, Object> entry : _rightTable.entrySet()) {
Object[] rightRow = (Object[]) entry.getValue();
if (!_matchedRightRows.containsKey(entry.getKey())) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(null, rightRow));
}
}
@@ -336,14 +335,14 @@ public class HashJoinOperator extends BaseJoinOperator {
BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
if (matchedIndices == null) {
for (Object[] rightRow : rightRows) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(null, rightRow));
}
} else {
int numRightRows = rightRows.size();
int unmatchedIndex = 0;
while ((unmatchedIndex =
matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
}
}
@@ -352,7 +351,7 @@ public class HashJoinOperator extends BaseJoinOperator {
// Add unmatched null key rows from right side for RIGHT and FULL JOIN
if (_nullKeyRightRows != null) {
for (Object[] nullKeyRow : _nullKeyRightRows) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(null, nullKeyRow));
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index c1a0fcf6e95..cfdfecedb95 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -235,9 +235,9 @@ public class MailboxSendOperator extends MultiStageOperator
{
}
@Override
- protected void sampleAndCheckInterruption() {
+ protected long getDeadlineMs() {
// mailbox send operator uses passive deadline instead of the active one
- sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+ return _context.getPassiveDeadlineMs();
}
private void sendEos(MseBlock.Eos eosBlockWithoutStats)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index abd61d2ea81..fc7bb41106b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -48,17 +48,15 @@ import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
-public abstract class MultiStageOperator
- implements Operator<MseBlock>, AutoCloseable {
-
+public abstract class MultiStageOperator implements Operator<MseBlock>,
AutoCloseable {
protected final OpChainExecutionContext _context;
protected final String _operatorId;
+
protected boolean _isEarlyTerminated;
public MultiStageOperator(OpChainExecutionContext context) {
_context = context;
_operatorId = Joiner.on("_").join(getClass().getSimpleName(),
_context.getStageId(), _context.getServer());
- _isEarlyTerminated = false;
}
/**
@@ -74,35 +72,34 @@ public abstract class MultiStageOperator
public abstract void registerExecution(long time, int numRows);
- /// This method should be called periodically by the operator to check
whether the execution should be interrupted.
- ///
- /// This could happen when the request deadline is reached, or the thread
accountant decides to interrupt the query
- /// due to resource constraints.
- ///
- /// Normally, callers should call [#sampleAndCheckInterruption(long
deadlineMs)] passing the correct deadline, but
- /// given most operators use either the active or the passive deadline, this
method is provided as a convenience
- /// method. By default, it uses the active deadline, which is the one that
should be used for most operators, but
- /// if the operator does not actively process data (ie both mailbox
operators), it should override this method to
- /// use the passive deadline instead.
- /// See for example
[MailboxSendOperator][org.apache.pinot.query.runtime.operator.MailboxSendOperator]).
- protected void sampleAndCheckInterruption() {
- sampleAndCheckInterruption(_context.getActiveDeadlineMs());
+ /// By default, it uses the active deadline, which is the one that should be
used for most operators, but if the
+ /// operator does not actively process data (ie both mailbox operators), it
should override this method to use the
+ /// passive deadline instead.
+ protected long getDeadlineMs() {
+ return _context.getActiveDeadlineMs();
}
/// This method should be called periodically by the operator to check
whether the execution should be interrupted.
///
/// This could happen when the request deadline is reached, or the thread
accountant decides to interrupt the query
/// due to resource constraints.
- protected void sampleAndCheckInterruption(long deadlineMs) {
- if (System.currentTimeMillis() >= deadlineMs) {
- earlyTerminate();
- throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " +
getExplainName());
+ protected void checkInterruption() {
+ if (System.currentTimeMillis() >= getDeadlineMs()) {
+ throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " +
getExplainName());
}
- Tracing.ThreadAccountantOps.sample();
if (Tracing.ThreadAccountantOps.isInterrupted()) {
- earlyTerminate();
- throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Resource limit exceeded for operator: "
- + getExplainName());
+ throw new EarlyTerminationException("Interrupted on: " + getExplainName());
+ }
+ }
+
+ protected void sampleAndCheckInterruption() {
+ checkInterruption();
+ Tracing.ThreadAccountantOps.sample();
+ }
+
+ protected void sampleAndCheckInterruptionPeriodically(int
numRecordsProcessed) {
+ if ((numRecordsProcessed &
Tracing.ThreadAccountantOps.MAX_ENTRIES_KEYS_MERGED_PER_INTERRUPTION_CHECK_MASK)
== 0) {
+ sampleAndCheckInterruption();
}
}
@@ -113,9 +110,6 @@ public abstract class MultiStageOperator
*/
@Override
public MseBlock nextBlock() {
- if (Tracing.ThreadAccountantOps.isInterrupted()) {
- throw new EarlyTerminationException("Interrupted while processing next
block");
- }
if (logger().isDebugEnabled()) {
logger().debug("Operator {}: Reading next block", _operatorId);
}
@@ -123,6 +117,7 @@ public abstract class MultiStageOperator
MseBlock nextBlock;
Stopwatch executeStopwatch = Stopwatch.createStarted();
try {
+ checkInterruption();
nextBlock = getNextBlock();
} catch (Exception e) {
logger().warn("Operator {}: Exception while processing next block",
_operatorId, e);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 6a5a15e97ac..d6e6d4c4de1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -28,7 +28,6 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.trace.Tracing;
/**
@@ -91,7 +90,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
maxRowsLimitReached = true;
break;
}
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRowView.toArray());
hasMatchForLeftRow = true;
if (_matchedRightRows != null) {
@@ -106,7 +105,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
if (isMaxRowsLimitReached(rows.size())) {
break;
}
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(leftRow, null));
}
}
@@ -124,7 +123,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
List<Object[]> rows = new ArrayList<>(numRightRows - numMatchedRightRows);
int unmatchedIndex = 0;
while ((unmatchedIndex = _matchedRightRows.nextClearBit(unmatchedIndex)) <
numRightRows) {
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(joinRow(null, _rightTable.get(unmatchedIndex++)));
}
return rows;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 51dd2812261..0edaa21d3b9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -43,7 +43,6 @@ import
org.apache.pinot.query.runtime.operator.window.WindowFunction;
import org.apache.pinot.query.runtime.operator.window.WindowFunctionFactory;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.trace.Tracing;
import
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,7 +230,7 @@ public class WindowAggregateOperator extends
MultiStageOperator {
for (Object[] row : container) {
// TODO: Revisit null direction handling for all query types
Key key = AggregationUtils.extractRowKey(row, _keys);
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(_numRows);
+ sampleAndCheckInterruptionPeriodically(_numRows);
partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
}
_numRows += containerSize;
@@ -255,7 +254,7 @@ public class WindowAggregateOperator extends
MultiStageOperator {
for (WindowFunction windowFunction : _windowFunctions) {
List<Object> processRows = windowFunction.processRows(rowList);
assert processRows.size() == rowList.size();
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(windowFunctionResults.size());
+ sampleAndCheckInterruptionPeriodically(windowFunctionResults.size());
windowFunctionResults.add(processRows);
}
@@ -268,7 +267,7 @@ public class WindowAggregateOperator extends
MultiStageOperator {
}
// Convert the results from WindowFunction to the desired type
TypeUtils.convertRow(row, resultStoredTypes);
-
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+ sampleAndCheckInterruptionPeriodically(rows.size());
rows.add(row);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 8fe38c776e0..c58bbfd26b4 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -18,10 +18,9 @@
*/
package org.apache.pinot.query.runtime.plan.pipeline;
+import com.google.common.collect.Maps;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -43,20 +42,26 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
private final Map<Integer, MultiStageOperator> _workerMap;
+ private final List<MultiStageOperator> _childOperators;
+ private final Map<Integer, List<MseBlock>> _resultMap;
+ private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
- private Map<Integer, List<MseBlock>> _resultMap;
private ErrorMseBlock _errorBlock;
- private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
public PipelineBreakerOperator(OpChainExecutionContext context, Map<Integer,
MultiStageOperator> workerMap) {
super(context);
_workerMap = workerMap;
- _resultMap = new HashMap<>();
+ _childOperators = new ArrayList<>(workerMap.values());
+ _resultMap = Maps.newHashMapWithExpectedSize(workerMap.size());
for (int workerKey : workerMap.keySet()) {
_resultMap.put(workerKey, new ArrayList<>());
}
}
+ public Map<Integer, List<MseBlock>> getResultMap() {
+ return _resultMap;
+ }
+
@Override
public void registerExecution(long time, int numRows) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
@@ -65,7 +70,7 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
@Override
public List<MultiStageOperator> getChildOperators() {
- return Collections.emptyList();
+ return _childOperators;
}
@Override
@@ -78,10 +83,6 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
return LOGGER;
}
- public Map<Integer, List<MseBlock>> getResultMap() {
- return _resultMap;
- }
-
@Nullable
public ErrorMseBlock getErrorBlock() {
return _errorBlock;
@@ -100,8 +101,7 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
// NOTE: Put an empty list for each worker in case there is no data block
returned from that worker
if (_workerMap.size() == 1) {
Map.Entry<Integer, MultiStageOperator> entry =
_workerMap.entrySet().iterator().next();
- List<MseBlock> dataBlocks = new ArrayList<>();
- _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks);
+ List<MseBlock> dataBlocks = _resultMap.get(entry.getKey());
Operator<MseBlock> operator = entry.getValue();
MseBlock block = operator.nextBlock();
while (block.isData()) {
@@ -113,10 +113,6 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
return block;
}
} else {
- _resultMap = new HashMap<>();
- for (int workerKey : _workerMap.keySet()) {
- _resultMap.put(workerKey, new ArrayList<>());
- }
// Keep polling from every operator in round-robin fashion
Queue<Map.Entry<Integer, MultiStageOperator>> entries = new
ArrayDeque<>(_workerMap.entrySet());
while (!entries.isEmpty()) {
@@ -154,13 +150,6 @@ public class PipelineBreakerOperator extends
MultiStageOperator {
return new StatMap<>(_statMap);
}
- @Override
- public void close() {
- for (MultiStageOperator operator : _workerMap.values()) {
- operator.close();
- }
- }
-
public enum StatKey implements StatMap.Key {
EXECUTION_TIME_MS(StatMap.Type.LONG),
EMITTED_ROWS(StatMap.Type.LONG);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
index 1aa4b23af29..5f10d49ec37 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
@@ -25,15 +25,17 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.QueryResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.mockito.MockedStatic;
@@ -41,6 +43,8 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -110,7 +114,7 @@ public class PerQueryCPUMemAccountantTest extends
QueryRunnerAccountingTest {
}
}
- @Test(expectedExceptions = EarlyTerminationException.class)
+ @Test
void testInterrupt() {
HashMap<String, Object> configs = getAccountingConfig();
@@ -121,7 +125,10 @@ public class PerQueryCPUMemAccountantTest extends
QueryRunnerAccountingTest {
try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class,
Mockito.CALLS_REAL_METHODS)) {
tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
- queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
+ QueryDispatcher.QueryResult queryResult = queryRunner("SELECT * FROM a
LIMIT 2", false);
+ QueryProcessingException processingException =
queryResult.getProcessingException();
+ assertNotNull(processingException);
+ assertEquals(processingException.getErrorCode(),
QueryErrorCode.INTERNAL.getId());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]