This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a5ae4d738a [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406) a5ae4d738a is described below commit a5ae4d738a0e646d500b6bb3e2031f7227f035e2 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Feb 14 00:03:02 2024 -0800 [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406) --- .../runtime/operator/BaseMailboxReceiveOperator.java | 10 +--------- .../query/runtime/operator/MailboxReceiveOperator.java | 4 ++-- .../runtime/operator/SortedMailboxReceiveOperator.java | 17 +++++++++++------ 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java index 808caba04e..a88e122739 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java @@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator { protected final MailboxService _mailboxService; protected final RelDistribution.Type _exchangeType; protected final List<String> _mailboxIds; - private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer; + protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer; public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId) { @@ -73,14 +73,6 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator { new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(), context.getDeadlineMs(), asyncStreams); } - protected BlockingMultiStreamConsumer.OfTransferableBlock 
getMultiConsumer() { - return _multiConsumer; - } - - public List<String> getMailboxIds() { - return _mailboxIds; - } - @Override protected void earlyTerminate() { _isEarlyTerminated = true; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index ad7913cdc1..584b49640f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -42,13 +42,13 @@ public class MailboxReceiveOperator extends BaseMailboxReceiveOperator { @Override protected TransferableBlock getNextBlock() { - TransferableBlock block = getMultiConsumer().readBlockBlocking(); + TransferableBlock block = _multiConsumer.readBlockBlocking(); // When early termination flag is set, caller is expecting an EOS block to be returned, however since the 2 stages // between sending/receiving mailbox are setting early termination flag asynchronously, there's chances that the // next block pulled out of the ReceivingMailbox to be an already buffered normal data block. This requires the // MailboxReceiveOperator to continue pulling and dropping data block until an EOS block is observed. 
while (_isEarlyTerminated && !block.isEndOfStreamBlock()) { - block = getMultiConsumer().readBlockBlocking(); + block = _multiConsumer.readBlockBlocking(); } return block; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java index 05804c12d3..8949ad569a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java @@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator { private final boolean _isSortOnSender; private final List<Object[]> _rows = new ArrayList<>(); - private boolean _isSortedBlockConstructed; + private TransferableBlock _eosBlock; public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections, @@ -74,20 +74,25 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator { @Override protected TransferableBlock getNextBlock() { - while (true) { // loop in order to keep asking if we receive data blocks - TransferableBlock block = getMultiConsumer().readBlockBlocking(); + if (_eosBlock != null) { + return _eosBlock; + } + // Collect all the rows from the mailbox and sort them + while (true) { + TransferableBlock block = _multiConsumer.readBlockBlocking(); if (block.isDataBlock()) { _rows.addAll(block.getContainer()); } else if (block.isErrorBlock()) { return block; } else { assert block.isSuccessfulEndOfStreamBlock(); - - if (!_isSortedBlockConstructed && !_rows.isEmpty()) { + if (!_rows.isEmpty()) { + _eosBlock = block; + // TODO: This might not be efficient because we are sorting all the received rows. 
We should use a k-way merge + // when sender side is sorted. _rows.sort( new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema, false)); - _isSortedBlockConstructed = true; return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW); } else { return block; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org