This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f72b494858 Modify MSE operators to release memory once computation finished (#15977) f72b494858 is described below commit f72b4948588d361df86746a0557d3f095f9cba2a Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Wed Jun 4 12:41:37 2025 +0200 Modify MSE operators to release memory once computation finished (#15977) --- .../query/runtime/operator/AggregateOperator.java | 14 +++++++++++--- .../query/runtime/operator/AsofJoinOperator.java | 10 +++++++++- .../query/runtime/operator/BaseJoinOperator.java | 6 ++++++ .../query/runtime/operator/HashJoinOperator.java | 22 ++++++++++++++++++++-- .../runtime/operator/NonEquiJoinOperator.java | 8 ++++++++ .../runtime/operator/WindowAggregateOperator.java | 17 ++++++++--------- 6 files changed, 62 insertions(+), 15 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index 705d171d6c..cbb148ebed 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -73,8 +73,10 @@ public class AggregateOperator extends MultiStageOperator { private final MultiStageOperator _input; private final DataSchema _resultSchema; private final AggregationFunction<?, ?>[] _aggFunctions; - private final MultistageAggregationExecutor _aggregationExecutor; - private final MultistageGroupByExecutor _groupByExecutor; + @Nullable + private MultistageAggregationExecutor _aggregationExecutor; + @Nullable + private MultistageGroupByExecutor _groupByExecutor; @Nullable private MseBlock.Eos _eosBlock; @@ -205,13 +207,17 @@ public class AggregateOperator extends MultiStageOperator { if (finalBlock.isError()) { return finalBlock; } - return produceAggregatedBlock(); + MseBlock mseBlock = produceAggregatedBlock(); + _aggregationExecutor = null; + _groupByExecutor = null; + return mseBlock; } private MseBlock produceAggregatedBlock() { if (_aggregationExecutor != null) { return new RowHeapDataBlock(_aggregationExecutor.getResult(), _resultSchema, _aggFunctions); } else { + assert _groupByExecutor != null; List<Object[]> rows; if (_comparator != null) { rows = _groupByExecutor.getResult(_comparator, _groupTrimSize); @@ -253,6 +259,7 @@ public class AggregateOperator extends MultiStageOperator { * @return the last block, which must always be either an error or the end of the stream */ private MseBlock.Eos consumeGroupBy() { + assert _groupByExecutor != null; MseBlock block = _input.nextBlock(); while (block.isData()) { _groupByExecutor.processBlock((MseBlock.Data) block); @@ -268,6 +275,7 @@ public class AggregateOperator extends MultiStageOperator { * @return the last block, which must always be either an error or the end of the stream */ private MseBlock.Eos consumeAggregation() { + assert _aggregationExecutor != null; MseBlock block = _input.nextBlock(); while (block.isData()) { _aggregationExecutor.processBlock((MseBlock.Data) block); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java index 5c98a9ce29..3dd0a7781e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java @@ -39,7 +39,8 @@ public class AsofJoinOperator extends BaseJoinOperator { // The right table is a map from the hash key (columns in the ON join condition) to a sorted map of match key // (column in the MATCH_CONDITION) to rows. - private final Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable; + @Nullable + private Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable; private final KeySelector<?> _leftKeySelector; private final KeySelector<?> _rightKeySelector; private final MatchConditionType _matchConditionType; @@ -68,6 +69,7 @@ public class AsofJoinOperator extends BaseJoinOperator { @Override protected void addRowsToRightTable(List<Object[]> rows) { + assert _rightTable != null : "Right table should not be null when adding rows"; for (Object[] row : rows) { Comparable<?> matchKey = (Comparable<?>) row[_rightMatchKeyIndex]; if (matchKey == null) { @@ -86,8 +88,14 @@ public class AsofJoinOperator extends BaseJoinOperator { // no-op } + @Override + protected void onEosProduced() { + _rightTable = null; // Release memory in case we keep the operator around for a while + } + @Override protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) { + assert _rightTable != null : "Right table should not be null when building joined rows"; List<Object[]> rows = new ArrayList<>(); for (Object[] leftRow : leftBlock.asRowHeap().getRows()) { Comparable<?> matchKey = (Comparable<?>) leftRow[_leftMatchKeyIndex]; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java index 5040bae620..ff5ad30cfb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java @@ -169,9 +169,15 @@ public abstract class BaseJoinOperator extends MultiStageOperator { } MseBlock mseBlock = buildJoinedDataBlock(); LOGGER.trace("Returning {} for join operator", mseBlock); + if (mseBlock.isEos()) { + _eos = (MseBlock.Eos) mseBlock; + onEosProduced(); + } return mseBlock; } + protected abstract void onEosProduced(); + protected void buildRightTable() { LOGGER.trace("Building right table for join operator"); long startTime = System.currentTimeMillis(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 0cb1032367..9ad75c420f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -24,6 +24,7 @@ import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; @@ -51,11 +52,13 @@ public class HashJoinOperator extends BaseJoinOperator { private final KeySelector<?> _leftKeySelector; private final KeySelector<?> _rightKeySelector; - private final LookupTable _rightTable; + @Nullable + private LookupTable _rightTable; // Track matched right rows for right join and full join to output non-matched right rows. // TODO: Revisit whether we should use IntList or RoaringBitmap for smaller memory footprint. // TODO: Optimize this - private final Map<Object, BitSet> _matchedRightRows; + @Nullable + private Map<Object, BitSet> _matchedRightRows; public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema, MultiStageOperator rightInput, JoinNode node) { @@ -93,6 +96,7 @@ public class HashJoinOperator extends BaseJoinOperator { @Override protected void addRowsToRightTable(List<Object[]> rows) { + assert _rightTable != null : "Right table should not be null when adding rows"; for (Object[] row : rows) { _rightTable.addRow(_rightKeySelector.getKey(row), row); } @@ -100,11 +104,19 @@ public class HashJoinOperator extends BaseJoinOperator { @Override protected void finishBuildingRightTable() { + assert _rightTable != null : "Right table should not be null when finishing building"; _rightTable.finish(); } + @Override + protected void onEosProduced() { + _rightTable = null; + _matchedRightRows = null; + } + @Override protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) { + assert _rightTable != null : "Right table should not be null when building joined rows"; switch (_joinType) { case SEMI: return buildJoinedDataBlockSemi(leftBlock); @@ -121,6 +133,7 @@ public class HashJoinOperator extends BaseJoinOperator { } private List<Object[]> buildJoinedDataBlockUniqueKeys(MseBlock.Data leftBlock) { + assert _rightTable != null : "Right table should not be null when building joined rows"; List<Object[]> leftRows = leftBlock.asRowHeap().getRows(); ArrayList<Object[]> rows = new ArrayList<>(leftRows.size()); @@ -149,6 +162,7 @@ public class HashJoinOperator extends BaseJoinOperator { } private List<Object[]> buildJoinedDataBlockDuplicateKeys(MseBlock.Data leftBlock) { + assert _rightTable != null : "Right table should not be null when building joined rows"; List<Object[]> leftRows = leftBlock.asRowHeap().getRows(); List<Object[]> rows = new ArrayList<>(leftRows.size()); @@ -197,6 +211,7 @@ public class HashJoinOperator extends BaseJoinOperator { } private List<Object[]> buildJoinedDataBlockSemi(MseBlock.Data leftBlock) { + assert _rightTable != null : "Right table should not be null when building joined rows"; List<Object[]> leftRows = leftBlock.asRowHeap().getRows(); List<Object[]> rows = new ArrayList<>(leftRows.size()); @@ -212,6 +227,7 @@ public class HashJoinOperator extends BaseJoinOperator { } private List<Object[]> buildJoinedDataBlockAnti(MseBlock.Data leftBlock) { + assert _rightTable != null : "Right table should not be null when building joined rows"; List<Object[]> leftRows = leftBlock.asRowHeap().getRows(); List<Object[]> rows = new ArrayList<>(leftRows.size()); @@ -228,6 +244,8 @@ public class HashJoinOperator extends BaseJoinOperator { @Override protected List<Object[]> buildNonMatchRightRows() { + assert _rightTable != null : "Right table should not be null when building non-matched right rows"; + assert _matchedRightRows != null : "Matched right rows should not be null when building non-matched right rows"; List<Object[]> rows = new ArrayList<>(); if (_rightTable.isKeysUnique()) { for (Map.Entry<Object, Object> entry : _rightTable.entrySet()) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java index 34c3e99287..625b92fa11 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.BitSet; import java.util.List; +import javax.annotation.Nullable; import org.apache.calcite.rel.core.JoinRelType; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.plannode.JoinNode; @@ -39,6 +40,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator { private final List<Object[]> _rightTable; // Track matched right rows for right join and full join to output non-matched right rows. // TODO: Revisit whether we should use IntList or RoaringBitmap for smaller memory footprint. + @Nullable private BitSet _matchedRightRows; public NonEquiJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema, @@ -67,6 +69,11 @@ public class NonEquiJoinOperator extends BaseJoinOperator { } } + @Override + protected void onEosProduced() { + _matchedRightRows = null; + } + @Override protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) { ArrayList<Object[]> rows = new ArrayList<>(); @@ -106,6 +113,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator { @Override protected List<Object[]> buildNonMatchRightRows() { + assert _matchedRightRows != null : "Matched right rows should not be null when building non-matched right rows"; int numRightRows = _rightTable.size(); int numMatchedRightRows = _matchedRightRows.cardinality(); if (numMatchedRightRows == numRightRows) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java index 2ac18a0647..f50fe2a5db 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java @@ -91,9 +91,7 @@ public class WindowAggregateOperator extends MultiStageOperator { private final MultiStageOperator _input; private final DataSchema _resultSchema; private final int[] _keys; - private final WindowFrame _windowFrame; private final WindowFunction[] _windowFunctions; - private final Map<Key, List<Object[]>> _partitionRows = new HashMap<>(); private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class); // Below are specific parameters to protect the window cache from growing too large. @@ -125,12 +123,12 @@ public class WindowAggregateOperator extends MultiStageOperator { for (int i = 0; i < numKeys; i++) { _keys[i] = keys.get(i); } - _windowFrame = new WindowFrame(node.getWindowFrameType(), node.getLowerBound(), node.getUpperBound()); + WindowFrame windowFrame = new WindowFrame(node.getWindowFrameType(), node.getLowerBound(), node.getUpperBound()); Preconditions.checkState( - _windowFrame.isRowType() || ((_windowFrame.isUnboundedPreceding() || _windowFrame.isLowerBoundCurrentRow()) && ( - _windowFrame.isUnboundedFollowing() || _windowFrame.isUpperBoundCurrentRow())), + windowFrame.isRowType() || ((windowFrame.isUnboundedPreceding() || windowFrame.isLowerBoundCurrentRow()) && ( + windowFrame.isUnboundedFollowing() || windowFrame.isUpperBoundCurrentRow())), "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); - Preconditions.checkState(_windowFrame.getLowerBound() <= _windowFrame.getUpperBound(), + Preconditions.checkState(windowFrame.getLowerBound() <= windowFrame.getUpperBound(), "Window frame lower bound can't be greater than upper bound"); List<RelFieldCollation> collations = node.getCollations(); List<RexExpression.FunctionCall> aggCalls = node.getAggCalls(); @@ -139,7 +137,7 @@ public class WindowAggregateOperator extends MultiStageOperator { for (int i = 0; i < numAggCalls; i++) { RexExpression.FunctionCall aggCall = aggCalls.get(i); _windowFunctions[i] = - WindowFunctionFactory.constructWindowFunction(aggCall, inputSchema, collations, _windowFrame); + WindowFunctionFactory.constructWindowFunction(aggCall, inputSchema, collations, windowFrame); } Map<String, String> metadata = context.getOpChainMetadata(); @@ -210,6 +208,7 @@ public class WindowAggregateOperator extends MultiStageOperator { * @return the final block, which must be either an end of stream or an error. */ private MseBlock computeBlocks() { + Map<Key, List<Object[]>> partitionRows = new HashMap<>(); MseBlock block = _input.nextBlock(); while (block.isData()) { List<Object[]> container = ((MseBlock.Data) block).asRowHeap().getRows(); @@ -231,7 +230,7 @@ public class WindowAggregateOperator extends MultiStageOperator { for (Object[] row : container) { // TODO: Revisit null direction handling for all query types Key key = AggregationUtils.extractRowKey(row, _keys); - _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row); + partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row); } _numRows += containerSize; sampleAndCheckInterruption(); @@ -246,7 +245,7 @@ public class WindowAggregateOperator extends MultiStageOperator { ColumnDataType[] resultStoredTypes = _resultSchema.getStoredColumnDataTypes(); List<Object[]> rows = new ArrayList<>(_numRows); - for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) { + for (Map.Entry<Key, List<Object[]>> e : partitionRows.entrySet()) { List<Object[]> rowList = e.getValue(); // Each window function will return a list of results for each row in the input set --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org