yashmayya commented on code in PR #13922:
URL: https://github.com/apache/pinot/pull/13922#discussion_r1740900198


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -432,6 +447,66 @@ private boolean needUnmatchedLeftRows() {
     return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
   }
 
+  private void earlyTerminateLeftInput() {
+    _leftInput.earlyTerminate();
+    TransferableBlock leftBlock = _leftInput.nextBlock();
+
+    while (!leftBlock.isSuccessfulEndOfStreamBlock()) {
+      if (leftBlock.isErrorBlock()) {
+        _upstreamErrorBlock = leftBlock;
+        return;
+      }
+      leftBlock = _leftInput.nextBlock();
+    }
+
+    assert leftBlock.isSuccessfulEndOfStreamBlock();
+    assert _rightSideStats != null;
+    _leftSideStats = leftBlock.getQueryStats();
+    assert _leftSideStats != null;
+    _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap);
+    _isTerminated = true;
+  }
+
+  /**
+   * Increments {@link #_currentJoinedRows} and checks if the limit has been 
exceeded. If the limit has been exceeded,
+   * either an exception is thrown or the left input is early terminated based 
on the {@link #_joinOverflowMode}.
+   *
+   * @return {@code true} if the limit has been exceeded, {@code false} 
otherwise
+   */
+  private boolean incrementJoinedRowsAndCheckLimit() throws 
ProcessingException {
+    _currentJoinedRows++;
+    if (_currentJoinedRows > _maxRowsInJoin) {
+      if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+        throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, 
reached number of rows limit: "
+            + _maxRowsInJoin);
+      } else {
+        // Skip over remaining blocks until we reach the end of stream since 
we already breached the rows limit.
+        logger().info("Terminating join operator early as the maximum number 
of rows limit was reached: {}",
+            _maxRowsInJoin);
+        earlyTerminateLeftInput();
+        _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
+        return true;
+      }
+    }
+
+    return false;
+  }

Review Comment:
   We're incrementing `_currentJoinedRows` for every matched row and checking 
against the max rows limit so that we can exit early as soon as the max rows 
limit is breached. Alternatively, we could just modify it once after processing 
all the rows from a block, but that would be less accurate. This alternative 
would be slightly more efficient since there would be less checks and 
increments but it could still result in a very large number of joined rows 
being emitted depending on the block size. I don't think the overhead from the 
integer increment and limit check should be too concerning given the complexity 
of the other existing operations in each iteration of the main join loops, WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to