This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new de823db96fe Rewrite ReceivingMailbox to be able to unblock writers on
cancellation or error (#16903)
de823db96fe is described below
commit de823db96fec884a290e54dec0ab8b2ab65c1d5a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 13 14:43:43 2025 +0200
Rewrite ReceivingMailbox to be able to unblock writers on cancellation or
error (#16903)
---
.../pinot/query/mailbox/GrpcSendingMailbox.java | 14 +-
.../query/mailbox/InMemorySendingMailbox.java | 49 +-
.../apache/pinot/query/mailbox/MailboxService.java | 5 +-
.../pinot/query/mailbox/ReceivingMailbox.java | 602 ++++++++++++++++-----
.../apache/pinot/query/mailbox/SendingMailbox.java | 8 +-
.../mailbox/channel/MailboxContentObserver.java | 66 ++-
.../apache/pinot/query/runtime/QueryRunner.java | 24 +-
.../operator/BaseMailboxReceiveOperator.java | 4 +-
.../runtime/operator/MailboxSendOperator.java | 31 +-
.../query/runtime/operator/MultiStageOperator.java | 9 +
.../runtime/operator/exchange/BlockExchange.java | 45 +-
.../operator/exchange/BroadcastExchange.java | 5 +-
.../runtime/operator/exchange/HashExchange.java | 5 +-
.../runtime/operator/exchange/RandomExchange.java | 5 +-
.../operator/exchange/SingletonExchange.java | 5 +-
.../utils/BlockingMultiStreamConsumer.java | 6 +-
.../pinot/query/mailbox/MailboxServiceTest.java | 42 +-
.../pinot/query/mailbox/ReceivingMailboxTest.java | 333 ++++++++++++
.../runtime/operator/MailboxSendOperatorTest.java | 21 -
.../operator/exchange/BlockExchangeTest.java | 5 +-
.../runtime/queries/ResourceBasedQueriesTest.java | 31 +-
.../pinot/tools/ColocatedJoinQuickStart.java | 2 +
22 files changed, 999 insertions(+), 318 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 4ffdd58e684..50c4b1003eb 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -32,7 +32,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
@@ -50,6 +49,7 @@ import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.segment.spi.memory.DataBuffer;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,21 +89,18 @@ public class GrpcSendingMailbox implements SendingMailbox {
}
@Override
- public void send(MseBlock.Data data)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Data data) {
sendInternal(data, List.of());
}
@Override
- public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
sendInternal(block, serializedStats);
LOGGER.debug("Completing mailbox: {}", _id);
_contentObserver.onCompleted();
}
- private void sendInternal(MseBlock block, List<DataBuffer> serializedStats)
- throws IOException {
+ private void sendInternal(MseBlock block, List<DataBuffer> serializedStats) {
if (isTerminated() || (isEarlyTerminated() && block.isData())) {
LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox.
Skipping sending message {} to: {}",
block, _id);
@@ -118,8 +115,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
try {
processAndSend(block, serializedStats);
} catch (IOException e) {
- LOGGER.warn("Failed to split and send mailbox", e);
- throw e;
+ throw new QueryException(QueryErrorCode.INTERNAL, "Failed to split and
send mailbox", e);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[GRPC SEND]== message " + block + " sent to: " + _id);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index b566100f276..4b0166f5016 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -18,7 +18,8 @@
*/
package org.apache.pinot.query.mailbox;
-import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
@@ -42,7 +43,13 @@ public class InMemorySendingMailbox implements
SendingMailbox {
private final long _deadlineMs;
private ReceivingMailbox _receivingMailbox;
+
+ /// Set to true when the send operation completes calling [#complete()]
private volatile boolean _isTerminated;
+
+ /// Set to true when the receiver waits for EOS but discards any further
data blocks.
+ /// This can happen when the receiver has already early terminated, for
example,
+ /// when the [org.apache.pinot.query.runtime.operator.SortOperator] limit
has been reached.
private volatile boolean _isEarlyTerminated;
private final StatMap<MailboxSendOperator.StatKey> _statMap;
@@ -60,21 +67,19 @@ public class InMemorySendingMailbox implements
SendingMailbox {
}
@Override
- public void send(MseBlock.Data data)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Data data) {
sendPrivate(data, Collections.emptyList());
}
@Override
- public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
sendPrivate(block, serializedStats);
_isTerminated = true;
}
- private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
- throws TimeoutException {
+ private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+ LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id,
block);
return;
}
if (_receivingMailbox == null) {
@@ -82,22 +87,30 @@ public class InMemorySendingMailbox implements
SendingMailbox {
}
_statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
long timeoutMs = _deadlineMs - System.currentTimeMillis();
- ReceivingMailbox.ReceivingMailboxStatus status =
_receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+ ReceivingMailbox.ReceivingMailboxStatus status;
+ try {
+ status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+ } catch (InterruptedException e) {
+ // We are not restoring the interrupt status because we are already
throwing an exception
+ // Code that catches this exception must finish the work fast enough to
comply with the interrupt contract
+ // See https://github.com/apache/pinot/pull/16903#discussion_r2409003423
+ throw new QueryException(QueryErrorCode.INTERNAL, "Interrupted while
sending data to mailbox: " + _id, e);
+ } catch (TimeoutException e) {
+ throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT, "Timed out
adding block into mailbox: " + _id
+ + " with timeout: " + Duration.of(timeoutMs, ChronoUnit.MILLIS), e);
+ }
switch (status) {
case SUCCESS:
break;
- case CANCELLED:
- throw new QueryCancelledException(String.format("Mailbox: %s already
cancelled from upstream", _id));
- case ERROR:
- throw new QueryException(QueryErrorCode.INTERNAL, String.format(
- "Mailbox: %s already errored out (received error block before)",
_id));
- case TIMEOUT:
- throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
- String.format("Timed out adding block into mailbox: %s with
timeout: %dms", _id, timeoutMs));
- case EARLY_TERMINATED:
+ case WAITING_EOS:
_isEarlyTerminated = true;
break;
+ case LAST_BLOCK:
+ _isTerminated = true;
+ break;
+ case ALREADY_TERMINATED:
+ LOGGER.error("Trying to offer blocks to the already closed mailbox {}.
This should not happen", _id);
+ break;
default:
throw new IllegalStateException("Unsupported mailbox status: " +
status);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 9ed2416bfd7..3a58293367e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -55,11 +55,14 @@ public class MailboxService {
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS,
TimeUnit.SECONDS)
.removalListener((RemovalListener<String, ReceivingMailbox>)
notification -> {
if (notification.wasEvicted()) {
- int numPendingBlocks =
notification.getValue().getNumPendingBlocks();
+ ReceivingMailbox receivingMailbox = notification.getValue();
+ int numPendingBlocks = receivingMailbox.getNumPendingBlocks();
if (numPendingBlocks > 0) {
LOGGER.warn("Evicting dangling receiving mailbox: {} with {}
pending blocks", notification.getKey(),
numPendingBlocks);
}
+ // In case there is a leak, we should cancel the mailbox to
unblock any waiters and release resources.
+ receivingMailbox.cancel();
}
}).build();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index a98721037c8..9bef391d21c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -22,15 +22,15 @@ import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.ThreadSafe;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
@@ -44,65 +44,58 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-/**
- * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is
with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the send operator.
This is because the ReceivingMailbox can be
- * initialized even before the corresponding OpChain is registered on the
receiver, whereas the SendingMailbox is
- * initialized when the send operator is running.
- *
- * There is a single ReceivingMailbox for each {@link
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator}.
- * The offer methods will be called when new blocks are received from
different sources. For example local workers will
- * directly call {@link #offer(MseBlock, List, long)} while each remote worker
opens a GPRC channel where messages
- * are sent in raw format and {@link #offerRaw(List, long)} is called from
them.
- */
+/// Mailbox that's used to receive data. Ownership of the ReceivingMailbox is
with the MailboxService, which is unlike
+/// the [SendingMailbox] whose ownership lies with the send operator. This is
because the ReceivingMailbox can be
+/// initialized even before the corresponding OpChain is registered on the
receiver, whereas the SendingMailbox is
+/// initialized when the send operator is running.
+///
+/// There is a single ReceivingMailbox for each pair of (sender, receiver)
opchains. This means that each receive
+/// operator will have multiple ReceivingMailbox instances, one for each
sender. They are coordinated by a
+/// [BlockingMultiStreamConsumer].
+///
+/// A ReceivingMailbox can have at most one reader and one writer at any given
time. This means that different threads
+/// writing to the same mailbox must be externally synchronized.
+///
+/// The offer methods will be called when new blocks are received from
different sources. For example local workers will
+/// directly call [#offer(MseBlock, List, long)] while each remote worker
opens a gRPC channel where messages
+/// are sent in raw format and [#offerRaw(List, long)] is called from them.
@ThreadSafe
public class ReceivingMailbox {
public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
private static final Logger LOGGER =
LoggerFactory.getLogger(ReceivingMailbox.class);
- // This was previously a static final attribute, but now that includes
server and stage, we cannot use constants
- private volatile MseBlockWithStats _cancelledErrorBlock;
private final String _id;
// TODO: Make the queue size configurable
- // TODO: Revisit if this is the correct way to apply back pressure
+ // TODO: Apply backpressure at the sender side when the queue is full.
/// The queue where blocks are going to be stored.
- private final BlockingQueue<MseBlockWithStats> _blocks = new
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
- private final AtomicReference<MseBlockWithStats> _errorBlock = new
AtomicReference<>();
- private volatile boolean _isEarlyTerminated = false;
+ private final CancellableBlockingQueue _blocks;
private long _lastArriveTime = System.currentTimeMillis();
- @Nullable
- private volatile Reader _reader;
private final StatMap<StatKey> _stats = new StatMap<>(StatKey.class);
- public ReceivingMailbox(String id) {
+ public ReceivingMailbox(String id, int maxPendingBlocks) {
_id = id;
+ _blocks = new CancellableBlockingQueue(id, maxPendingBlocks);
+ }
+
+ public ReceivingMailbox(String id) {
+ this(id, DEFAULT_MAX_PENDING_BLOCKS);
}
public void registeredReader(Reader reader) {
- if (_reader != null) {
- throw new IllegalArgumentException("Only one reader is supported");
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("==[MAILBOX]== Reader registered for mailbox: " + _id);
- }
- _reader = reader;
+ _blocks.registerReader(reader);
}
public String getId() {
return _id;
}
- /**
- * Offers a raw block into the mailbox within the timeout specified, returns
whether the block is successfully added.
- * If the block is not added, an error block is added to the mailbox.
- * <p>
- * Contrary to {@link #offer(MseBlock, List, long)}, the block may be an
error block.
- */
+ /// Offers a raw block into the mailbox within the timeout specified, returns
whether the block is successfully added.
+ ///
+ /// Contrary to [#offer(MseBlock, List, long)], the block may be an error
block.
public ReceivingMailboxStatus offerRaw(List<ByteBuffer> byteBuffers, long
timeoutMs)
- throws IOException {
+ throws IOException, InterruptedException, TimeoutException {
updateWaitCpuTime();
long startTimeMs = System.currentTimeMillis();
@@ -123,11 +116,8 @@ public class ReceivingMailbox {
} else {
MetadataBlock metadataBlock = (MetadataBlock) dataBlock;
Map<QueryErrorCode, String> exceptionsByQueryError =
QueryErrorCode.fromKeyMap(exceptions);
- ErrorMseBlock errorBlock =
- new ErrorMseBlock(metadataBlock.getStageId(),
metadataBlock.getWorkerId(), metadataBlock.getServerId(),
+ block = new ErrorMseBlock(metadataBlock.getStageId(),
metadataBlock.getWorkerId(), metadataBlock.getServerId(),
exceptionsByQueryError);
- setErrorBlock(errorBlock, dataBlock.getStatsByStage());
- return ReceivingMailboxStatus.FIRST_ERROR;
}
} else {
block = new SerializedDataBlock(dataBlock);
@@ -135,62 +125,49 @@ public class ReceivingMailbox {
return offerPrivate(block, dataBlock.getStatsByStage(), timeoutMs);
}
- public ReceivingMailboxStatus offer(MseBlock block, List<DataBuffer>
serializedStats, long timeoutMs) {
+ /// Offers a non-error block into the mailbox within the timeout specified,
returns whether the block is successfully
+ /// added.
+ public ReceivingMailboxStatus offer(MseBlock block, List<DataBuffer>
serializedStats, long timeoutMs)
+ throws InterruptedException, TimeoutException {
updateWaitCpuTime();
_stats.merge(StatKey.IN_MEMORY_MESSAGES, 1);
- if (block instanceof ErrorMseBlock) {
- setErrorBlock((ErrorMseBlock) block, serializedStats);
- return ReceivingMailboxStatus.EARLY_TERMINATED;
- }
return offerPrivate(block, serializedStats, timeoutMs);
}
- /**
- * Offers a non-error block into the mailbox within the timeout specified,
returns whether the block is successfully
- * added. If the block is not added, an error block is added to the mailbox.
- */
- private ReceivingMailboxStatus offerPrivate(MseBlock block, List<DataBuffer>
stats, long timeoutMs) {
- MseBlockWithStats errorBlock = _errorBlock.get();
- if (errorBlock != null) {
- LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring
the late block", _id);
- return errorBlock == _cancelledErrorBlock ?
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
- }
- if (timeoutMs <= 0) {
- LOGGER.debug("Mailbox: {} is already timed out", _id);
- setErrorBlock(
- ErrorMseBlock.fromException(new TimeoutException("Timed out while
offering data to mailbox: " + _id)),
- stats);
- return ReceivingMailboxStatus.TIMEOUT;
- }
+ /// Offers a non-error block into the mailbox within the timeout specified,
returns whether the block is successfully
+ /// added.
+ private ReceivingMailboxStatus offerPrivate(MseBlock block, List<DataBuffer>
stats, long timeoutMs)
+ throws InterruptedException, TimeoutException {
+ long start = System.currentTimeMillis();
try {
- long now = System.currentTimeMillis();
- MseBlockWithStats blockWithStats = new MseBlockWithStats(block, stats);
- boolean accepted = _blocks.offer(blockWithStats, timeoutMs,
TimeUnit.MILLISECONDS);
- _stats.merge(StatKey.OFFER_CPU_TIME_MS, System.currentTimeMillis() -
now);
- if (accepted) {
- errorBlock = _errorBlock.get();
- if (errorBlock == null) {
+ ReceivingMailboxStatus result;
+ if (block.isEos()) {
+ result = _blocks.offerEos((MseBlock.Eos) block, stats);
+ } else {
+ result = _blocks.offerData((MseBlock.Data) block, timeoutMs,
TimeUnit.MILLISECONDS);
+ }
+
+ switch (result) {
+ case SUCCESS:
+ case LAST_BLOCK:
+ _stats.merge(StatKey.OFFER_CPU_TIME_MS, System.currentTimeMillis() -
start);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[MAILBOX]== Block " + block + " ready to read from
mailbox: " + _id);
}
- notifyReader();
- return _isEarlyTerminated ? ReceivingMailboxStatus.EARLY_TERMINATED
: ReceivingMailboxStatus.SUCCESS;
- } else {
- LOGGER.debug("Mailbox: {} is already cancelled or errored out,
ignoring the late block", _id);
- _blocks.clear();
- return errorBlock == _cancelledErrorBlock ?
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
- }
- } else {
- LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms",
_id, timeoutMs);
- TimeoutException exception = new TimeoutException(
- "Timed out while waiting for receive operator to consume data from
mailbox: " + _id);
- setErrorBlock(ErrorMseBlock.fromException(exception), stats);
- return ReceivingMailboxStatus.TIMEOUT;
+ break;
+ case WAITING_EOS:
+ case ALREADY_TERMINATED:
+ default:
+ // Nothing to do
}
+ return result;
+ } catch (TimeoutException e) {
+ _stats.merge(StatKey.OFFER_CPU_TIME_MS, System.currentTimeMillis() -
start);
+ throw e;
} catch (InterruptedException e) {
- LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
- setErrorBlock(ErrorMseBlock.fromException(e), stats);
- return ReceivingMailboxStatus.ERROR;
+ String errorMessage = "Interrupted on mailbox " + _id + " while offering
blocks";
+ setErrorBlock(ErrorMseBlock.fromError(QueryErrorCode.INTERNAL,
errorMessage), stats);
+ throw e;
}
}
@@ -200,63 +177,34 @@ public class ReceivingMailbox {
_lastArriveTime = now;
}
- /**
- * Sets an error block into the mailbox. No more blocks are accepted after
calling this method.
- */
+ /// Sets an error block into the mailbox. No more blocks are accepted after
calling this method.
public void setErrorBlock(ErrorMseBlock errorBlock, List<DataBuffer>
serializedStats) {
- if (_errorBlock.compareAndSet(null, new MseBlockWithStats(errorBlock,
serializedStats))) {
- _blocks.clear();
- notifyReader();
- }
+ _blocks.offerEos(errorBlock, serializedStats);
}
- /**
- * Returns the first block from the mailbox, or {@code null} if there is no
block received yet. Error block is
- * returned if exists.
- */
+ /// Returns the first block from the mailbox, or {@code null} if there is no
block received yet.
@Nullable
public MseBlockWithStats poll() {
- Preconditions.checkState(_reader != null, "A reader must be registered");
- MseBlockWithStats errorBlock = _errorBlock.get();
- return errorBlock != null ? errorBlock : _blocks.poll();
+ return _blocks.poll();
}
- /**
- * Early terminate the mailbox, called when upstream doesn't expect any more
data block.
- */
+ /// Early terminate the mailbox, called when upstream doesn't expect any
more *data* block.
public void earlyTerminate() {
- _isEarlyTerminated = true;
+ _blocks.earlyTerminate();
}
- /**
- * Cancels the mailbox. No more blocks are accepted after calling this
method. Should only be called by the receive
- * operator to clean up the remaining blocks.
- */
+ /// Cancels the mailbox. No more blocks are accepted after calling this
method and [#poll] will always return
+ /// an error block.
public void cancel() {
LOGGER.debug("Cancelling mailbox: {}", _id);
- if (_errorBlock.get() == null) {
- MseBlockWithStats errorBlock = new MseBlockWithStats(
- ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled
by receiver"),
- Collections.emptyList());
- if (_errorBlock.compareAndSet(null, errorBlock)) {
- _cancelledErrorBlock = errorBlock;
- _blocks.clear();
- }
- }
+ _blocks.offerEos(ErrorMseBlock.fromException(null), List.of());
}
+ /// Returns the number of pending **data** blocks in the mailbox.
+ ///
+ /// Remember that the EOS block is not counted here.
public int getNumPendingBlocks() {
- return _blocks.size();
- }
-
- private void notifyReader() {
- Reader reader = _reader;
- if (reader != null) {
- LOGGER.debug("Notifying reader");
- reader.blockReadyToRead();
- } else {
- LOGGER.debug("No reader to notify");
- }
+ return _blocks.exactSize();
}
public StatMap<StatKey> getStatMap() {
@@ -268,7 +216,26 @@ public class ReceivingMailbox {
}
public enum ReceivingMailboxStatus {
- SUCCESS, FIRST_ERROR, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED
+ /// The block was successfully added to the mailbox.
+ ///
+ /// More blocks can be sent.
+ SUCCESS,
+ /// The block is rejected because downstream has early terminated and now
is only waiting for EOS in order to
+ /// get the stats.
+ ///
+ /// More blocks can be sent, but data blocks will be rejected.
+ WAITING_EOS,
+ /// The received message is the last block the mailbox will ever read.
+ ///
+ /// This happens for example when an EOS block is added to the mailbox.
+ ///
+ /// No more blocks can be sent.
+ LAST_BLOCK,
+ /// The mailbox has been closed for write. There may still be pending
blocks to read, but no more blocks
+ /// can be added.
+ ///
+ /// No more blocks can be sent.
+ ALREADY_TERMINATED
}
public enum StatKey implements StatMap.Key {
@@ -315,4 +282,377 @@ public class ReceivingMailbox {
return _serializedStats;
}
}
+
+ /// The state of the queue.
+ ///
+ /// ```
+ /// +-------------------+ offerEos +-------------------+
+ /// | FULL_OPEN | -----------> | UPSTREAM_FINISHED|
+ /// +-------------------+ +-------------------+
+ /// | |
+ /// | earlyTerminate | poll -- when all pending data
is read
+ /// v v
+ /// +-------------------+ offerEos +-------------------+
+ /// | WAITING_EOS | -----------> | FULL_CLOSED |
+ /// +-------------------+ +-------------------+
+ /// ```
+ private enum State {
+ /// The queue is open for both read and write.
+ ///
+ /// - [#poll()] returns the pending blocks in the queue, or null if the
queue is empty.
+ /// - [#offer] accepts both data and EOS blocks.
+ ///
+ /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered
or to [State#WAITING_EOS] when
+ /// [#earlyTerminate()] is called.
+ FULL_OPEN,
+ /// The downstream is not interested in reading more data but is waiting
for an EOS block to get the stats.
+ ///
+ /// - [#poll()] returns null.
+ /// - [#offer] rejects all data blocks.
+ ///
+ /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+ WAITING_EOS,
+ /// The upstream has indicated that no more data will be sent.
+ ///
+ /// - [#poll()] returns the pending blocks in the queue and then the EOS
block.
+ /// - [#offer] rejects all blocks.
+ ///
+ /// Transitions to [State#FULL_CLOSED] when the EOS block is read by
[#poll()].
+ UPSTREAM_FINISHED,
+ /// The queue is closed for both read and write.
+ ///
+ /// - [#poll()] always returns the EOS block, which is never null.
+ /// - [#offer] rejects all blocks.
+ ///
+ /// No transitions out of this state.
+ FULL_CLOSED
+ }
+
+ /// This is a special bounded blocking queue implementation similar to
ArrayBlockingQueue, but:
+ /// - Only accepts a single reader (aka downstream).
+ /// - Accepts multiple concurrent writers (aka upstream).
+ /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+ /// - Can be [#earlyTerminate()]d.
+ ///
+ /// Read the [State] enum to understand the different states and their
transitions.
+ ///
+ /// All methods of this class are thread-safe and may block, although only
[#offer] should block for a long time.
+ @ThreadSafe
+ private static class CancellableBlockingQueue {
+ private final String _id;
+ @Nullable
+ private volatile Reader _reader;
+ /// This is set when the queue is in [State#FULL_CLOSED] or
[State#UPSTREAM_FINISHED].
+ @Nullable
+ @GuardedBy("_lock")
+ private MseBlockWithStats _eos;
+ /// The current state of the queue.
+ ///
+ /// All changes to this field must be done by calling [#changeState(State,
String)] in order to log the state
+ /// transitions.
+ @GuardedBy("_lock")
+ private State _state = State.FULL_OPEN;
+ /// The items in the queue.
+ ///
+ /// This is a circular array where [#_putIndex] is the index to add the
next item and [#_takeIndex] is the index to
+ /// take the next item from. Only data blocks are stored in this array,
the EOS block is stored in [#_eos].
+ ///
+ /// Like in normal blocking queues, elements are added when upstream
threads call [#offer] and removed when the
+ /// downstream thread calls [#poll]. Unlike normal blocking queues,
elements will be [removed][#drainDataBlocks()]
+ /// when transitioning to [State#WAITING_EOS] or [State#FULL_CLOSED].
+ @GuardedBy("_lock")
+ private final MseBlock.Data[] _dataBlocks;
+ @GuardedBy("_lock")
+ private int _takeIndex;
+ @GuardedBy("_lock")
+ private int _putIndex;
+ @GuardedBy("_lock")
+ private int _count;
+ /// Threads waiting to add more data to the queue.
+ ///
+ /// This is used to prevent the following situation:
+ /// 1. The queue is full.
+ /// 2. Thread A tries to add data. Thread A will be blocked waiting for
space in the queue.
+ /// 3. Thread B adds an EOS block, which will transition the queue to
[State#UPSTREAM_FINISHED].
+ /// 4. Thread C reads data from the queue in a loop, the scheduler doesn't
give time to Thread A.
+ /// 5. Thread C consumes all data from the queue and then reads the EOS
block.
+ /// 6. Finally Thread A is unblocked and adds data to the queue, even
though the queue is already closed for write
+ ///
+ /// As a result the block from A will be lost. Instead, we use this
counter to return null in [#poll] when the
+ /// queue is empty but there are still threads trying to add data to the
queue.
+ @GuardedBy("_lock")
+ private int _pendingData;
+ private final ReentrantLock _lock = new ReentrantLock();
+ private final Condition _notFull = _lock.newCondition();
+
+ public CancellableBlockingQueue(String id, int capacity) {
+ _id = id;
+ _dataBlocks = new MseBlock.Data[capacity];
+ }
+
+ /// Notifies the downstream that there is data to read.
+ private void notifyReader() {
+ Reader reader = _reader;
+ if (reader != null) {
+ LOGGER.debug("Notifying reader");
+ reader.blockReadyToRead();
+ } else {
+ LOGGER.debug("No reader to notify");
+ }
+ }
+
+ /// Offers a successful or erroneous EOS block into the queue, returning
the status of the operation.
+ ///
+ /// This method never blocks for long, as it doesn't need to wait for
space in the queue.
+ public ReceivingMailboxStatus offerEos(MseBlock.Eos block,
List<DataBuffer> stats) {
+ ReentrantLock lock = _lock;
+ lock.lock();
+ try {
+ switch (_state) {
+ case FULL_CLOSED:
+ case UPSTREAM_FINISHED:
+ // The queue is closed for write. Always reject the block.
+ LOGGER.debug("Mailbox: {} is already closed for write, ignoring
the late {} block", _id, block);
+ return ReceivingMailboxStatus.ALREADY_TERMINATED;
+ case WAITING_EOS:
+ // We got the EOS block we expected. Close the queue for both read
and write.
+ changeState(State.FULL_CLOSED, "received EOS block");
+ _eos = new MseBlockWithStats(block, stats);
+ notifyReader();
+ return ReceivingMailboxStatus.LAST_BLOCK;
+ case FULL_OPEN:
+ changeState(State.UPSTREAM_FINISHED, "received EOS block");
+ _eos = new MseBlockWithStats(block, stats);
+ notifyReader();
+ if (block.isError()) {
+ drainDataBlocks();
+ }
+ return ReceivingMailboxStatus.LAST_BLOCK;
+ default:
+ throw new IllegalStateException("Unexpected state: " + _state);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /// Offers a data block into the queue within the timeout specified,
returning the status of the operation.
+ public ReceivingMailboxStatus offerData(MseBlock.Data block, long timeout,
TimeUnit timeUnit)
+ throws InterruptedException, TimeoutException {
+ ReentrantLock lock = _lock;
+ lock.lockInterruptibly();
+ try {
+ while (true) {
+ switch (_state) {
+ case FULL_CLOSED:
+ case UPSTREAM_FINISHED:
+ // The queue is closed for write. Always reject the block.
+ LOGGER.debug("Mailbox: {} is already closed for write, ignoring
the late data block", _id);
+ return ReceivingMailboxStatus.ALREADY_TERMINATED;
+ case WAITING_EOS:
+ // The downstream is not interested in reading more data.
+ LOGGER.debug("Mailbox: {} is not interesting in late data
block", _id);
+ return ReceivingMailboxStatus.WAITING_EOS;
+ case FULL_OPEN:
+ if (offerDataToBuffer(block, timeout, timeUnit)) {
+ notifyReader();
+ return ReceivingMailboxStatus.SUCCESS;
+ }
+ // otherwise transitioned to FULL_CLOSED or WAITING_EOS while
waiting for space in the queue
+ // and we need to re-evaluate the state
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state: " + _state);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /// Offers a data block into the queue within the timeout specified,
returning true if the block was added
+ /// successfully.
+ ///
+ /// This method can only be called while the queue is in the FULL_OPEN
state and the lock is held.
+ ///
+ /// This method can time out, in which case we automatically transition to
the [State#FULL_CLOSED] state.
+ /// But instead of returning false, we throw a [TimeoutException]. This is
because the caller may want to
+ /// distinguish between a timeout and other reasons for not being able to
add the block to the queue in order to
+ /// report different error messages.
+ ///
+ /// @return true if the block was added successfully, false if the state
changed while waiting.
+ /// @throws InterruptedException if the thread is interrupted while
waiting for space in the queue.
+ /// @throws TimeoutException if the timeout specified elapsed (or was not positive) before space
was available in the queue.
+ @GuardedBy("_lock")
+ private boolean offerDataToBuffer(MseBlock.Data block, long timeout,
TimeUnit timeUnit)
+ throws InterruptedException, TimeoutException {
+
+ assert _state == State.FULL_OPEN;
+
+ long nanos = timeUnit.toNanos(timeout);
+ MseBlock.Data[] items = _dataBlocks;
+ _pendingData++;
+ try {
+ while (_count == items.length && nanos > 0L) {
+ nanos = _notFull.awaitNanos(nanos);
+
+ switch (_state) {
+ case FULL_OPEN: // we are in the same state, continue waiting for
space
+ break;
+ case FULL_CLOSED:
+ case WAITING_EOS:
+ // The queue is closed and the reader is not interested in
reading more data.
+ return false;
+ case UPSTREAM_FINISHED:
+ // Another thread offered the EOS while we were waiting for
space.
+ assert _eos != null;
+ if (_eos._block.isSuccess()) { // If closed with EOS, the reader
is still interested in reading our block
+ continue;
+ }
+ // if closed with an error, the reader is not interested in
reading our block
+ return false;
+ default:
+ throw new IllegalStateException("Unexpected state: " + _state);
+ }
+ }
+ if (_count == items.length || timeout <= 0L) { // timed out: still full (awaitNanos may return <= 0 even after space was freed), or no time budget at all
+ String errorMessage = "Timed out while waiting for receive operator
to consume data from mailbox: " + _id;
+ ErrorMseBlock timeoutBlock =
ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, errorMessage);
+ changeState(State.FULL_CLOSED, "timed out while waiting to offer
data block");
+ drainDataBlocks();
+ _eos = new MseBlockWithStats(timeoutBlock, List.of());
+ notifyReader();
+ throw new TimeoutException(errorMessage);
+ }
+ items[_putIndex] = block;
+ if (++_putIndex == items.length) {
+ _putIndex = 0;
+ }
+ _count++;
+ return true;
+ } finally {
+ _pendingData--;
+ }
+ }
+
+    /// Returns the next block for the reader without waiting for data, or `null` if nothing can be returned yet.
+    ///
+    /// - FULL_CLOSED: the terminal block (EOS or error) is returned on every call.
+    /// - WAITING_EOS: data is no longer delivered, so `null` is returned until the EOS arrives.
+    /// - UPSTREAM_FINISHED: buffered data blocks are returned first. Then, once no writer is still adding a block,
+    ///   the mailbox moves to FULL_CLOSED and the EOS block is returned.
+    /// - FULL_OPEN: the oldest buffered data block, or `null` when the buffer is empty.
+    ///
+    /// Only acquiring the lock may block; this method never waits for data to arrive.
+    @Nullable
+    public MseBlockWithStats poll() {
+      Preconditions.checkState(_reader != null, "A reader must be registered");
+      ReentrantLock mailboxLock = _lock;
+      mailboxLock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+            assert _eos != null;
+            return _eos;
+          case WAITING_EOS:
+            assert _eos == null;
+            return null;
+          case UPSTREAM_FINISHED:
+            if (_count > 0) {
+              return dequeueHeadLocked();
+            }
+            if (_pendingData > 0) {
+              // Writers that started before the EOS may still add their block; the EOS must not overtake them.
+              LOGGER.debug("Mailbox: {} has pending {} data blocks, waiting for them to finish", _id, _pendingData);
+              return null;
+            }
+            changeState(State.FULL_CLOSED, "read all data blocks");
+            return _eos;
+          case FULL_OPEN:
+            if (_count > 0) {
+              return dequeueHeadLocked();
+            }
+            assert _eos == null;
+            return null;
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+      } finally {
+        mailboxLock.unlock();
+      }
+    }
+
+    /// Removes the oldest data block from the ring buffer and wakes up one writer waiting for space.
+    @GuardedBy("_lock")
+    private MseBlockWithStats dequeueHeadLocked() {
+      assert _count > 0 : "if we reach here, there must be data in the queue";
+      MseBlock.Data[] ring = _dataBlocks;
+      MseBlock.Data head = ring[_takeIndex];
+      assert head != null : "data block in the queue must not be null";
+      ring[_takeIndex] = null;
+      _takeIndex = (_takeIndex + 1 == ring.length) ? 0 : _takeIndex + 1;
+      _count--;
+      _notFull.signal();
+      return new MseBlockWithStats(head, List.of());
+    }
+
+    /// Moves the mailbox to `newState`, logging the previous state and the reason (`desc`) at debug level.
+    @GuardedBy("_lock")
+    private void changeState(State newState, String desc) {
+      State previous = _state;
+      LOGGER.debug("Mailbox: {} {}, transitioning from {} to {}", _id, desc, previous, newState);
+      _state = newState;
+    }
+
+    /// Discards every buffered data block and wakes up all writers waiting for space so they re-check the state.
+    ///
+    /// The ring-buffer indices are reset as well. This keeps the invariant "`_count == 0` implies
+    /// `_putIndex == _takeIndex`", so a block enqueued after a drain is read back from the slot it was written to
+    /// instead of from an empty (null) slot.
+    @GuardedBy("_lock")
+    private void drainDataBlocks() {
+      Arrays.fill(_dataBlocks, null);
+      _count = 0;
+      _putIndex = 0;
+      _takeIndex = 0;
+      _notFull.signalAll();
+    }
+
+ public int exactSize() {
+ ReentrantLock lock = _lock;
+ lock.lock();
+ try {
+ return _count;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+    /// Called by the downstream to indicate that no more data blocks will be read.
+    ///
+    /// Buffered data is discarded. If the upstream already finished, the mailbox closes completely. Otherwise it
+    /// moves to WAITING_EOS, where it still waits for the EOS block that carries the stats. Repeated calls are no-ops.
+    public void earlyTerminate() {
+      ReentrantLock mailboxLock = _lock;
+      mailboxLock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+          case WAITING_EOS:
+            LOGGER.debug("Mailbox: {} is already closed for read", _id);
+            return;
+          case UPSTREAM_FINISHED:
+          case FULL_OPEN: {
+            State target = _state == State.UPSTREAM_FINISHED ? State.FULL_CLOSED : State.WAITING_EOS;
+            drainDataBlocks();
+            changeState(target, "early terminated");
+            return;
+          }
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+      } finally {
+        mailboxLock.unlock();
+      }
+    }
+
+ public void registerReader(Reader reader) {
+ if (_reader != null) {
+ throw new IllegalArgumentException("Only one reader is supported");
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("==[MAILBOX]== Reader registered for mailbox: " + _id);
+ }
+ _reader = reader;
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index d11329f112b..65a0d69831c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -18,9 +18,7 @@
*/
package org.apache.pinot.query.mailbox;
-import java.io.IOException;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.segment.spi.memory.DataBuffer;
@@ -54,16 +52,14 @@ public interface SendingMailbox {
* and they should <b>not</b> acquire any resources when they are created.
This method should throw if there was an
* error sending the data, since that would allow {@link BlockExchange} to
exit early.
*/
- void send(MseBlock.Data data)
- throws IOException, TimeoutException;
+ void send(MseBlock.Data data);
/**
* Sends an EOS block to the receiver. Note that SendingMailbox are required
to acquire resources lazily in this call,
* and they should <b>not</b> acquire any resources when they are created.
This method should throw if there was an
* error sending the data, since that would allow {@link BlockExchange} to
exit early.
*/
- void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException;
+ void send(MseBlock.Eos block, List<DataBuffer> serializedStats);
/**
* Cancels the mailbox and notifies the receiver of the cancellation so that
it can release the underlying resources.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index cae518fb194..d8a2f04d0c0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.common.proto.Mailbox.MailboxStatus;
@@ -49,6 +50,7 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
private final MailboxService _mailboxService;
private final StreamObserver<MailboxStatus> _responseObserver;
private final List<ByteBuffer> _mailboxBuffers =
Collections.synchronizedList(new ArrayList<>());
+ private boolean _closedStream = false;
private volatile ReceivingMailbox _mailbox;
@@ -61,6 +63,10 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
@Override
public void onNext(MailboxContent mailboxContent) {
+ if (_closedStream) {
+ LOGGER.debug("Received a late message once the stream was closed.
Ignoring it.");
+ return;
+ }
String mailboxId = mailboxContent.getMailboxId();
if (_mailbox == null) {
_mailbox = _mailboxService.getReceivingMailbox(mailboxId);
@@ -71,50 +77,60 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
}
try {
long timeoutMs =
Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
- ReceivingMailbox.ReceivingMailboxStatus status =
_mailbox.offerRaw(_mailboxBuffers, timeoutMs);
- _mailboxBuffers.clear();
+ ReceivingMailbox.ReceivingMailboxStatus status = null;
+ try {
+ status = _mailbox.offerRaw(_mailboxBuffers, timeoutMs);
+ } catch (TimeoutException e) {
+ LOGGER.debug("Timed out adding block into mailbox: {} with timeout:
{}ms", mailboxId, timeoutMs);
+ closeStream();
+ return;
+ } catch (InterruptedException e) {
+ // We are not restoring the interrupt status because we are already
throwing an exception
+ // Code that catches this exception must finish the work fast enough
to comply the interrupt contract
+ // See
https://github.com/apache/pinot/pull/16903#discussion_r2409003423
+ LOGGER.debug("Interrupted while processing blocks for mailbox: {}",
mailboxId, e);
+ closeStream();
+ return;
+ }
switch (status) {
case SUCCESS:
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
Integer.toString(_mailbox.getNumPendingBlocks())).build());
break;
- case CANCELLED:
- LOGGER.warn("Mailbox: {} already cancelled from upstream",
mailboxId);
- cancelStream();
- break;
- case FIRST_ERROR:
- return;
- case ERROR:
- LOGGER.warn("Mailbox: {} already errored out (received error block
before)", mailboxId);
- cancelStream();
- break;
- case TIMEOUT:
- LOGGER.warn("Timed out adding block into mailbox: {} with timeout:
{}ms", mailboxId, timeoutMs);
- cancelStream();
- break;
- case EARLY_TERMINATED:
- LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
+ case WAITING_EOS:
+ // The receiving mailbox is early terminated, inform the sender to
stop sending more data. Only EOS block is
+ // expected to be sent afterward.
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE,
"true").build());
break;
+ case LAST_BLOCK:
+ LOGGER.debug("Mailbox: {} has received the last block, closing the
stream", mailboxId);
+ closeStream();
+ break;
+ case ALREADY_TERMINATED:
+ LOGGER.error("Trying to offer blocks to the already closed mailbox
{}. This should not happen", mailboxId);
+ closeStream();
+ break;
default:
throw new IllegalStateException("Unsupported mailbox status: " +
status);
}
} catch (Exception e) {
- _mailboxBuffers.clear();
String errorMessage = "Caught exception while processing blocks for
mailbox: " + mailboxId;
LOGGER.error(errorMessage, e);
+ closeStream();
_mailbox.setErrorBlock(
ErrorMseBlock.fromException(new RuntimeException(errorMessage, e)),
Collections.emptyList());
- cancelStream();
+ } finally {
+ _mailboxBuffers.clear();
}
}
- private void cancelStream() {
+ private void closeStream() {
try {
// NOTE: DO NOT use onError() because it will terminate the stream, and
sender might not get the callback
_responseObserver.onCompleted();
+ _closedStream = true;
} catch (Exception e) {
// Exception can be thrown if the stream is already closed, so we simply
ignore it
LOGGER.debug("Caught exception cancelling mailbox: {}", _mailbox != null
? _mailbox.getId() : "unknown", e);
@@ -132,11 +148,19 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
} else {
LOGGER.error("Got error before mailbox is set up", t);
}
+ if (!_closedStream) {
+ _closedStream = true;
+ _responseObserver.onError(t);
+ }
}
@Override
public void onCompleted() {
_mailboxBuffers.clear();
+ if (_closedStream) {
+ return;
+ }
+ _closedStream = true;
try {
_responseObserver.onCompleted();
} catch (Exception e) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 875e46bc6e8..72da1a20aba 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@@ -74,6 +73,8 @@ import
org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.executor.HardLimitExecutor;
import org.apache.pinot.spi.executor.MetricsExecutor;
@@ -327,21 +328,32 @@ public class QueryRunner {
}
long deadlineMs = executionContext.getPassiveDeadlineMs();
for (RoutingInfo routingInfo : routingInfos) {
+ String mailboxId = routingInfo.getMailboxId();
try {
StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
SendingMailbox sendingMailbox =
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
routingInfo.getPort(),
- routingInfo.getMailboxId(), deadlineMs, statMap);
+ mailboxId, deadlineMs, statMap);
// TODO: Here we are breaking the stats invariants, sending errors
without including the stats of the
// current stage. We will need to fix this in future, but for now, we
are sending the error block without
// the stats.
sendingMailbox.send(errorBlock, Collections.emptyList());
- } catch (TimeoutException e) {
- LOGGER.warn("Timed out sending error block to mailbox: {} for request:
{}, stage: {}",
- routingInfo.getMailboxId(), requestId, stageId, e);
+ } catch (QueryException e) {
+ QueryErrorCode errorCode = e.getErrorCode();
+ switch (errorCode) {
+ case EXECUTION_TIMEOUT:
+ LOGGER.warn("Timed out sending error block to mailbox: {}",
mailboxId, e);
+ break;
+ case QUERY_CANCELLATION:
+ LOGGER.info("Query cancelled while offering blocks to mailbox:
{}", mailboxId);
+ break;
+ default:
+ LOGGER.error("{} exception while exception sending error block to
mailbox: {}", errorCode, mailboxId, e);
+ break;
+ }
} catch (Exception e) {
LOGGER.error("Caught exception sending error block to mailbox: {} for
request: {}, stage: {}",
- routingInfo.getMailboxId(), requestId, stageId, e);
+ mailboxId, requestId, stageId, e);
}
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 753ec9744b6..250395042a0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -77,12 +77,12 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
asyncStreams.add(asyncStream);
_receivingStats.add(asyncStream._mailbox.getStatMap());
}
- _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context,
asyncStreams);
+ _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context,
asyncStreams, senderStageId);
} else {
// TODO: Revisit if we should throw exception here.
_mailboxIds = List.of();
_receivingStats = List.of();
- _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context,
List.of());
+ _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context,
List.of(), senderStageId);
}
_statMap.merge(StatKey.FAN_IN, _mailboxIds.size());
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 0de27d40f7e..fe24b253f55 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
@@ -45,6 +44,7 @@ import
org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.segment.spi.memory.DataBuffer;
import org.apache.pinot.spi.exception.QueryCancelledException;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.exception.TerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,9 +213,7 @@ public class MailboxSendOperator extends MultiStageOperator
{
if (block.isEos()) {
sendEos((MseBlock.Eos) block);
} else {
- if (sendMseBlock(((MseBlock.Data) block))) {
- earlyTerminate();
- }
+ sendMseBlock(((MseBlock.Data) block));
}
checkTerminationAndSampleUsage();
return block;
@@ -225,15 +223,14 @@ public class MailboxSendOperator extends
MultiStageOperator {
} catch (TerminationException e) {
LOGGER.info("Query was terminated for opChain: {}", _context.getId(), e);
return ErrorMseBlock.fromException(e);
- } catch (TimeoutException e) {
- LOGGER.warn("Timed out transferring data on opChain: {}",
_context.getId(), e);
+ } catch (QueryException e) {
return ErrorMseBlock.fromException(e);
- } catch (Exception e) {
+ } catch (RuntimeException e) {
ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
try {
LOGGER.error("Exception while transferring data on opChain: {}",
_context.getId(), e);
sendEos(errorBlock);
- } catch (Exception e2) {
+ } catch (RuntimeException e2) {
LOGGER.error("Exception while sending error block.", e2);
}
return errorBlock;
@@ -246,9 +243,7 @@ public class MailboxSendOperator extends MultiStageOperator
{
return _context.getPassiveDeadlineMs();
}
- private void sendEos(MseBlock.Eos eosBlockWithoutStats)
- throws Exception {
-
+ private void sendEos(MseBlock.Eos eosBlockWithoutStats) {
MultiStageQueryStats stats = null;
List<DataBuffer> serializedStats;
if (_context.isSendStats()) {
@@ -276,22 +271,20 @@ public class MailboxSendOperator extends
MultiStageOperator {
return new StatMap<>(_statMap);
}
- private boolean sendMseBlock(MseBlock.Data block)
- throws Exception {
- boolean isEarlyTerminated = _exchange.send(block);
+ private void sendMseBlock(MseBlock.Data block) {
+ if (_exchange.send(block)) {
+ earlyTerminate();
+ }
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[SEND]== Block " + block + " sent from: " +
_context.getId());
}
- return isEarlyTerminated;
}
- private boolean sendMseBlock(MseBlock.Eos block, List<DataBuffer>
serializedStats)
- throws Exception {
- boolean isEarlyTerminated = _exchange.send(block, serializedStats);
+ private void sendMseBlock(MseBlock.Eos block, List<DataBuffer>
serializedStats) {
+ _exchange.send(block, serializedStats);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[SEND]== Block " + block + " sent from: " +
_context.getId());
}
- return isEarlyTerminated;
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 7b0290a69ec..ced7906c564 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.plan.ExplainInfo;
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.operator.set.SetOperator;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -132,6 +133,14 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
protected abstract MseBlock getNextBlock()
throws Exception;
+ /**
+ * Signals the operator to terminate early.
+ *
+ * After this method is called, the operator should stop processing any more
input and return a
+ * {@link SuccessMseBlock} block as soon as possible.
+ * This method should be called when the consumer of the operator does not
need any more data and wants to stop the
+ * execution early to save resources.
+ */
protected void earlyTerminate() {
_isEarlyTerminated = true;
for (MultiStageOperator child : getChildOperators()) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 261c032e965..c99c469a90a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -18,13 +18,9 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
@@ -100,11 +96,9 @@ public abstract class BlockExchange {
* API to send a block to the destination mailboxes.
* @param block the block to be transferred
* @return true if all the mailboxes has been early terminated.
- * @throws IOException when sending stream unexpectedly closed.
- * @throws TimeoutException when sending stream timeout.
+ * @throws org.apache.pinot.spi.exception.QueryException if any mailbox
fails to send the block, including on timeout.
*/
- public boolean send(MseBlock.Data block)
- throws IOException, TimeoutException {
+ public boolean send(MseBlock.Data block) {
boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
if (!sendingMailbox.isEarlyTerminated()) {
@@ -122,11 +116,9 @@ public abstract class BlockExchange {
* API to send a block to the destination mailboxes.
* @param eosBlock the block to be transferred
* @return true if all the mailboxes has been early terminated.
- * @throws IOException when sending stream unexpectedly closed.
- * @throws TimeoutException when sending stream timeout.
+ * @throws org.apache.pinot.spi.exception.QueryException if any mailbox
fails to send the block, including on timeout.
*/
- public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
+ public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
{
int mailboxIdToSendMetadata;
if (!serializedStats.isEmpty()) {
mailboxIdToSendMetadata = _statsIndexChooser.apply(_sendingMailboxes);
@@ -138,18 +130,18 @@ public abstract class BlockExchange {
// this may happen when the block exchange is itself used as a sending
mailbox, like when using spools
mailboxIdToSendMetadata = -1;
}
- Exception firstException = null;
+ RuntimeException firstException = null;
int numMailboxes = _sendingMailboxes.size();
for (int i = 0; i < numMailboxes; i++) {
try {
SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
- List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ?
serializedStats : Collections.emptyList();
+ List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ?
serializedStats : List.of();
sendingMailbox.send(eosBlock, statsToSend);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Block sent: {} {} to {}", eosBlock,
System.identityHashCode(eosBlock), sendingMailbox);
}
- } catch (IOException | TimeoutException | RuntimeException e) {
+ } catch (RuntimeException e) {
// We want to try to send EOS to all mailboxes, so we catch the
exception and rethrow it at the end.
if (firstException == null) {
firstException = e;
@@ -159,22 +151,12 @@ public abstract class BlockExchange {
}
}
if (firstException != null) {
- // This is ugly, but necessary to be sure we throw the right exception,
which is later caught by the
- // QueryRunner and handled properly.
- if (firstException instanceof IOException) {
- throw (IOException) firstException;
- } else if (firstException instanceof TimeoutException) {
- throw (TimeoutException) firstException;
- } else {
- Preconditions.checkState(firstException instanceof RuntimeException);
- throw (RuntimeException) firstException;
- }
+ throw firstException;
}
return false;
}
- protected void sendBlock(SendingMailbox sendingMailbox, MseBlock.Data block)
- throws IOException, TimeoutException {
+ protected void sendBlock(SendingMailbox sendingMailbox, MseBlock.Data block)
{
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending block: {} {} to {}", block,
System.identityHashCode(block), sendingMailbox);
}
@@ -192,8 +174,7 @@ public abstract class BlockExchange {
}
}
- protected abstract void route(List<SendingMailbox> destinations,
MseBlock.Data block)
- throws IOException, TimeoutException;
+ protected abstract void route(List<SendingMailbox> destinations,
MseBlock.Data block);
// Called when the OpChain gracefully returns.
// TODO: This is a no-op right now.
@@ -237,8 +218,7 @@ public abstract class BlockExchange {
}
@Override
- public void send(MseBlock.Data data)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Data data) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Exchange mailbox {} echoing data block {} {}", this,
data, System.identityHashCode(data));
}
@@ -246,8 +226,7 @@ public abstract class BlockExchange {
}
@Override
- public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
- throws IOException, TimeoutException {
+ public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Exchange mailbox {} echoing EOS block {} {}", this,
block, System.identityHashCode(block));
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
index 941b4d05486..57f0ba415c1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
@@ -18,9 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;
-import java.io.IOException;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
@@ -42,8 +40,7 @@ class BroadcastExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
- throws IOException, TimeoutException {
+ protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
{
for (SendingMailbox mailbox : destinations) {
sendBlock(mailbox, block);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 21ea3729ee2..14a6a6ac940 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -19,10 +19,8 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.query.mailbox.SendingMailbox;
@@ -54,8 +52,7 @@ class HashExchange extends BlockExchange {
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
- protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
- throws IOException, TimeoutException {
+ protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
{
int numMailboxes = destinations.size();
if (numMailboxes == 1 || _keySelector == EmptyKeySelector.INSTANCE) {
sendBlock(destinations.get(0), block);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
index 6ad534c8a4c..1632018d60b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
@@ -19,10 +19,8 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntFunction;
import org.apache.pinot.query.mailbox.SendingMailbox;
@@ -56,8 +54,7 @@ class RandomExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
- throws IOException, TimeoutException {
+ protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
{
int destinationIdx = _rand.apply(destinations.size());
sendBlock(destinations.get(destinationIdx), block);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index f6fd85d9545..32d2df68a58 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -19,9 +19,7 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.base.Preconditions;
-import java.io.IOException;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
@@ -45,8 +43,7 @@ class SingletonExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox> sendingMailboxes, MseBlock.Data
block)
- throws IOException, TimeoutException {
+ protected void route(List<SendingMailbox> sendingMailboxes, MseBlock.Data
block) {
sendBlock(sendingMailboxes.get(0), block);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index e04e6000aeb..ca6e3dcc8ff 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -344,12 +344,14 @@ public abstract class BlockingMultiStreamConsumer<E>
implements AutoCloseable {
private final int _stageId;
@Nullable
private MultiStageQueryStats _stats;
+ private final int _senderStageId;
public OfMseBlock(OpChainExecutionContext context,
- List<? extends AsyncStream<ReceivingMailbox.MseBlockWithStats>>
asyncProducers) {
+ List<? extends AsyncStream<ReceivingMailbox.MseBlockWithStats>>
asyncProducers, int senderStageId) {
super(context.getId(), context.getPassiveDeadlineMs(), asyncProducers);
_stageId = context.getStageId();
_stats = MultiStageQueryStats.emptyStats(context.getStageId());
+ _senderStageId = senderStageId;
}
@Override
@@ -393,7 +395,7 @@ public abstract class BlockingMultiStreamConsumer<E>
implements AutoCloseable {
return onException(terminateException.getErrorCode(),
terminateException.getMessage());
}
// TODO: Add the sender stage id to the error message
- String errMsg = "Timed out on stage " + _stageId + " waiting for data
sent by a child stage";
+ String errMsg = "Timed out on stage " + _stageId + " waiting for data
from child stage " + _senderStageId;
// We log this case as debug because:
// - The opchain will already log a stackless message once the opchain
fail
// - The trace is not useful (the log message is good enough to find
where we failed)
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index f5f1544e31e..78f21ac0569 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -97,17 +97,21 @@ public class MailboxServiceTest {
receivingMailbox.registeredReader(() -> {
});
for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
- assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
+ assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i - 1);
List<Object[]> rows = getRows(receivingMailbox);
assertEquals(rows.size(), 1);
assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
}
- assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+ assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
ReceivingMailbox.MseBlockWithStats block = receivingMailbox.poll();
assertNotNull(block);
assertTrue(block.getBlock().isSuccess());
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
- assertNull(receivingMailbox.poll());
+
+ // Following calls to poll() should return the same eos block
+ block = receivingMailbox.poll();
+ assertNotNull(block);
+ assertTrue(block.getBlock().isSuccess());
}
@Test
@@ -132,17 +136,16 @@ public class MailboxServiceTest {
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
- assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
+ assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i - 1);
List<Object[]> rows = getRows(receivingMailbox);
assertEquals(rows.size(), 1);
assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
}
- assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+ assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
MseBlock block = readBlock(receivingMailbox);
assertNotNull(block);
assertTrue(block.isEos());
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
- assertNull(receivingMailbox.poll());
}
@Test
@@ -213,7 +216,7 @@ public class MailboxServiceTest {
// Send one data block and then cancel
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"0"}));
receivingMailbox.cancel();
- assertEquals(numCallbacks.get(), 1);
+ assertEquals(numCallbacks.get(), 2);
// Data blocks will be cleaned up
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
@@ -224,7 +227,7 @@ public class MailboxServiceTest {
// Cancel is idempotent for both sending and receiving mailbox, so safe to
call multiple times
sendingMailbox.cancel(new Exception("TEST ERROR"));
receivingMailbox.cancel();
- assertEquals(numCallbacks.get(), 1);
+ assertEquals(numCallbacks.get(), 2);
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
}
@@ -281,7 +284,8 @@ public class MailboxServiceTest {
// Next send will throw exception because buffer is full
try {
- sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
+ // The block sent must be a data block, as error and eos blocks are
always accepted
+ sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{Integer.toString(1)}));
fail("Except exception when sending data after buffer is full");
} catch (Exception e) {
// Expected
@@ -336,7 +340,7 @@ public class MailboxServiceTest {
// Sends are non-blocking as long as channel capacity is not breached
SendingMailbox sendingMailbox =
_mailboxService2.getSendingMailbox("localhost",
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE, _stats);
- for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
+ for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS; i++) {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{Integer.toString(i)}));
}
sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
@@ -349,18 +353,16 @@ public class MailboxServiceTest {
aVoid -> receivingMailbox.getNumPendingBlocks() ==
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS, 1000L,
"Failed to deliver mails");
- for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
+ for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS; i++) {
assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
List<Object[]> rows = getRows(receivingMailbox);
assertEquals(rows.size(), 1);
assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
}
- assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+ assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
MseBlock block = readBlock(receivingMailbox);
assertNotNull(block);
assertTrue(block.isSuccess());
- assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
- assertNull(receivingMailbox.poll());
}
@Test
@@ -391,17 +393,16 @@ public class MailboxServiceTest {
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
- assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
+ assertEquals(receivingMailbox.getNumPendingBlocks(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i - 1);
List<Object[]> rows = getRows(receivingMailbox);
assertEquals(rows.size(), 1);
assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
}
- assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+ assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
MseBlock block = readBlock(receivingMailbox);
assertNotNull(block);
assertTrue(block.isSuccess());
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
- assertNull(receivingMailbox.poll());
}
@Test
@@ -536,7 +537,7 @@ public class MailboxServiceTest {
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{"0"}));
receiveMailLatch.await();
receivingMailbox.cancel();
- assertEquals(numCallbacks.get(), 1);
+ assertEquals(numCallbacks.get(), 2);
// Data blocks will be cleaned up
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
@@ -547,7 +548,7 @@ public class MailboxServiceTest {
// Cancel is idempotent for both sending and receiving mailbox, so safe to
call multiple times
sendingMailbox.cancel(new Exception("TEST ERROR"));
receivingMailbox.cancel();
- assertEquals(numCallbacks.get(), 1);
+ assertEquals(numCallbacks.get(), 2);
assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
}
@@ -619,7 +620,8 @@ public class MailboxServiceTest {
}
// Next send will be blocked on the receiver side and cause exception
after timeout
- sendingMailbox.send(SuccessMseBlock.INSTANCE,
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
+ // We need to send a data block, given we don't block on EOS
+ sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new
Object[]{payload}));
receiveMailLatch.await();
assertEquals(numCallbacks.get(),
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS + 1);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
new file mode 100644
index 00000000000..b9c35169afd
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ReceivingMailboxTest {
+
+ private static final DataSchema DATA_SCHEMA =
+ new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+ private static final MseBlock.Data DATA_BLOCK = new
RowHeapDataBlock(List.of(), DATA_SCHEMA, null);
+ private ReceivingMailbox.Reader _reader;
+
+ @BeforeMethod
+ public void setUp() {
+ _reader = Mockito.mock(ReceivingMailbox.Reader.class);
+ }
+
+ @Test
+ public void tooManyDataBlocksTheWriter()
+ throws InterruptedException, TimeoutException {
+ int size = 2;
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", size);
+
+ // Offer up to capacity
+ for (int i = 0; i < size; i++) {
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
"Should be able to offer up to capacity");
+ }
+ // Offer one more should be rejected
+ try {
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ fail("Should have thrown timeout exception, but " + status + " was
returned");
+ } catch (TimeoutException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void offerAfterEos() throws InterruptedException, TimeoutException {
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
"Should be able to offer before EOS");
+
+ status = receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
"Should be able to offer EOS");
+
+ // Data offer after EOS should be rejected
+ status = receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+ "Should not be able to offer after EOS");
+
+ // Success offer after EOS should be rejected
+ status = receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+ "Should not be able to offer after EOS");
+
+ // Error offer after EOS should be rejected
+ status =
receivingMailbox.offer(ErrorMseBlock.fromError(QueryErrorCode.INTERNAL,
"test"), List.of(), 10);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+ "Should not be able to offer after EOS");
+ }
+
+ @Test
+ public void shouldReadDataInOrder() throws InterruptedException,
TimeoutException {
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+ receivingMailbox.registeredReader(_reader);
+
+ MseBlock[] offeredBlocks = new MseBlock[] {
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+ };
+ for (MseBlock block : offeredBlocks) {
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(block, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
"Should be able to offer before EOS");
+ }
+
+ for (MseBlock offered : offeredBlocks) {
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read offered blocks");
+ assertEquals(read.getBlock(), offered, "Should read blocks in the order
they were offered");
+ }
+
+ assertNull(receivingMailbox.poll(), "No more blocks to read, should return
null");
+ }
+
+ @Test
+ public void lateEosRead() throws InterruptedException, TimeoutException {
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+ receivingMailbox.registeredReader(_reader);
+
+ MseBlock[] offeredBlocks = new MseBlock[] {
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+ };
+ for (MseBlock block : offeredBlocks) {
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(block, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
"Should be able to offer before EOS");
+ }
+
+ for (MseBlock offered : offeredBlocks) {
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read offered blocks");
+ assertEquals(read.getBlock(), offered, "Should read blocks in the order
they were offered");
+ }
+
+ // Offer EOS after all data blocks are read
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
"Should be able to offer EOS");
+
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read EOS");
+ assertEquals(read.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS
block");
+
+ // Offer after EOS should be rejected
+ status = receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+ "Should not be able to offer after EOS");
+
+ // Poll again should return the EOS
+ ReceivingMailbox.MseBlockWithStats latePoll = receivingMailbox.poll();
+ assertNotNull(latePoll, "Should be able to read EOS");
+ assertEquals(latePoll.getBlock(), SuccessMseBlock.INSTANCE, "Should read
EOS block");
+ }
+
+ @Test
+ public void bufferedDataIsKeptOnSuccess() throws InterruptedException,
TimeoutException {
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+ receivingMailbox.registeredReader(_reader);
+
+ MseBlock[] offeredBlocks = new MseBlock[] {
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+ };
+ for (MseBlock block : offeredBlocks) {
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(block, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
"Should be able to offer before EOS");
+ }
+ // Offer EOS
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
"Should be able to offer EOS");
+
+ for (MseBlock offered : offeredBlocks) {
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read offered blocks");
+ assertEquals(read.getBlock(), offered, "Should read blocks in the order
they were offered");
+ }
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read EOS");
+ assertEquals(read.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS
block");
+ }
+
+ @Test
+ public void bufferedDataIsDiscardedOnError() throws InterruptedException,
TimeoutException {
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+ receivingMailbox.registeredReader(_reader);
+ ErrorMseBlock errorBlock = ErrorMseBlock.fromException(new
RuntimeException("Test error"));
+
+ MseBlock[] offeredBlocks = new MseBlock[] {
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+ };
+ for (MseBlock block : offeredBlocks) {
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(block, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
"Should be able to offer before EOS");
+ }
+ // Offer EOS
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(errorBlock, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
"Should be able to offer EOS");
+
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read EOS");
+ assertEquals(read.getBlock(), errorBlock, "Should read EOS block");
+ }
+
+ @Test
+ public void dataAfterSuccess() throws InterruptedException, TimeoutException
{
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+ receivingMailbox.registeredReader(_reader);
+
+ // Offer EOS
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
"Should be able to offer EOS");
+
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read EOS");
+ assertEquals(read.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS
block");
+
+ // Offer after EOS should be rejected
+ status = receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+ "Should not be able to offer after EOS");
+
+ // Poll again should return the EOS
+ ReceivingMailbox.MseBlockWithStats latePoll = receivingMailbox.poll();
+ assertNotNull(latePoll, "Should be able to read EOS");
+ assertEquals(latePoll.getBlock(), SuccessMseBlock.INSTANCE, "Should read
EOS block");
+ }
+
+ @Test
+ public void dataAfterError() throws InterruptedException, TimeoutException {
+ ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+ receivingMailbox.registeredReader(_reader);
+
+ // Offer EOS
+ ErrorMseBlock errorBlock = ErrorMseBlock.fromException(new
RuntimeException("Test error"));
+ ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(errorBlock, List.of(), 10);
+ assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
"Should be able to offer EOS");
+
+ ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+ assertNotNull(read, "Should be able to read EOS");
+ assertEquals(read.getBlock(), errorBlock, "Should read EOS block");
+
+ // Offer after EOS should be rejected
+ status = receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+ "Should not be able to offer after EOS");
+
+ // Poll again should return the EOS
+ ReceivingMailbox.MseBlockWithStats latePoll = receivingMailbox.poll();
+ assertNotNull(latePoll, "Should be able to read EOS");
+ assertEquals(latePoll.getBlock(), errorBlock, "Should read EOS block");
+ }
+
+ @Test(timeOut = 10_000)
+ public void earlyTerminateUnblocksOffers()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ int maxPendingBlocks = 2;
+ ReceivingMailbox mailbox = new ReceivingMailbox("id", maxPendingBlocks);
+
+ ExecutorService offerEx = Executors.newCachedThreadPool();
+ try {
+ for (int i = 0; i < maxPendingBlocks; i++) {
+ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> future =
offer(DATA_BLOCK, mailbox, offerEx);
+ future.join();
+ }
+ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> blocked =
offer(DATA_BLOCK, mailbox, offerEx);
+ Thread.sleep(100); // a little wait to facilitate the offer to be blocked
+ mailbox.earlyTerminate();
+ ReceivingMailbox.ReceivingMailboxStatus status = blocked.get(10_000,
TimeUnit.MILLISECONDS);
+ assertEquals(status,
ReceivingMailbox.ReceivingMailboxStatus.WAITING_EOS);
+ } finally {
+ offerEx.shutdownNow();
+ }
+ }
+
+ @Test(timeOut = 10_000)
+ public void readingUnblocksWriters()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ int maxPendingBlocks = 2;
+ ReceivingMailbox mailbox = new ReceivingMailbox("id", maxPendingBlocks);
+ mailbox.registeredReader(_reader);
+
+ ExecutorService offerEx = Executors.newSingleThreadExecutor();
+ try {
+ for (int i = 0; i < maxPendingBlocks; i++) {
+ offer(DATA_BLOCK, mailbox, offerEx);
+ }
+ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> blocked =
offer(DATA_BLOCK, mailbox, offerEx);
+
+ int numRead = 0;
+ do {
+ ReceivingMailbox.MseBlockWithStats poll = mailbox.poll();
+ if (poll == null) {
+ // No more to read
+ Thread.sleep(10);
+ } else {
+ numRead++;
+ assertEquals(poll.getBlock(), DATA_BLOCK, "The read block should
match the sent block");
+ }
+ } while (numRead < maxPendingBlocks + 1);
+ assertEquals(mailbox.getNumPendingBlocks(), 0, "All blocks should have
been read");
+ assertTrue(blocked.isDone(), "The blocked offer should be unblocked by
reading");
+ assertEquals(blocked.get(),
ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
+ "The unblocked offer should succeed");
+ } finally {
+ offerEx.shutdownNow();
+ }
+ }
+
+ CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> offer(MseBlock
block, ReceivingMailbox receivingMailbox,
+ ExecutorService executor) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return receivingMailbox.offer(block, List.of(), 10_000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor);
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 19b11aa9215..8861357f4c6 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.operator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -104,26 +103,6 @@ public class MailboxSendOperatorTest {
assertTrue(captor.getValue().isError(), "expected to send error block to
exchange");
}
- @Test
- public void shouldNotSendErrorBlockWhenTimedOut()
- throws Exception {
- // Given:
- MseBlock.Data dataBlock = getDummyDataBlock();
- when(_input.nextBlock()).thenReturn(dataBlock);
- doThrow(new TimeoutException()).when(_exchange).send(any());
-
- // When:
- MseBlock block = getOperator().nextBlock();
-
- // Then:
- assertTrue(block.isError(), "expected error block to propagate");
- ArgumentCaptor<MseBlock.Data> captor =
ArgumentCaptor.forClass(MseBlock.Data.class);
- verify(_exchange).send(captor.capture());
- assertSame(captor.getValue(), dataBlock, "expected to send data block to
exchange");
-
- verify(_exchange, never()).send(any(), anyList());
- }
-
@Test
public void shouldSendEosBlock()
throws Exception {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 4ea75fcad74..93ffbb97c7a 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -20,9 +20,7 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
-import java.io.IOException;
import java.util.List;
-import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.mailbox.SendingMailbox;
@@ -178,8 +176,7 @@ public class BlockExchangeTest {
}
@Override
- protected void route(List<SendingMailbox> destinations, MseBlock.Data
block)
- throws IOException, TimeoutException {
+ protected void route(List<SendingMailbox> destinations, MseBlock.Data
block) {
for (SendingMailbox mailbox : destinations) {
sendBlock(mailbox, block);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index e1fc22f1bfa..6edba101848 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -60,6 +60,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
@@ -267,11 +268,11 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
// TODO: name the test using testCaseName for testng reports
@Test(dataProvider = "testResourceQueryTestCaseProviderInputOnly")
- public void testQueryTestCasesWithH2(String testCaseName, boolean isIgnored,
String sql, String h2Sql, String expect,
- boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
+ public void testQueryTestCasesWithH2(String testCaseName, boolean isIgnored,
String sql, String h2Sql,
+ @Nullable String expectErrorMsg, boolean keepOutputRowOrder, boolean
ignoreV2Optimizer)
throws Exception {
// query pinot
- runQuery(sql, expect, false).ifPresent(queryResult -> {
+ runQuery(sql, expectErrorMsg, false).ifPresent(queryResult -> {
try {
compareRowEquals(queryResult.getResultTable(), queryH2(h2Sql),
keepOutputRowOrder);
} catch (Exception e) {
@@ -393,22 +394,34 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
});
}
- private Optional<QueryDispatcher.QueryResult> runQuery(String sql, final
String except, boolean trace)
+ private Optional<QueryDispatcher.QueryResult> runQuery(String sql, @Nullable
String expectedErrorMsg, boolean trace)
throws Exception {
try {
// query pinot
QueryDispatcher.QueryResult queryResult = queryRunner(sql, trace);
- Assert.assertNull(except, "Expected error with message '" + except + "'.
But instead rows were returned: "
- + JsonUtils.objectToPrettyString(queryResult.getResultTable()));
+ if (expectedErrorMsg == null) {
+ Assert.assertTrue(queryResult.getProcessingException() == null,
+ "Unexpected exception: " +
JsonUtils.objectToPrettyString(queryResult.getProcessingException()));
+ } else {
+ Assert.assertTrue(queryResult.getProcessingException() != null,
+ "Expected error with message '" + expectedErrorMsg + "'. But
instead no error was thrown.");
+ Pattern pattern = Pattern.compile(expectedErrorMsg, Pattern.DOTALL);
+
Assertions.assertThat(queryResult.getProcessingException().getMessage()).matches(pattern);
+ return Optional.empty();
+ }
+ Assert.assertNull(expectedErrorMsg, "Expected error with message '" +
expectedErrorMsg
+ + "'. But instead rows were returned: " +
JsonUtils.objectToPrettyString(queryResult.getResultTable()));
+ Assert.assertNotNull(queryResult.getResultTable(),
+ "Result table is null: " +
JsonUtils.objectToPrettyString(queryResult));
return Optional.of(queryResult);
} catch (Exception e) {
- if (except == null) {
+ if (expectedErrorMsg == null) {
throw e;
} else {
- Pattern pattern = Pattern.compile(except, Pattern.DOTALL);
+ Pattern pattern = Pattern.compile(expectedErrorMsg, Pattern.DOTALL);
Assert.assertTrue(pattern.matcher(e.getMessage()).matches(),
String.format("Caught exception '%s', but it did not match the
expected pattern '%s'.", e.getMessage(),
- except));
+ expectedErrorMsg));
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
index 14181c31228..3d205129d09 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
@@ -92,6 +92,8 @@ public class ColocatedJoinQuickStart extends
MultistageEngineQuickStart {
overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
true);
overrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
true);
+
+
overrides.put(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
1024);
return overrides;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]