This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3fc1b9156 [Multi-stage] Only track max joined rows within each block (#13981)
c3fc1b9156 is described below

commit c3fc1b915675366163bd0e54204ccec47b3ae2e6
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Sep 12 11:37:40 2024 -0700

    [Multi-stage] Only track max joined rows within each block (#13981)
---
 .../query/runtime/operator/HashJoinOperator.java   | 98 ++++++++++------------
 1 file changed, 44 insertions(+), 54 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 8289d1c86f..9d845561c8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -110,8 +110,6 @@ public class HashJoinOperator extends MultiStageOperator {
   private final JoinOverFlowMode _joinOverflowMode;
 
   private boolean _isHashTableBuilt;
-  private int _currentRowsInHashTable;
-  private int _currentJoinedRows;
   private TransferableBlock _upstreamErrorBlock;
   private MultiStageQueryStats _leftSideStats;
   private MultiStageQueryStats _rightSideStats;
@@ -150,8 +148,6 @@ public class HashJoinOperator extends MultiStageOperator {
     PlanNode.NodeHint nodeHint = node.getNodeHint();
     _maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint);
     _joinOverflowMode = getJoinOverflowMode(metadata, nodeHint);
-    _currentRowsInHashTable = 0;
-    _currentJoinedRows = 0;
   }
 
   @Override
@@ -225,17 +221,18 @@ public class HashJoinOperator extends MultiStageOperator {
   private void buildBroadcastHashTable()
       throws ProcessingException {
     long startTime = System.currentTimeMillis();
+    int numRowsInHashTable = 0;
     TransferableBlock rightBlock = _rightInput.nextBlock();
     while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
       List<Object[]> container = rightBlock.getContainer();
       // Row based overflow check.
-      if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
+      if (container.size() + numRowsInHashTable > _maxRowsInJoin) {
         if (_joinOverflowMode == JoinOverFlowMode.THROW) {
-          throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in 
memory hash table for join operator, "
-              + "reached number of rows limit: " + _maxRowsInJoin);
+          throwProcessingExceptionForJoinRowLimitExceeded(
+              "Cannot build in memory hash table for join operator, reached 
number of rows limit: " + _maxRowsInJoin);
         } else {
           // Just fill up the buffer.
-          int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
+          int remainingRows = _maxRowsInJoin - numRowsInHashTable;
           container = container.subList(0, remainingRows);
           _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
           // setting only the rightTableOperator to be early terminated and 
awaits EOS block next.
@@ -252,7 +249,7 @@ public class HashJoinOperator extends MultiStageOperator {
         }
         hashCollection.add(row);
       }
-      _currentRowsInHashTable += container.size();
+      numRowsInHashTable += container.size();
       sampleAndCheckInterruption();
       rightBlock = _rightInput.nextBlock();
     }
@@ -319,25 +316,6 @@ public class HashJoinOperator extends MultiStageOperator {
     }
   }
 
-  private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
-      throws ProcessingException {
-    List<Object[]> container = leftBlock.getContainer();
-    List<Object[]> rows = new ArrayList<>(container.size());
-
-    for (Object[] leftRow : container) {
-      Object key = _leftKeySelector.getKey(leftRow);
-      // SEMI-JOIN only checks existence of the key
-      if (_broadcastRightTable.containsKey(key)) {
-        if (incrementJoinedRowsAndCheckLimit()) {
-          break;
-        }
-        rows.add(joinRow(leftRow, null));
-      }
-    }
-
-    return rows;
-  }
-
   private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock 
leftBlock)
       throws ProcessingException {
     List<Object[]> container = leftBlock.getContainer();
@@ -349,7 +327,7 @@ public class HashJoinOperator extends MultiStageOperator {
       List<Object[]> rightRows = _broadcastRightTable.get(key);
       if (rightRows == null) {
         if (needUnmatchedLeftRows()) {
-          if (incrementJoinedRowsAndCheckLimit()) {
+          if (isMaxRowsLimitReached(rows.size())) {
             break;
           }
           rows.add(joinRow(leftRow, null));
@@ -359,13 +337,15 @@ public class HashJoinOperator extends MultiStageOperator {
       boolean hasMatchForLeftRow = false;
       int numRightRows = rightRows.size();
       rows.ensureCapacity(rows.size() + numRightRows);
+      boolean maxRowsLimitReached = false;
       for (int i = 0; i < numRightRows; i++) {
         Object[] rightRow = rightRows.get(i);
         // TODO: Optimize this to avoid unnecessary object copy.
         Object[] resultRow = joinRow(leftRow, rightRow);
         if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
             .allMatch(evaluator -> 
BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
-          if (incrementJoinedRowsAndCheckLimit()) {
+          if (isMaxRowsLimitReached(rows.size())) {
+            maxRowsLimitReached = true;
             break;
           }
           rows.add(resultRow);
@@ -375,11 +355,11 @@ public class HashJoinOperator extends MultiStageOperator {
           }
         }
       }
-      if (_currentJoinedRows > _maxRowsInJoin) {
+      if (maxRowsLimitReached) {
         break;
       }
       if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
-        if (incrementJoinedRowsAndCheckLimit()) {
+        if (isMaxRowsLimitReached(rows.size())) {
           break;
         }
         rows.add(joinRow(leftRow, null));
@@ -389,8 +369,22 @@ public class HashJoinOperator extends MultiStageOperator {
     return rows;
   }
 
-  private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
-      throws ProcessingException {
+  private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) 
{
+    List<Object[]> container = leftBlock.getContainer();
+    List<Object[]> rows = new ArrayList<>(container.size());
+
+    for (Object[] leftRow : container) {
+      Object key = _leftKeySelector.getKey(leftRow);
+      // SEMI-JOIN only checks existence of the key
+      if (_broadcastRightTable.containsKey(key)) {
+        rows.add(joinRow(leftRow, null));
+      }
+    }
+
+    return rows;
+  }
+
+  private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) 
{
     List<Object[]> container = leftBlock.getContainer();
     List<Object[]> rows = new ArrayList<>(container.size());
 
@@ -398,9 +392,6 @@ public class HashJoinOperator extends MultiStageOperator {
       Object key = _leftKeySelector.getKey(leftRow);
       // ANTI-JOIN only checks non-existence of the key
       if (!_broadcastRightTable.containsKey(key)) {
-        if (incrementJoinedRowsAndCheckLimit()) {
-          break;
-        }
         rows.add(joinRow(leftRow, null));
       }
     }
@@ -475,18 +466,17 @@ public class HashJoinOperator extends MultiStageOperator {
   }
 
   /**
-   * Increments {@link #_currentJoinedRows} and checks if the limit has been 
exceeded. If the limit has been exceeded,
-   * either an exception is thrown or the left input is early terminated based 
on the {@link #_joinOverflowMode}.
+   * Checks if we have reached the rows limit for joined rows. If the limit 
has been reached, either an exception is
+   * thrown or the left input is early terminated based on the {@link 
#_joinOverflowMode}.
    *
-   * @return {@code true} if the limit has been exceeded, {@code false} 
otherwise
+   * @return {@code true} if the limit has been reached, {@code false} 
otherwise.
    */
-  private boolean incrementJoinedRowsAndCheckLimit()
+  private boolean isMaxRowsLimitReached(int numJoinedRows)
       throws ProcessingException {
-    _currentJoinedRows++;
-    if (_currentJoinedRows > _maxRowsInJoin) {
+    if (numJoinedRows == _maxRowsInJoin) {
       if (_joinOverflowMode == JoinOverFlowMode.THROW) {
-        throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, 
reached number of rows limit: "
-            + _maxRowsInJoin);
+        throwProcessingExceptionForJoinRowLimitExceeded(
+            "Cannot process join, reached number of rows limit: " + 
_maxRowsInJoin);
       } else {
         // Skip over remaining blocks until we reach the end of stream since 
we already breached the rows limit.
         logger().info("Terminating join operator early as the maximum number 
of rows limit was reached: {}",
@@ -504,15 +494,15 @@ public class HashJoinOperator extends MultiStageOperator {
       throws ProcessingException {
     ProcessingException resourceLimitExceededException =
         new 
ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
-    resourceLimitExceededException.setMessage(
-        reason + ". Consider increasing the limit for the maximum number of 
rows in a join either via the query "
-            + "option '" + 
CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
-            + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in 
the '"
-            + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if 
partial results are acceptable, the join"
-            + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() 
+ "' either via the query option '"
-            + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE 
+ "' or the '"
-            + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in 
the '"
-            + PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
+    resourceLimitExceededException.setMessage(reason
+        + ". Consider increasing the limit for the maximum number of rows in a 
join either via the query option '"
+        + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' 
or the '"
+        + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the 
'" + PinotHintOptions.JOIN_HINT_OPTIONS
+        + "'. Alternatively, if partial results are acceptable, the join 
overflow mode can be set to '"
+        + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+        + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + 
"' or the '"
+        + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the 
'" + PinotHintOptions.JOIN_HINT_OPTIONS
+        + "'.");
     throw resourceLimitExceededException;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to