This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f72b494858 Modify MSE operators to release memory once computation finished (#15977)
f72b494858 is described below

commit f72b4948588d361df86746a0557d3f095f9cba2a
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Wed Jun 4 12:41:37 2025 +0200

    Modify MSE operators to release memory once computation finished (#15977)
---
 .../query/runtime/operator/AggregateOperator.java  | 14 +++++++++++---
 .../query/runtime/operator/AsofJoinOperator.java   | 10 +++++++++-
 .../query/runtime/operator/BaseJoinOperator.java   |  6 ++++++
 .../query/runtime/operator/HashJoinOperator.java   | 22 ++++++++++++++++++++--
 .../runtime/operator/NonEquiJoinOperator.java      |  8 ++++++++
 .../runtime/operator/WindowAggregateOperator.java  | 17 ++++++++---------
 6 files changed, 62 insertions(+), 15 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 705d171d6c..cbb148ebed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -73,8 +73,10 @@ public class AggregateOperator extends MultiStageOperator {
   private final MultiStageOperator _input;
   private final DataSchema _resultSchema;
   private final AggregationFunction<?, ?>[] _aggFunctions;
-  private final MultistageAggregationExecutor _aggregationExecutor;
-  private final MultistageGroupByExecutor _groupByExecutor;
+  @Nullable
+  private MultistageAggregationExecutor _aggregationExecutor;
+  @Nullable
+  private MultistageGroupByExecutor _groupByExecutor;
 
   @Nullable
   private MseBlock.Eos _eosBlock;
@@ -205,13 +207,17 @@ public class AggregateOperator extends MultiStageOperator {
     if (finalBlock.isError()) {
       return finalBlock;
     }
-    return produceAggregatedBlock();
+    MseBlock mseBlock = produceAggregatedBlock();
+    _aggregationExecutor = null;
+    _groupByExecutor = null;
+    return mseBlock;
   }
 
   private MseBlock produceAggregatedBlock() {
     if (_aggregationExecutor != null) {
       return new RowHeapDataBlock(_aggregationExecutor.getResult(), 
_resultSchema, _aggFunctions);
     } else {
+      assert _groupByExecutor != null;
       List<Object[]> rows;
       if (_comparator != null) {
         rows = _groupByExecutor.getResult(_comparator, _groupTrimSize);
@@ -253,6 +259,7 @@ public class AggregateOperator extends MultiStageOperator {
    * @return the last block, which must always be either an error or the end 
of the stream
    */
   private MseBlock.Eos consumeGroupBy() {
+    assert _groupByExecutor != null;
     MseBlock block = _input.nextBlock();
     while (block.isData()) {
       _groupByExecutor.processBlock((MseBlock.Data) block);
@@ -268,6 +275,7 @@ public class AggregateOperator extends MultiStageOperator {
    * @return the last block, which must always be either an error or the end 
of the stream
    */
   private MseBlock.Eos consumeAggregation() {
+    assert _aggregationExecutor != null;
     MseBlock block = _input.nextBlock();
     while (block.isData()) {
       _aggregationExecutor.processBlock((MseBlock.Data) block);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
index 5c98a9ce29..3dd0a7781e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
@@ -39,7 +39,8 @@ public class AsofJoinOperator extends BaseJoinOperator {
 
   // The right table is a map from the hash key (columns in the ON join 
condition) to a sorted map of match key
   // (column in the MATCH_CONDITION) to rows.
-  private final Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable;
+  @Nullable
+  private Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable;
   private final KeySelector<?> _leftKeySelector;
   private final KeySelector<?> _rightKeySelector;
   private final MatchConditionType _matchConditionType;
@@ -68,6 +69,7 @@ public class AsofJoinOperator extends BaseJoinOperator {
 
   @Override
   protected void addRowsToRightTable(List<Object[]> rows) {
+    assert _rightTable != null : "Right table should not be null when adding 
rows";
     for (Object[] row : rows) {
       Comparable<?> matchKey = (Comparable<?>) row[_rightMatchKeyIndex];
       if (matchKey == null) {
@@ -86,8 +88,14 @@ public class AsofJoinOperator extends BaseJoinOperator {
     // no-op
   }
 
+  @Override
+  protected void onEosProduced() {
+    _rightTable = null; // Release memory in case we keep the operator around 
for a while
+  }
+
   @Override
   protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
+    assert _rightTable != null : "Right table should not be null when building 
joined rows";
     List<Object[]> rows = new ArrayList<>();
     for (Object[] leftRow : leftBlock.asRowHeap().getRows()) {
       Comparable<?> matchKey = (Comparable<?>) leftRow[_leftMatchKeyIndex];
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 5040bae620..ff5ad30cfb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -169,9 +169,15 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
     }
     MseBlock mseBlock = buildJoinedDataBlock();
     LOGGER.trace("Returning {} for join operator", mseBlock);
+    if (mseBlock.isEos()) {
+      _eos = (MseBlock.Eos) mseBlock;
+      onEosProduced();
+    }
     return mseBlock;
   }
 
+  protected abstract void onEosProduced();
+
   protected void buildRightTable() {
     LOGGER.trace("Building right table for join operator");
     long startTime = System.currentTimeMillis();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 0cb1032367..9ad75c420f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
@@ -51,11 +52,13 @@ public class HashJoinOperator extends BaseJoinOperator {
 
   private final KeySelector<?> _leftKeySelector;
   private final KeySelector<?> _rightKeySelector;
-  private final LookupTable _rightTable;
+  @Nullable
+  private LookupTable _rightTable;
   // Track matched right rows for right join and full join to output 
non-matched right rows.
   // TODO: Revisit whether we should use IntList or RoaringBitmap for smaller 
memory footprint.
   // TODO: Optimize this
-  private final Map<Object, BitSet> _matchedRightRows;
+  @Nullable
+  private Map<Object, BitSet> _matchedRightRows;
 
   public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator 
leftInput, DataSchema leftSchema,
       MultiStageOperator rightInput, JoinNode node) {
@@ -93,6 +96,7 @@ public class HashJoinOperator extends BaseJoinOperator {
 
   @Override
   protected void addRowsToRightTable(List<Object[]> rows) {
+    assert _rightTable != null : "Right table should not be null when adding 
rows";
     for (Object[] row : rows) {
       _rightTable.addRow(_rightKeySelector.getKey(row), row);
     }
@@ -100,11 +104,19 @@ public class HashJoinOperator extends BaseJoinOperator {
 
   @Override
   protected void finishBuildingRightTable() {
+    assert _rightTable != null : "Right table should not be null when 
finishing building";
     _rightTable.finish();
   }
 
+  @Override
+  protected void onEosProduced() {
+    _rightTable = null;
+    _matchedRightRows = null;
+  }
+
   @Override
   protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
+    assert _rightTable != null : "Right table should not be null when building 
joined rows";
     switch (_joinType) {
       case SEMI:
         return buildJoinedDataBlockSemi(leftBlock);
@@ -121,6 +133,7 @@ public class HashJoinOperator extends BaseJoinOperator {
   }
 
   private List<Object[]> buildJoinedDataBlockUniqueKeys(MseBlock.Data 
leftBlock) {
+    assert _rightTable != null : "Right table should not be null when building 
joined rows";
     List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
     ArrayList<Object[]> rows = new ArrayList<>(leftRows.size());
 
@@ -149,6 +162,7 @@ public class HashJoinOperator extends BaseJoinOperator {
   }
 
   private List<Object[]> buildJoinedDataBlockDuplicateKeys(MseBlock.Data 
leftBlock) {
+    assert _rightTable != null : "Right table should not be null when building 
joined rows";
     List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
     List<Object[]> rows = new ArrayList<>(leftRows.size());
 
@@ -197,6 +211,7 @@ public class HashJoinOperator extends BaseJoinOperator {
   }
 
   private List<Object[]> buildJoinedDataBlockSemi(MseBlock.Data leftBlock) {
+    assert _rightTable != null : "Right table should not be null when building 
joined rows";
     List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
     List<Object[]> rows = new ArrayList<>(leftRows.size());
 
@@ -212,6 +227,7 @@ public class HashJoinOperator extends BaseJoinOperator {
   }
 
   private List<Object[]> buildJoinedDataBlockAnti(MseBlock.Data leftBlock) {
+    assert _rightTable != null : "Right table should not be null when building 
joined rows";
     List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
     List<Object[]> rows = new ArrayList<>(leftRows.size());
 
@@ -228,6 +244,8 @@ public class HashJoinOperator extends BaseJoinOperator {
 
   @Override
   protected List<Object[]> buildNonMatchRightRows() {
+    assert _rightTable != null : "Right table should not be null when building 
non-matched right rows";
+    assert _matchedRightRows != null : "Matched right rows should not be null 
when building non-matched right rows";
     List<Object[]> rows = new ArrayList<>();
     if (_rightTable.isKeysUnique()) {
       for (Map.Entry<Object, Object> entry : _rightTable.entrySet()) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 34c3e99287..625b92fa11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.plannode.JoinNode;
@@ -39,6 +40,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
   private final List<Object[]> _rightTable;
   // Track matched right rows for right join and full join to output 
non-matched right rows.
   // TODO: Revisit whether we should use IntList or RoaringBitmap for smaller 
memory footprint.
+  @Nullable
   private BitSet _matchedRightRows;
 
   public NonEquiJoinOperator(OpChainExecutionContext context, 
MultiStageOperator leftInput, DataSchema leftSchema,
@@ -67,6 +69,11 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
     }
   }
 
+  @Override
+  protected void onEosProduced() {
+    _matchedRightRows = null;
+  }
+
   @Override
   protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
     ArrayList<Object[]> rows = new ArrayList<>();
@@ -106,6 +113,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
 
   @Override
   protected List<Object[]> buildNonMatchRightRows() {
+    assert _matchedRightRows != null : "Matched right rows should not be null 
when building non-matched right rows";
     int numRightRows = _rightTable.size();
     int numMatchedRightRows = _matchedRightRows.cardinality();
     if (numMatchedRightRows == numRightRows) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 2ac18a0647..f50fe2a5db 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -91,9 +91,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
   private final MultiStageOperator _input;
   private final DataSchema _resultSchema;
   private final int[] _keys;
-  private final WindowFrame _windowFrame;
   private final WindowFunction[] _windowFunctions;
-  private final Map<Key, List<Object[]>> _partitionRows = new HashMap<>();
   private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
 
   // Below are specific parameters to protect the window cache from growing 
too large.
@@ -125,12 +123,12 @@ public class WindowAggregateOperator extends MultiStageOperator {
     for (int i = 0; i < numKeys; i++) {
       _keys[i] = keys.get(i);
     }
-    _windowFrame = new WindowFrame(node.getWindowFrameType(), 
node.getLowerBound(), node.getUpperBound());
+    WindowFrame windowFrame = new WindowFrame(node.getWindowFrameType(), 
node.getLowerBound(), node.getUpperBound());
     Preconditions.checkState(
-        _windowFrame.isRowType() || ((_windowFrame.isUnboundedPreceding() || 
_windowFrame.isLowerBoundCurrentRow()) && (
-            _windowFrame.isUnboundedFollowing() || 
_windowFrame.isUpperBoundCurrentRow())),
+        windowFrame.isRowType() || ((windowFrame.isUnboundedPreceding() || 
windowFrame.isLowerBoundCurrentRow()) && (
+            windowFrame.isUnboundedFollowing() || 
windowFrame.isUpperBoundCurrentRow())),
         "RANGE window frame with offset PRECEDING / FOLLOWING is not 
supported");
-    Preconditions.checkState(_windowFrame.getLowerBound() <= 
_windowFrame.getUpperBound(),
+    Preconditions.checkState(windowFrame.getLowerBound() <= 
windowFrame.getUpperBound(),
         "Window frame lower bound can't be greater than upper bound");
     List<RelFieldCollation> collations = node.getCollations();
     List<RexExpression.FunctionCall> aggCalls = node.getAggCalls();
@@ -139,7 +137,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
     for (int i = 0; i < numAggCalls; i++) {
       RexExpression.FunctionCall aggCall = aggCalls.get(i);
       _windowFunctions[i] =
-          WindowFunctionFactory.constructWindowFunction(aggCall, inputSchema, 
collations, _windowFrame);
+          WindowFunctionFactory.constructWindowFunction(aggCall, inputSchema, 
collations, windowFrame);
     }
 
     Map<String, String> metadata = context.getOpChainMetadata();
@@ -210,6 +208,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
    * @return the final block, which must be either an end of stream or an 
error.
    */
   private MseBlock computeBlocks() {
+    Map<Key, List<Object[]>> partitionRows = new HashMap<>();
     MseBlock block = _input.nextBlock();
     while (block.isData()) {
       List<Object[]> container = ((MseBlock.Data) block).asRowHeap().getRows();
@@ -231,7 +230,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
       for (Object[] row : container) {
         // TODO: Revisit null direction handling for all query types
         Key key = AggregationUtils.extractRowKey(row, _keys);
-        _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+        partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
       }
       _numRows += containerSize;
       sampleAndCheckInterruption();
@@ -246,7 +245,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
 
     ColumnDataType[] resultStoredTypes = 
_resultSchema.getStoredColumnDataTypes();
     List<Object[]> rows = new ArrayList<>(_numRows);
-    for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) {
+    for (Map.Entry<Key, List<Object[]>> e : partitionRows.entrySet()) {
       List<Object[]> rowList = e.getValue();
 
       // Each window function will return a list of results for each row in 
the input set


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to