This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b0c360aae1 [multistage] Early terminate SortOperator if there is a limit (#11334) b0c360aae1 is described below commit b0c360aae186cce8940ce54fffe4532adc7fccfb Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Thu Aug 17 00:41:12 2023 -0700 [multistage] Early terminate SortOperator if there is a limit (#11334) --- .../src/test/resources/queries/JoinPlans.json | 19 ++++++++ .../pinot/query/mailbox/GrpcSendingMailbox.java | 46 +++++++++++-------- .../query/mailbox/InMemorySendingMailbox.java | 28 +++++++++++- .../pinot/query/mailbox/ReceivingMailbox.java | 28 +++++++----- .../apache/pinot/query/mailbox/SendingMailbox.java | 6 +++ .../mailbox/channel/MailboxContentObserver.java | 28 +++++++++--- .../query/runtime/blocks/TransferableBlock.java | 7 --- .../runtime/operator/MailboxSendOperator.java | 51 ++++++++++++---------- .../pinot/query/runtime/operator/SortOperator.java | 15 ++++--- .../runtime/operator/exchange/BlockExchange.java | 15 ++++++- .../pinot/query/mailbox/MailboxServiceTest.java | 16 ++++--- 11 files changed, 178 insertions(+), 81 deletions(-) diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json index 94a53aec08..cf8537c16c 100644 --- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json +++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json @@ -402,6 +402,25 @@ "\n LogicalTableScan(table=[[b]])", "\n" ] + }, + { + "description": "Inner join with limit", + "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 LIMIT 100", + "output": [ + "Execution Plan", + "\nLogicalSort(offset=[0], fetch=[100])", + "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])", + "\n LogicalSort(fetch=[100])", + "\n LogicalProject(col1=[$0], ts=[$1], col3=[$3])", + "\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col1=[$0], ts=[$6])", + "\n LogicalTableScan(table=[[a]])", + "\n PinotLogicalExchange(distribution=[hash[0]])", + "\n LogicalProject(col2=[$1], col3=[$2])", + "\n LogicalTableScan(table=[[b]])", + "\n" + ] } ] }, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index 25cc337bf8..47b76b7e12 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.mailbox; -import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import io.grpc.stub.StreamObserver; @@ -61,35 +60,47 @@ public class GrpcSendingMailbox implements SendingMailbox { @Override public void send(TransferableBlock block) throws IOException { + if (isTerminated()) { + return; + } if (_contentObserver == null) { _contentObserver = getContentObserver(); } - Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is already closed", _id); _contentObserver.onNext(toMailboxContent(block)); } @Override public void complete() { + if (isTerminated()) { + return; + } _contentObserver.onCompleted(); } @Override public void cancel(Throwable t) { - if (!_statusObserver.isFinished()) { - LOGGER.debug("Cancelling mailbox: {}", _id); - if (_contentObserver == null) { - _contentObserver = getContentObserver(); - } - try { - // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback - _contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock( - new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t)))); - _contentObserver.onCompleted(); - } catch (Exception e) { - // Exception can be thrown if the stream is already closed, so we simply ignore it - LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e); - } + if (isTerminated()) { + return; + } + LOGGER.debug("Cancelling mailbox: {}", _id); + if (_contentObserver == null) { + _contentObserver = getContentObserver(); } + try { + // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback + _contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock( + new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t)))); + _contentObserver.onCompleted(); + } catch (Exception e) { + // Exception can be thrown if the stream is already closed, so we simply ignore it + LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e); + } + } + + @Override + public boolean isTerminated() { + // TODO: We cannot differentiate early termination vs stream error + return _statusObserver.isFinished(); } private StreamObserver<MailboxContent> getContentObserver() { @@ -102,7 +113,6 @@ public class GrpcSendingMailbox implements SendingMailbox { DataBlock dataBlock = block.getDataBlock(); byte[] bytes = dataBlock.toBytes(); ByteString byteString = UnsafeByteOperations.unsafeWrap(bytes); - return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString) - .build(); + return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString).build(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java index fb96d62043..23943a2091 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java @@ -32,6 +32,7 @@ public class InMemorySendingMailbox implements SendingMailbox { private final long _deadlineMs; private ReceivingMailbox _receivingMailbox; + private volatile boolean _isTerminated; public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) { _id = id; @@ -41,12 +42,27 @@ public class InMemorySendingMailbox implements SendingMailbox { @Override public void send(TransferableBlock block) { + if (_isTerminated) { + return; + } if (_receivingMailbox == null) { _receivingMailbox = _mailboxService.getReceivingMailbox(_id); } long timeoutMs = _deadlineMs - System.currentTimeMillis(); - if (!_receivingMailbox.offer(block, timeoutMs)) { - throw new RuntimeException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs)); + ReceivingMailbox.ReceivingMailboxStatus status = _receivingMailbox.offer(block, timeoutMs); + switch (status) { + case SUCCESS: + break; + case ERROR: + throw new RuntimeException(String.format("Mailbox: %s already errored out (received error block before)", _id)); + case TIMEOUT: + throw new RuntimeException( + String.format("Timed out adding block into mailbox: %s with timeout: %dms", _id, timeoutMs)); + case EARLY_TERMINATED: + _isTerminated = true; + break; + default: + throw new IllegalStateException("Unsupported mailbox status: " + status); } } @@ -56,6 +72,9 @@ public class InMemorySendingMailbox implements SendingMailbox { @Override public void cancel(Throwable t) { + if (_isTerminated) { + return; + } LOGGER.debug("Cancelling mailbox: {}", _id); if (_receivingMailbox == null) { _receivingMailbox = _mailboxService.getReceivingMailbox(_id); @@ -63,4 +82,9 @@ public class InMemorySendingMailbox implements SendingMailbox { _receivingMailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock( new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t))); } + + @Override + public boolean isTerminated() { + return _isTerminated; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java index fcba7c0a3d..64b7c3f202 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java @@ -65,37 +65,41 @@ public class ReceivingMailbox { * Offers a non-error block into the mailbox within the timeout specified, returns whether the block is successfully * added. If the block is not added, an error block is added to the mailbox. */ - public boolean offer(TransferableBlock block, long timeoutMs) { - if (_errorBlock.get() != null) { + public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) { + TransferableBlock errorBlock = _errorBlock.get(); + if (errorBlock != null) { LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id); - return false; + return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED + : ReceivingMailboxStatus.ERROR; } - if (timeoutMs < 0) { + if (timeoutMs <= 0) { LOGGER.debug("Mailbox: {} is already timed out", _id); setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock( new TimeoutException("Timed out while offering data to mailbox: " + _id))); - return false; + return ReceivingMailboxStatus.TIMEOUT; } try { if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) { - if (_errorBlock.get() == null) { + errorBlock = _errorBlock.get(); + if (errorBlock == null) { _receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id)); - return true; + return ReceivingMailboxStatus.SUCCESS; } else { LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id); _blocks.clear(); - return false; + return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED + : ReceivingMailboxStatus.ERROR; } } else { LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs); setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock( new TimeoutException("Timed out while waiting for receive operator to consume data from mailbox: " + _id))); - return false; + return ReceivingMailboxStatus.TIMEOUT; } } catch (InterruptedException e) { LOGGER.error("Interrupted while offering block into mailbox: {}", _id); setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e)); - return false; + return ReceivingMailboxStatus.ERROR; } } @@ -133,4 +137,8 @@ public class ReceivingMailbox { public int getNumPendingBlocks() { return _blocks.size(); } + + public enum ReceivingMailboxStatus { + SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java index 3a794260b7..68b4f958cd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java @@ -55,4 +55,10 @@ public interface SendingMailbox { * No more blocks can be sent after calling this method. */ void cancel(Throwable t); + + /** + * Returns whether the {@link ReceivingMailbox} is already closed. There is no need to send more blocks after the + * mailbox is terminated. + */ + boolean isTerminated(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java index d9509d5766..9074d81151 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java @@ -77,13 +77,27 @@ public class MailboxContentObserver implements StreamObserver<MailboxContent> { } long timeoutMs = Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS); - if (_mailbox.offer(block, timeoutMs)) { - _responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId) - .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, - Integer.toString(_mailbox.getNumPendingBlocks())).build()); - } else { - LOGGER.warn("Failed to add block into mailbox: {} within timeout: {}ms", mailboxId, timeoutMs); - cancelStream(); + ReceivingMailbox.ReceivingMailboxStatus status = _mailbox.offer(block, timeoutMs); + switch (status) { + case SUCCESS: + _responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId) + .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, + Integer.toString(_mailbox.getNumPendingBlocks())).build()); + break; + case ERROR: + LOGGER.warn("Mailbox: {} already errored out (received error block before)", mailboxId); + cancelStream(); + break; + case TIMEOUT: + LOGGER.warn("Timed out adding block into mailbox: {} with timeout: {}ms", mailboxId, timeoutMs); + cancelStream(); + break; + case EARLY_TERMINATED: + LOGGER.debug("Mailbox: {} has been early terminated", mailboxId); + onCompleted(); + break; + default: + throw new IllegalStateException("Unsupported mailbox status: " + status); } } catch (Exception e) { String errorMessage = "Caught exception while processing blocks for mailbox: " + mailboxId; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java index 64e4dc31ae..3465f94602 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.runtime.blocks; -import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,12 +46,6 @@ public class TransferableBlock implements Block { private List<Object[]> _container; public TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType) { - this(container, dataSchema, containerType, false); - } - - @VisibleForTesting - TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType, - boolean isErrorBlock) { _container = container; _dataSchema = dataSchema; _type = containerType; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index c5e1c71f5f..78bc02351f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -40,6 +40,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.spi.exception.EarlyTerminationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,37 +119,40 @@ public class MailboxSendOperator extends MultiStageOperator { @Override protected TransferableBlock getNextBlock() { - boolean canContinue = true; - TransferableBlock transferableBlock; try { - transferableBlock = _sourceOperator.nextBlock(); - if (transferableBlock.isNoOpBlock()) { - return transferableBlock; - } else if (transferableBlock.isEndOfStreamBlock()) { - if (transferableBlock.isSuccessfulEndOfStreamBlock()) { - // Stats need to be populated here because the block is being sent to the mailbox - // and the receiving opChain will not be able to access the stats from the previous opChain - TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( - OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); - sendTransferableBlock(eosBlockWithStats); - } else { - sendTransferableBlock(transferableBlock); - } - } else { // normal blocks - // check whether we should continue depending on exchange queue condition. - canContinue = sendTransferableBlock(transferableBlock); + TransferableBlock block = _sourceOperator.nextBlock(); + if (block.isNoOpBlock()) { + return block; + } else if (block.isErrorBlock()) { + sendTransferableBlock(block); + return block; + } else if (block.isSuccessfulEndOfStreamBlock()) { + // Stats need to be populated here because the block is being sent to the mailbox + // and the receiving opChain will not be able to access the stats from the previous opChain + TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( + OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); + sendTransferableBlock(eosBlockWithStats); + return block; + } else { + // Data block + boolean canContinue = sendTransferableBlock(block); + // Yield if we cannot continue to put transferable block into the sending queue + return canContinue ? block : TransferableBlockUtils.getNoOpTransferableBlock(); } + } catch (EarlyTerminationException e) { + // TODO: Query stats are not sent when opChain is early terminated + LOGGER.debug("Early terminating opChain: " + _context.getId()); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } catch (Exception e) { - transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e); + TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e); try { LOGGER.error("Exception while transferring data on opChain: " + _context.getId(), e); - sendTransferableBlock(transferableBlock); + sendTransferableBlock(errorBlock); } catch (Exception e2) { LOGGER.error("Exception while sending error block.", e2); } + return errorBlock; } - // yield if we cannot continue to put transferable block into the sending queue - return canContinue ? transferableBlock : TransferableBlockUtils.getNoOpTransferableBlock(); } private boolean sendTransferableBlock(TransferableBlock block) @@ -157,7 +161,8 @@ public class MailboxSendOperator extends MultiStageOperator { if (_exchange.offerBlock(block, timeoutMs)) { return _exchange.getRemainingCapacity() > 0; } else { - throw new TimeoutException("Timeout while offering data block into the sending queue."); + throw new TimeoutException( + String.format("Timed out while offering block into the sending queue after %dms", timeoutMs)); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java index 0f3fef5208..7e0b6f8e4c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java @@ -84,7 +84,7 @@ public class SortOperator extends MultiStageOperator { // - 'isInputSorted' is set to true indicating that the data was already sorted if (collationKeys.isEmpty() || isInputSorted) { _priorityQueue = null; - _rows = new ArrayList<>(); + _rows = new ArrayList<>(Math.min(defaultHolderCapacity, _numRowsToKeep)); } else { // Use the opposite direction as specified by the collation directions since we need the PriorityQueue to decide // which elements to keep and which to remove based on the limits. @@ -160,7 +160,7 @@ public class SortOperator extends MultiStageOperator { if (block.isErrorBlock()) { _upstreamErrorBlock = block; return; - } else if (TransferableBlockUtils.isEndOfStream(block)) { + } else if (block.isSuccessfulEndOfStreamBlock()) { _readyToConstruct = true; return; } @@ -168,11 +168,16 @@ public class SortOperator extends MultiStageOperator { List<Object[]> container = block.getContainer(); if (_priorityQueue == null) { // TODO: when push-down properly, we shouldn't get more than _numRowsToKeep - if (_rows.size() <= _numRowsToKeep) { - if (_rows.size() + container.size() <= _numRowsToKeep) { + int numRows = _rows.size(); + if (numRows < _numRowsToKeep) { + if (numRows + container.size() < _numRowsToKeep) { _rows.addAll(container); } else { - _rows.addAll(container.subList(0, _numRowsToKeep - _rows.size())); + _rows.addAll(container.subList(0, _numRowsToKeep - numRows)); + LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId, + _context.getId()); + _readyToConstruct = true; + return; } } } else { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index b7a152df02..e47d9835fa 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -35,6 +35,7 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.OpChainId; +import org.apache.pinot.spi.exception.EarlyTerminationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,6 +95,16 @@ public abstract class BlockExchange { public boolean offerBlock(TransferableBlock block, long timeoutMs) throws Exception { + boolean isEarlyTerminated = true; + for (SendingMailbox sendingMailbox : _sendingMailboxes) { + if (!sendingMailbox.isTerminated()) { + isEarlyTerminated = false; + break; + } + } + if (isEarlyTerminated) { + throw new EarlyTerminationException(); + } return _queue.offer(block, timeoutMs, TimeUnit.MILLISECONDS); } @@ -111,8 +122,8 @@ public abstract class BlockExchange { } block = _queue.poll(timeoutMs, TimeUnit.MILLISECONDS); if (block == null) { - block = TransferableBlockUtils.getErrorTransferableBlock( - new TimeoutException("Timed out on exchange for opChain: " + _opChainId)); + block = TransferableBlockUtils.getErrorTransferableBlock(new TimeoutException( + String.format("Timed out polling block for opChain: %s after %dms", _opChainId, timeoutMs))); } else { // Notify that the block exchange can now accept more blocks. _callback.accept(_opChainId); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java index 0b77996c22..9e372b91b9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java @@ -506,13 +506,15 @@ public class MailboxServiceTest { Thread.sleep(deadlineMs - System.currentTimeMillis() + 10); receiveMailLatch.await(); assertEquals(numCallbacks.get(), 2); - try { - sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{1})); - fail("Expect exception when sending data after timing out"); - } catch (Exception e) { - // Expected - } - assertEquals(numCallbacks.get(), 2); + // TODO: Currently we cannot differentiate early termination vs stream error + assertTrue(sendingMailbox.isTerminated()); +// try { +// sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{1})); +// fail("Expect exception when sending data after timing out"); +// } catch (Exception e) { +// // Expected +// } +// assertEquals(numCallbacks.get(), 2); // Data blocks will be cleaned up ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org