This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 5b289fc2e5 [multistage] Fix Leaks in Mailbox (#10322)
5b289fc2e5 is described below

commit 5b289fc2e5acb14388f84cb60b7c5d2d71f5e6cf
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Fri Mar 10 23:31:25 2023 +0530

    [multistage] Fix Leaks in Mailbox (#10322)

    - Mailbox ownership model:
    * ----------------------------------------------------------------------------
    *  (Operator Layer)
    *          MailboxSendOperator ---------> MailboxReceiveOperator
    *                   |                                  |
    *                   |                                  |
    * ------------------|----------------------------------|----------------------
    *                   |                                  |
    *  (MailboxService) |                                  |  ( WAIT ON INIT )
    *                  \_/                                \_/
    *            SendingMailbox                    ReceivingMailbox
    * ------------------|---------------------------------/^\---------------------
    *  (Physical Layer) |                                  |
    *   (e.g. GRPC)     |                                  |  ( INITIALIZE )
    *                   |                                  |
    *                  \_/                                 |
    *            StreamObserver -------------------> StreamObserver
    * ----------------------------------------------------------------------------

    - Work items done in this PR
      * clean up MailboxService API to sanitize cancel/release.
      * completely move SendingMailbox management out of MailboxService
      * make sending mailbox lazily open gRPC observers
      * add queue offer timeout to avoid unbounded memory growth (OOM)
      * handle close/cancel on leaf-stage operator + minor log change
      * enhance in-memory transfer stream
---
 .../pinot/query/mailbox/GrpcMailboxService.java    | 112 +++++++----
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  39 +++-
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  73 ++++---
 .../query/mailbox/InMemoryMailboxService.java      |  65 ++++--
 .../query/mailbox/InMemoryReceivingMailbox.java    |  34 ++--
 .../query/mailbox/InMemorySendingMailbox.java      |  49 +++--
 .../apache/pinot/query/mailbox/MailboxService.java |  44 +++-
 .../query/mailbox/MultiplexingMailboxService.java  |  15 +-
 .../pinot/query/mailbox/ReceivingMailbox.java      |  50 +++--
 .../apache/pinot/query/mailbox/SendingMailbox.java |  42 ++--
 .../mailbox/channel/InMemoryTransferStream.java    | 101 ++++++++++
 .../channel/MailboxContentStreamObserver.java      | 162 ++++++---------
 .../channel/MailboxStatusStreamObserver.java       |  12 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  65 +++---
 .../runtime/executor/OpChainSchedulerService.java  |   9 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  16 ++
 .../runtime/operator/MailboxSendOperator.java      |  27 ++-
 .../runtime/operator/exchange/BlockExchange.java   |  31 ++-
 .../operator/exchange/BroadcastExchange.java       |   3 +-
 .../runtime/operator/exchange/HashExchange.java    |   3 +-
 .../runtime/operator/exchange/RandomExchange.java  |   3 +-
 .../operator/exchange/SingletonExchange.java       |   3 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   3 +-
 .../query/runtime/plan/PlanRequestContext.java     |   9 +-
 .../runtime/plan/ServerRequestPlanVisitor.java     |   4 +-
 .../plan/server/ServerPlanRequestContext.java      |   6 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      | 224 +++++++++++++++++++--
 .../query/mailbox/InMemoryMailboxServiceTest.java  | 121 ++++++++++-
 .../mailbox/MultiplexingMailboxServiceTest.java    |   8 +-
.../executor/OpChainSchedulerServiceTest.java | 4 +- .../runtime/operator/MailboxSendOperatorTest.java | 33 ++- .../operator/exchange/BlockExchangeTest.java | 12 +- .../operator/exchange/BroadcastExchangeTest.java | 3 +- .../operator/exchange/HashExchangeTest.java | 3 +- .../operator/exchange/RandomExchangeTest.java | 3 +- .../operator/exchange/SingletonExchangeTest.java | 3 +- 36 files changed, 1020 insertions(+), 374 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java index 37fd81b3d2..744c681752 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java @@ -18,45 +18,61 @@ */ package org.apache.pinot.query.mailbox; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.proto.PinotMailboxGrpc; import org.apache.pinot.query.mailbox.channel.ChannelManager; import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * GRPC-based implementation of {@link MailboxService}. + * GRPC-based implementation of {@link MailboxService}. 
Note that there can be cases where the ReceivingMailbox + * and/or the underlying connection can be leaked: * - * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the - * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code> + * <ol> + * <li>When the OpChain corresponding to the receiver was never registered.</li> + * <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li> + * </ol> * - * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well. - * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period - * after the last successful message sent/received. - * - * <p>Noted that: - * <ul> - * <li>the latter part of the mailboxID consist of the channelID.</li> - * <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires - * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li> - * </ul> + * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a + * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also + * released. */ public class GrpcMailboxService implements MailboxService<TransferableBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class); // channel manager + private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5); private final ChannelManager _channelManager; private final String _hostname; private final int _mailboxPort; - // maintaining a list of registered mailboxes. 
- private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap = - new ConcurrentHashMap<>(); + // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases + // where the corresponding OpChain is either never registered or died before the sender sent data for the first time. + private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache = + CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES) + .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() { + @Override + public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) { + if (notification.wasEvicted()) { + // TODO: This should be tied with query deadline, but for that we need to know the query deadline + // when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver. + LOGGER.warn("Removing dangling GrpcReceivingMailbox: {}", notification.getKey()); + notification.getValue().cancel(); + } + } + }) + .build(); private final Consumer<MailboxIdentifier> _gotMailCallback; public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig, @@ -88,29 +104,57 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> { } /** - * Register a mailbox, mailbox needs to be registered before use. - * @param mailboxId the id of the mailbox. 
+ * {@inheritDoc} */ - public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) { - ManagedChannel channel = getChannel(mailboxId.toString()); - PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel); - CountDownLatch latch = new CountDownLatch(1); - StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver = - stub.open(new MailboxStatusStreamObserver(latch)); - GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), mailboxContentStreamObserver, latch); + @Override + public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs) { + MailboxStatusStreamObserver statusStreamObserver = new MailboxStatusStreamObserver(); + + GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), statusStreamObserver, (deadline) -> { + ManagedChannel channel = getChannel(mailboxId.toString()); + PinotMailboxGrpc.PinotMailboxStub stub = + PinotMailboxGrpc.newStub(channel) + .withDeadlineAfter(Math.max(0L, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS); + return stub.open(statusStreamObserver); + }, deadlineMs); return mailbox; } /** - * Register a mailbox, mailbox needs to be registered before use. - * @param mailboxId the id of the mailbox. + * {@inheritDoc} See {@link GrpcMailboxService} for details on the design. */ + @Override public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) { - return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(), - (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback)); + try { + return _receivingMailboxCache.get(mailboxId.toString(), + () -> new GrpcReceivingMailbox(mailboxId.toString(), _gotMailCallback)); + } catch (ExecutionException e) { + LOGGER.error(String.format("Error getting receiving mailbox: %s", mailboxId), e); + throw new RuntimeException(e); + } + } + + /** + * If there's a cached receiving mailbox and it isn't closed (i.e. 
query didn't finish successfully), then this + * calls a cancel to ensure that the underlying gRPC stream is closed. After that the receiving mailbox is removed + * from the cache. + * <p> + * Also refer to the definition in the interface: + * </p> + * <p> + * {@inheritDoc} + * </p> + */ + @Override + public void releaseReceivingMailbox(MailboxIdentifier mailboxId) { + GrpcReceivingMailbox receivingMailbox = _receivingMailboxCache.getIfPresent(mailboxId.toString()); + if (receivingMailbox != null && !receivingMailbox.isClosed()) { + receivingMailbox.cancel(); + } + _receivingMailboxCache.invalidate(mailboxId.toString()); } - public ManagedChannel getChannel(String mailboxId) { + private ManagedChannel getChannel(String mailboxId) { return _channelManager.getChannel(Utils.constructChannelId(mailboxId)); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java index 464c092e87..f6dde0c823 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java @@ -18,34 +18,41 @@ */ package org.apache.pinot.query.mailbox; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import javax.annotation.Nullable; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.datablock.MetadataBlock; +import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.proto.Mailbox.MailboxContent; import org.apache.pinot.query.mailbox.channel.ChannelUtils; import 
org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * GRPC implementation of the {@link ReceivingMailbox}. + * GRPC implementation of the {@link ReceivingMailbox}. This mailbox doesn't hold any resources upon creation. + * Instead an explicit {@link #init} call is made when the sender sends the first data-block which attaches + * references to the {@link StreamObserver} to this mailbox. */ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcReceivingMailbox.class); private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L; private final String _mailboxId; - private Consumer<MailboxIdentifier> _gotMailCallback; + private final Consumer<MailboxIdentifier> _gotMailCallback; private final CountDownLatch _initializationLatch; - private final AtomicInteger _totalMsgReceived = new AtomicInteger(0); private MailboxContentStreamObserver _contentStreamObserver; + private StreamObserver<Mailbox.MailboxStatus> _statusStreamObserver; public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier> gotMailCallback) { _mailboxId = mailboxId; @@ -53,9 +60,11 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> _initializationLatch = new CountDownLatch(1); } - public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObserver) { + public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObserver, + StreamObserver<Mailbox.MailboxStatus> statusStreamObserver) { if (_initializationLatch.getCount() > 0) { _contentStreamObserver = streamObserver; + _statusStreamObserver = statusStreamObserver; _initializationLatch.countDown(); } return _gotMailCallback; @@ -70,29 +79,37 @@ public class 
GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> * 2. If the received block from the sender is a data-block with 0 rows. * </p> */ + @Nullable @Override - public TransferableBlock receive() - throws Exception { + public TransferableBlock receive() throws Exception { if (!waitForInitialize()) { return null; } MailboxContent mailboxContent = _contentStreamObserver.poll(); - _totalMsgReceived.incrementAndGet(); return mailboxContent == null ? null : fromMailboxContent(mailboxContent); } @Override public boolean isInitialized() { - return _initializationLatch.getCount() <= 0; + return _initializationLatch.getCount() == 0; } @Override public boolean isClosed() { - return isInitialized() && _contentStreamObserver.isCompleted(); + return isInitialized() && _contentStreamObserver.hasConsumedAllData(); } @Override - public void cancel(Throwable e) { + public void cancel() { + if (isInitialized()) { + try { + _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException()); + } catch (Exception e) { + // TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way + // to check if the stream is already closed. 
+ LOGGER.info("Tried to cancel receiving mailbox", e); + } + } } private boolean waitForInitialize() diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index d2f4de89c2..3fa6d6c229 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -18,79 +18,94 @@ */ package org.apache.pinot.query.mailbox; +import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.proto.Mailbox.MailboxContent; import org.apache.pinot.query.mailbox.channel.ChannelUtils; +import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * GRPC implementation of the {@link SendingMailbox}. + * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is created on the first call to {@link #send}. 
*/ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class); private final String _mailboxId; private final AtomicBoolean _initialized = new AtomicBoolean(false); - private final AtomicInteger _totalMsgSent = new AtomicInteger(0); - private final CountDownLatch _finishLatch; - private final StreamObserver<MailboxContent> _mailboxContentStreamObserver; + private StreamObserver<MailboxContent> _mailboxContentStreamObserver; + private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier; + private final MailboxStatusStreamObserver _statusObserver; + private final long _deadlineMs; - public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver, - CountDownLatch latch) { + public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver statusObserver, + Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) { _mailboxId = mailboxId; - _mailboxContentStreamObserver = mailboxContentStreamObserver; - _finishLatch = latch; - _initialized.set(false); + _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier; + _statusObserver = statusObserver; + _deadlineMs = deadlineMs; } @Override public void send(TransferableBlock block) - throws UnsupportedOperationException { + throws Exception { if (!_initialized.get()) { - // initialization is special open(); } + Preconditions.checkState(!_statusObserver.isFinished(), + "Called send when stream is already closed for mailbox=" + _mailboxId); MailboxContent data = toMailboxContent(block.getDataBlock()); _mailboxContentStreamObserver.onNext(data); - _totalMsgSent.incrementAndGet(); } @Override - public void complete() { + public void complete() + throws Exception { _mailboxContentStreamObserver.onCompleted(); } @Override - public void open() { - // TODO: Get rid of init call. 
- // send a begin-of-stream message. - _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId) - .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build()); - _initialized.set(true); + public boolean isInitialized() { + return _initialized.get(); } @Override - public String getMailboxId() { - return _mailboxId; + public void cancel(Throwable t) { + if (_initialized.get() && !_statusObserver.isFinished()) { + LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId); + try { + _mailboxContentStreamObserver.onError(Status.fromThrowable( + new RuntimeException("Cancelled by the sender")).asRuntimeException()); + } catch (Exception e) { + // TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this + // anyways as info for now so we can see how frequently this happens. + LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage()); + } + } } @Override - public void waitForFinish(long timeout, TimeUnit unit) - throws InterruptedException { - _finishLatch.await(timeout, unit); + public String getMailboxId() { + return _mailboxId; } - @Override - public void cancel(Throwable t) { + private void open() { + _mailboxContentStreamObserver = _mailboxContentStreamObserverSupplier.apply(_deadlineMs); + _initialized.set(true); + // send a begin-of-stream message. 
+ _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId) + .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build()); } private MailboxContent toMailboxContent(DataBlock dataBlock) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java index b5bcb9057b..c15546b25a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java @@ -19,21 +19,41 @@ package org.apache.pinot.query.mailbox; import com.google.common.base.Preconditions; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InMemoryMailboxService implements MailboxService<TransferableBlock> { - // channel manager + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryMailboxService.class); + private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5); private final String _hostname; private final int _mailboxPort; private final Consumer<MailboxIdentifier> _receivedMailContentCallback; - private final ConcurrentHashMap<String, ReceivingMailbox> _receivingMailbox = new ConcurrentHashMap<>(); - private 
final ConcurrentHashMap<String, BlockingQueue> _mailboxQueue = new ConcurrentHashMap<>(); + private final Cache<String, InMemoryReceivingMailbox> _receivingMailboxCache = + CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES) + .removalListener(new RemovalListener<String, InMemoryReceivingMailbox>() { + @Override + public void onRemoval(RemovalNotification<String, InMemoryReceivingMailbox> notification) { + if (notification.wasEvicted()) { + LOGGER.info("Evicting dangling InMemoryReceivingMailbox: {}", notification.getKey()); + // TODO: This should be tied to the query deadline. Unlike GrpcMailboxService, the change here is + // simpler. + notification.getValue().cancel(); + } + } + }) + .build(); public InMemoryMailboxService(String hostname, int mailboxPort, Consumer<MailboxIdentifier> receivedMailContentCallback) { @@ -64,25 +84,32 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock> return _mailboxPort; } - public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) { + @Override + public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs) { Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport"); - String mId = mailboxId.toString(); - // for now, we use an unbounded blocking queue as the means of communication between - // in memory mailboxes - the reason for this is that unless we implement flow control, - // blocks will sit in memory either way (blocking the sender from sending doesn't prevent - // more blocks from being generated from upstream). 
on the other hand, having a capacity - // for the queue causes the sending thread to occupy a task pool thread and prevents other - // threads (most importantly, the receiving thread) from running - which can cause unnecessary - // failure situations - // TODO: when we implement flow control, we should swap this out with a bounded abstraction return new InMemorySendingMailbox(mailboxId.toString(), - _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()), getReceivedMailContentCallback()); + () -> new InMemoryTransferStream(mailboxId, this, deadlineMs), + getReceivedMailContentCallback()); } + @Override public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) { Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport"); String mId = mailboxId.toString(); - BlockingQueue mailboxQueue = _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()); - return _receivingMailbox.computeIfAbsent(mId, id -> new InMemoryReceivingMailbox(mId, mailboxQueue)); + try { + return _receivingMailboxCache.get(mId, () -> new InMemoryReceivingMailbox(mId)); + } catch (ExecutionException e) { + LOGGER.error(String.format("Error getting in-memory receiving mailbox=%s", mailboxId), e); + throw new RuntimeException(e); + } + } + + @Override + public void releaseReceivingMailbox(MailboxIdentifier mailboxId) { + InMemoryReceivingMailbox receivingMailbox = _receivingMailboxCache.getIfPresent(mailboxId.toString()); + if (receivingMailbox != null) { + receivingMailbox.cancel(); + _receivingMailboxCache.invalidate(mailboxId.toString()); + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java index 43f32c61c5..57e6cce087 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java +++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java @@ -18,30 +18,30 @@ */ package org.apache.pinot.query.mailbox; -import java.util.concurrent.BlockingQueue; +import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream; import org.apache.pinot.query.runtime.blocks.TransferableBlock; public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> { private final String _mailboxId; - private final BlockingQueue<TransferableBlock> _queue; - private volatile boolean _closed; + private InMemoryTransferStream _transferStream; + private volatile boolean _closed = false; - public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) { + public InMemoryReceivingMailbox(String mailboxId) { _mailboxId = mailboxId; - _queue = queue; - _closed = false; } - @Override - public String getMailboxId() { - return _mailboxId; + public void init(InMemoryTransferStream transferStream) { + _transferStream = transferStream; } @Override public TransferableBlock receive() throws Exception { - TransferableBlock block = _queue.poll(); + if (_transferStream == null) { + return null; + } + TransferableBlock block = _transferStream.poll(); if (block == null) { return null; @@ -56,15 +56,23 @@ public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBl @Override public boolean isInitialized() { - return true; + return _transferStream != null; } @Override public boolean isClosed() { - return _closed && _queue.size() == 0; + return _closed; } @Override - public void cancel(Throwable e) { + public void cancel() { + if (_transferStream != null) { + _transferStream.cancel(); + } + } + + @Override + public String getMailboxId() { + return _mailboxId; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java index 
18dcd8ffd3..035ee16f2b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java @@ -18,56 +18,61 @@ */ package org.apache.pinot.query.mailbox; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream; import org.apache.pinot.query.runtime.blocks.TransferableBlock; public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> { private final Consumer<MailboxIdentifier> _gotMailCallback; - private final String _mailboxId; + private final JsonMailboxIdentifier _mailboxId; - // TODO: changed to 2-way communication channel. - private BlockingQueue<TransferableBlock> _queue; + private Supplier<InMemoryTransferStream> _transferStreamProvider; + private InMemoryTransferStream _transferStream; - public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue, + public InMemorySendingMailbox(String mailboxId, Supplier<InMemoryTransferStream> transferStreamProvider, Consumer<MailboxIdentifier> gotMailCallback) { - _mailboxId = mailboxId; - _queue = queue; + _mailboxId = JsonMailboxIdentifier.parse(mailboxId); + _transferStreamProvider = transferStreamProvider; _gotMailCallback = gotMailCallback; } - @Override - public void open() { - } - @Override public String getMailboxId() { - return _mailboxId; + return _mailboxId.toString(); } @Override public void send(TransferableBlock data) - throws UnsupportedOperationException { - if (!_queue.offer(data)) { - // this should never happen, since we use a LinkedBlockingQueue - // which does not have capacity bounds - throw new IllegalStateException("Failed to insert into in-memory mailbox " + _mailboxId); + throws Exception { + if (!isInitialized()) { + initialize(); } - 
_gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId)); + _transferStream.send(data); + _gotMailCallback.accept(_mailboxId); } @Override - public void complete() { + public void complete() throws Exception { + _transferStream.complete(); } @Override - public void waitForFinish(long timeout, TimeUnit unit) - throws InterruptedException { + public boolean isInitialized() { + return _transferStream != null; } @Override public void cancel(Throwable t) { + if (isInitialized() && !_transferStream.isCancelled()) { + _transferStream.cancel(); + } + } + + private void initialize() { + if (_transferStream == null) { + _transferStream = _transferStreamProvider.get(); + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java index 234dd78e98..b4d16b0a85 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java @@ -18,6 +18,10 @@ */ package org.apache.pinot.query.mailbox; +import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; +import org.apache.pinot.query.runtime.operator.OpChain; + + /** * Mailbox service that handles transfer for mailbox contents. * @@ -31,12 +35,12 @@ public interface MailboxService<T> { void start(); /** - * Shutting down the mailbox service.s + * Shutting down the mailbox service. */ void shutdown(); /** - * Get the host name on which this mailbox service is runnning on. + * Get the host name on which this mailbox service is running on. * * @return the host. */ @@ -50,10 +54,7 @@ public interface MailboxService<T> { int getMailboxPort(); /** - * Look up a receiving mailbox by {@link MailboxIdentifier}. - * - * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been - * initialized. 
+ * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}. * * @param mailboxId mailbox identifier. * @return a receiving mailbox. @@ -61,10 +62,37 @@ public interface MailboxService<T> { ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId); /** - * Look up a sending mailbox by {@link MailboxIdentifier}. + * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is + * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the + * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it. * * @param mailboxId mailbox identifier. + * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline. * @return a sending mailbox. */ - SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId); + SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs); + + /** + * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered. + * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for + * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not + * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and + * the underlying resources: + * + * <ol> + * <li> + * If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case, + * {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main + * reason why this method exists. + * </li> + * <li> + * There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it + * is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. 
it could setup a + * periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for + * the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up. + * </li> + * </ol> + * @param mailboxId + */ + void releaseReceivingMailbox(MailboxIdentifier mailboxId); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java index e80a65ce71..c4f9c6fdec 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java @@ -72,11 +72,20 @@ public class MultiplexingMailboxService implements MailboxService<TransferableBl } @Override - public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) { + public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs) { if (mailboxId.isLocal()) { - return _inMemoryMailboxService.getSendingMailbox(mailboxId); + return _inMemoryMailboxService.getSendingMailbox(mailboxId, deadlineMs); } - return _grpcMailboxService.getSendingMailbox(mailboxId); + return _grpcMailboxService.getSendingMailbox(mailboxId, deadlineMs); + } + + @Override + public void releaseReceivingMailbox(MailboxIdentifier mailboxId) { + if (mailboxId.isLocal()) { + _inMemoryMailboxService.releaseReceivingMailbox(mailboxId); + return; + } + _grpcMailboxService.releaseReceivingMailbox(mailboxId); } public static MultiplexingMailboxService newInstance(String hostname, int port, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java index 377430883c..98893aa5e8 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java @@ -18,42 +18,58 @@ */ package org.apache.pinot.query.mailbox; +import javax.annotation.Nullable; +import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; +import org.apache.pinot.query.runtime.operator.MailboxSendOperator; + + /** - * Mailbox is used to send and receive data. + * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike + * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the + * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas + * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}. * - * Mailbox should be instantiated on both side of MailboxServer. - * - * @param <T> type of data carried over the mailbox. + * @param <T> the unit of data that each {@link #receive()} call returns. */ public interface ReceivingMailbox<T> { /** * Get the unique identifier for the mailbox. - * - * @return Mailbox ID. */ String getMailboxId(); /** - * Receive a data packet from the mailbox. Depending on the implementation, this may return null. The caller should - * use {@link ReceivingMailbox#isClosed()} to determine if the sender is done sending and the channel is closed. - * @return data packet. - * @throws Exception + * Returns a unit of data. Implementations are allowed to return null, in which case {@link MailboxReceiveOperator} + * will assume that this mailbox doesn't have any data to return and it will instead poll the other mailbox (if any). */ - T receive() - throws Exception; + @Nullable + T receive() throws Exception; /** - * Check if receiving mailbox is initialized. 
- * @return + * A ReceivingMailbox is considered initialized when it has a reference to the underlying channel used for receiving + * the data. The underlying channel may be a gRPC stream, in-memory queue, etc. Once a receiving mailbox is + * initialized, it has the ability to close the underlying channel via the {@link #cancel()} method. */ boolean isInitialized(); /** - * Check if mailbox is closed. - * @return + * A ReceivingMailbox is considered closed if it has sent all the data to the receiver and doesn't have any more data + * to send. */ boolean isClosed(); - void cancel(Throwable e); + /** + * A ReceivingMailbox may hold a reference to the underlying channel. Usually the channel would be automatically + * closed once all the data has been received by the receiver, and in such cases {@link #isClosed()} returns true. + * However in failure scenarios the underlying channel may not be released, and the receiver can use this method to + * ensure the same. + * + * This API should ensure that the underlying channel is "released" if it hasn't been already. If the channel has + * already been released, the API shouldn't throw and instead return gracefully. + * + * <p> + * This method may be called multiple times, so implementations should ensure this is idempotent. + * </p> + */ + void cancel(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java index 6cc162b0ec..24651348ac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java @@ -18,20 +18,15 @@ */ package org.apache.pinot.query.mailbox; -import java.util.concurrent.TimeUnit; +import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; /** - * Mailbox is used to send and receive data. + * Mailbox that's used to send data. 
* - * Mailbox should be instantiated on both side of MailboxServer. - * - * @param <T> type of data carried over the mailbox. + * @param <T> unit of data sent in one {@link #send} call. */ public interface SendingMailbox<T> { - - void open(); - /** * get the unique identifier for the mailbox. * @@ -40,19 +35,36 @@ public interface SendingMailbox<T> { String getMailboxId(); /** - * send a data packet through the mailbox. - * @param data - * @throws UnsupportedOperationException + * Send a single unit of data to a receiver. Note that SendingMailbox are required to acquire resources lazily in + * this call and they should <b>not</b> acquire any resources when they are created. This method should throw if there + * was an error sending the data, since that would allow {@link BlockExchange} to exit early. */ void send(T data) - throws UnsupportedOperationException; + throws Exception; /** - * Complete delivery of the current mailbox. + * Called when there is no more data to be sent by the {@link BlockExchange}. This is also a signal for the + * SendingMailbox that the sender is done sending data from its end. Note that this doesn't mean that the receiver + * has received all the data. + * + * <p> + * <b>Note:</b> While this is similar to a close() method that's usually provided with objects that hold releasable + * resources, the key difference is that a SendingMailbox cannot completely release the resources on its end + * gracefully, since it would be waiting for the receiver to ack that it has received all the data. See + * {@link #cancel} which can allow callers to force release the underlying resources. + * </p> */ - void complete(); + void complete() + throws Exception; - void waitForFinish(long timeout, TimeUnit unit) throws InterruptedException; + /** + * A SendingMailbox is considered initialized after it has acquired a reference to the underlying channel that will + * be used to send data to the receiver. 
+ */ + boolean isInitialized(); + /** + * Allows terminating the underlying channel. + */ void cancel(Throwable t); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java new file mode 100644 index 0000000000..8bf65110be --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.mailbox.channel; + +import com.google.common.base.Preconditions; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import javax.annotation.Nullable; +import org.apache.pinot.query.mailbox.InMemoryMailboxService; +import org.apache.pinot.query.mailbox.InMemoryReceivingMailbox; +import org.apache.pinot.query.mailbox.MailboxIdentifier; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; + + +public class InMemoryTransferStream { + + private MailboxIdentifier _mailboxId; + private BlockingQueue<TransferableBlock> _queue; + private InMemoryMailboxService _mailboxService; + private final long _deadlineMs; + private boolean _receivingMailboxInitialized = false; + private boolean _isCancelled = false; + private boolean _isCompleted = false; + + public InMemoryTransferStream(MailboxIdentifier mailboxId, InMemoryMailboxService mailboxService, long deadlineMs) { + _mailboxId = mailboxId; + _queue = new LinkedBlockingQueue<>(); + _mailboxService = mailboxService; + _deadlineMs = deadlineMs; + } + + public void send(TransferableBlock block) { + Preconditions.checkState(!isCancelled(), "Tried to send on a cancelled InMemory stream"); + // TODO: Deadline check can be more efficient. + // While, in most cases the receiver would have anyways called cancel, for expensive queries it is possible that + // the receiver may have hung-up before it could get a reference to the stream. This can happen if the sending + // OpChain was running an expensive operation (like a large hash-join). 
+ long currentTime = System.currentTimeMillis(); + Preconditions.checkState(currentTime < _deadlineMs, + String.format("Deadline exceeded by %s ms", currentTime - _deadlineMs)); + if (!_receivingMailboxInitialized) { + InMemoryReceivingMailbox receivingMailbox = + (InMemoryReceivingMailbox) _mailboxService.getReceivingMailbox(_mailboxId); + receivingMailbox.init(this); + _receivingMailboxInitialized = true; + } + _queue.offer(block); + } + + @Nullable + public TransferableBlock poll() + throws InterruptedException { + if (_isCancelled) { + return TransferableBlockUtils.getErrorTransferableBlock( + new RuntimeException("InMemoryTransferStream is cancelled")); + } else if (System.currentTimeMillis() > _deadlineMs) { + return TransferableBlockUtils.getErrorTransferableBlock( + new RuntimeException("Deadline reached for in-memory transfer stream")); + } + return _queue.poll(); + } + + public void complete() { + _isCompleted = true; + } + + public int size() { + return _queue.size(); + } + + public void cancel() { + _isCancelled = true; + // Eagerly lose references to the underlying data. 
+ _queue.clear(); + } + + public boolean isCompleted() { + return _isCompleted; + } + + public boolean isCancelled() { + return _isCancelled; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java index 220e429caa..c8375d4eed 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java @@ -19,15 +19,15 @@ package org.apache.pinot.query.mailbox.channel; import com.google.protobuf.ByteString; +import io.grpc.Context; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import javax.annotation.concurrent.GuardedBy; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.query.mailbox.GrpcMailboxService; import org.apache.pinot.query.mailbox.GrpcReceivingMailbox; @@ -37,8 +37,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.lang.Math.max; - /** * {@code MailboxContentStreamObserver} is the content streaming observer used to receive mailbox content. @@ -48,59 +46,41 @@ import static java.lang.Math.max; * to the sender side. 
*/ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> { + public static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class); + private static final Mailbox.MailboxContent DEFAULT_ERROR_MAILBOX_CONTENT; + // This delta is added to the buffer offer on top of the query timeout to avoid a race between client cancellation + // due to deadline and server-side cancellation due to the receiving buffer being full. + private static final long BUFFER_OFFER_TIMEOUT_DELTA_MS = 1_000; - private static Mailbox.MailboxContent createErrorContent(Throwable e) - throws IOException { - return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom( - TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes())) - .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build(); + static { + try { + RuntimeException exception = new RuntimeException( + "Error creating error-content.. 
please file a bug in the apache/pinot repo"); + DEFAULT_ERROR_MAILBOX_CONTENT = Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom( + TransferableBlockUtils.getErrorTransferableBlock(exception).getDataBlock().toBytes())) + .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build(); + } catch (IOException e) { + throw new RuntimeException(e); + } } private final GrpcMailboxService _mailboxService; private final StreamObserver<Mailbox.MailboxStatus> _responseObserver; - private final boolean _isEnabledFeedback; private final AtomicBoolean _isCompleted = new AtomicBoolean(false); private final BlockingQueue<Mailbox.MailboxContent> _receivingBuffer; - private ReadWriteLock _bufferSizeLock = new ReentrantReadWriteLock(); - @GuardedBy("bufferSizeLock") - private int _maxBufferSize = 0; - private ReadWriteLock _errorLock = new ReentrantReadWriteLock(); - @GuardedBy("_errorLock") private Mailbox.MailboxContent _errorContent = null; private JsonMailboxIdentifier _mailboxId; private Consumer<MailboxIdentifier> _gotMailCallback; - private void updateMaxBufferSize() { - _bufferSizeLock.writeLock().lock(); - _maxBufferSize = max(_maxBufferSize, _receivingBuffer.size()); - _bufferSizeLock.writeLock().unlock(); - } - - private int getMaxBufferSize() { - try { - _bufferSizeLock.readLock().lock(); - return _maxBufferSize; - } finally { - _bufferSizeLock.readLock().unlock(); - } - } - public MailboxContentStreamObserver(GrpcMailboxService mailboxService, StreamObserver<Mailbox.MailboxStatus> responseObserver) { - this(mailboxService, responseObserver, false); - } - - public MailboxContentStreamObserver(GrpcMailboxService mailboxService, - StreamObserver<Mailbox.MailboxStatus> responseObserver, boolean isEnabledFeedback) { _mailboxService = mailboxService; _responseObserver = responseObserver; - // TODO: Replace unbounded queue with bounded queue when we have backpressure in place. 
- // It is possible this will create high memory pressure since we have memory leak issues. - _receivingBuffer = new LinkedBlockingQueue(); - _isEnabledFeedback = isEnabledFeedback; + _receivingBuffer = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_MAILBOX_CONTENT); } /** @@ -110,92 +90,82 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail * to indicate when to call this method. */ public Mailbox.MailboxContent poll() { - try { - _errorLock.readLock().lock(); - if (_errorContent != null) { - return _errorContent; - } - } finally { - _errorLock.readLock().unlock(); + if (_errorContent != null) { + return _errorContent; } - if (isCompleted()) { + if (hasConsumedAllData()) { return null; } return _receivingBuffer.poll(); } - public boolean isCompleted() { - return _isCompleted.get() && _receivingBuffer.isEmpty(); - } - @Override public void onNext(Mailbox.MailboxContent mailboxContent) { _mailboxId = JsonMailboxIdentifier.parse(mailboxContent.getMailboxId()); + long remainingTimeMs = Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS); GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox) _mailboxService.getReceivingMailbox(_mailboxId); - _gotMailCallback = receivingMailbox.init(this); + _gotMailCallback = receivingMailbox.init(this, _responseObserver); + if (_errorContent != null) { + // This should never happen because gRPC calls StreamObserver in a single-threaded fashion, and if error-content + // is not null then we would have already issued a onError which means gRPC will not call onNext again. + LOGGER.warn("onNext called even though already errored out"); + return; + } if (!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) { // when the receiving end receives a message put it in the mailbox queue. - // TODO: pass a timeout to _receivingBuffer. - if (!_receivingBuffer.offer(mailboxContent)) { - // TODO: close the stream. 
- RuntimeException e = new RuntimeException("Mailbox receivingBuffer is full:" + _mailboxId); - LOGGER.error(e.getMessage()); - try { - _errorLock.writeLock().lock(); - _errorContent = createErrorContent(e); - } catch (IOException ioe) { - e = new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe); - LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", _mailboxId); + try { + final long offerTimeoutMs = remainingTimeMs + BUFFER_OFFER_TIMEOUT_DELTA_MS; + if (!_receivingBuffer.offer(mailboxContent, offerTimeoutMs, TimeUnit.MILLISECONDS)) { + RuntimeException e = new RuntimeException("Timed out offering to the receivingBuffer: " + _mailboxId); LOGGER.error(e.getMessage()); - throw e; - } finally { - _errorLock.writeLock().unlock(); + _errorContent = createErrorContent(e); + try { + _responseObserver.onError(Status.CANCELLED.asRuntimeException()); + } catch (Exception ignored) { + // Exception can be thrown if the stream deadline has already been reached, so we simply ignore it. + } } + } catch (InterruptedException e) { + _errorContent = createErrorContent(e); + LOGGER.error("Interrupted while polling receivingBuffer", e); + _responseObserver.onError(Status.CANCELLED.asRuntimeException()); } _gotMailCallback.accept(_mailboxId); - - updateMaxBufferSize(); - - if (_isEnabledFeedback) { - // TODO: this has race conditions with onCompleted() because sender blindly closes connection channels once - // it has finished sending all the data packets. 
- int remainingCapacity = _receivingBuffer.remainingCapacity() - 1; - Mailbox.MailboxStatus.Builder builder = - Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId()) - .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, String.valueOf(remainingCapacity)); - if (mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY) != null) { - builder.putAllMetadata(mailboxContent.getMetadataMap()); - builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"); - } - Mailbox.MailboxStatus status = builder.build(); - // returns the buffer available size to sender for rate controller / throttling. - _responseObserver.onNext(status); - } } } @Override public void onError(Throwable e) { - try { - _errorLock.writeLock().lock(); + if (_errorContent == null) { _errorContent = createErrorContent(e); - _gotMailCallback.accept(_mailboxId); - throw new RuntimeException(e); - } catch (IOException ioe) { - throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe); - } finally { - _errorLock.writeLock().unlock(); - LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", _mailboxId); } + _gotMailCallback.accept(_mailboxId); } @Override public void onCompleted() { _isCompleted.set(true); _responseObserver.onCompleted(); - LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", _mailboxId); + } + + /** + * @return true if all data has been received via {@link #poll()}. 
+ */ + public boolean hasConsumedAllData() { + return _isCompleted.get() && _receivingBuffer.isEmpty(); + } + + private static Mailbox.MailboxContent createErrorContent(Throwable e) { + try { + return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom( + TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes())) + .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build(); + } catch (IOException ioException) { + LOGGER.error("Error creating error MailboxContent", ioException); + return DEFAULT_ERROR_MAILBOX_CONTENT; + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java index fd7443db12..291a38325b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java @@ -38,10 +38,9 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5; private final AtomicInteger _bufferSize = new AtomicInteger(5); - private CountDownLatch _finishLatch; + private final CountDownLatch _finishLatch = new CountDownLatch(1); - public MailboxStatusStreamObserver(CountDownLatch finishLatch) { - _finishLatch = finishLatch; + public MailboxStatusStreamObserver() { } @Override @@ -60,12 +59,15 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb @Override public void onError(Throwable e) { _finishLatch.countDown(); - LOGGER.error("Receiving error msg from grpc mailbox status stream:", e); - throw new RuntimeException(e); + LOGGER.error("[mailbox] Server returned onError", e); } @Override public void onCompleted() { _finishLatch.countDown(); } + + public boolean 
isFinished() { + return _finishLatch.getCount() == 0; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index a1d22c678f..f7967322a0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -161,13 +161,38 @@ public class QueryRunner { public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) { long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); + long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)); + long deadlineMs = System.currentTimeMillis() + timeoutMs; if (isLeafStage(distributedStagePlan)) { - // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table - // and package it here for return. But we should really use a MailboxSendOperator directly put into the - // server executor. 
+ runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs, requestId); + } else { + StageNode stageRoot = distributedStagePlan.getStageRoot(); + OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, + new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, deadlineMs, + new VirtualServerAddress(distributedStagePlan.getServer()), distributedStagePlan.getMetadataMap())); + _scheduler.register(rootOperator); + } + } + + public ExecutorService getQueryWorkerExecutorService() { + return _queryWorkerExecutorService; + } + + public ExecutorService getQueryRunnerExecutorService() { + return _queryRunnerExecutorService; + } + + private void runLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, + long deadlineMs, long requestId) { + // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table + // and package it here for return. But we should really use a MailboxSendOperator directly put into the + // server executor. + MailboxSendOperator mailboxSendOperator = null; + try { long leafStageStartMillis = System.currentTimeMillis(); List<ServerPlanRequestContext> serverQueryRequests = - constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService); + constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService, + deadlineMs); // send the data table via mailbox in one-off fashion (e.g. 
no block-level split, one data table/partition key) List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size()); @@ -181,37 +206,27 @@ public class QueryRunner { + (System.currentTimeMillis() - leafStageStartMillis) + " ms"); MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId()); - MailboxSendOperator mailboxSendOperator = new MailboxSendOperator(_mailboxService, + mailboxSendOperator = new MailboxSendOperator(_mailboxService, new LeafStageTransferableBlockOperator(serverQueryResults, sendNode.getDataSchema(), requestId, sendNode.getStageId(), _rootServer), receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _rootServer, requestId, - sendNode.getStageId(), sendNode.getReceiverStageId()); + sendNode.getStageId(), sendNode.getReceiverStageId(), deadlineMs); int blockCounter = 0; while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) { LOGGER.debug("Acquired transferable block: {}", blockCounter++); } - mailboxSendOperator.toExplainString(); - } else { - long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)); - StageNode stageRoot = distributedStagePlan.getStageRoot(); - OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, - new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, - new VirtualServerAddress(distributedStagePlan.getServer()), distributedStagePlan.getMetadataMap())); - _scheduler.register(rootOperator); + mailboxSendOperator.close(); + } catch (Exception e) { + LOGGER.error(String.format("Error running leafStage for requestId=%s", requestId), e); + if (mailboxSendOperator != null) { + mailboxSendOperator.cancel(e); + } } } - public ExecutorService getQueryWorkerExecutorService() { - return _queryWorkerExecutorService; - } 
- - public ExecutorService getQueryRunnerExecutorService() { - return _queryRunnerExecutorService; - } - private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore, - MailboxService<TransferableBlock> mailboxService) { + MailboxService<TransferableBlock> mailboxService, long deadlineMs) { StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()); Preconditions.checkState(stageMetadata.getScannedTables().size() == 1, "Server request for V2 engine should only have 1 scan table per request."); @@ -231,7 +246,7 @@ public class QueryRunner { TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); requests.add( ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, tableConfig, - schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue())); + schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue(), deadlineMs)); } else if (TableType.REALTIME.name().equals(tableType)) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); @@ -239,7 +254,7 @@ public class QueryRunner { TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); requests.add( ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, tableConfig, - schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue())); + schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue(), deadlineMs)); } else { throw new IllegalArgumentException("Unsupported table type key: " + tableType); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index 15ae722142..5e4260368b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -68,6 +68,7 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { @Override public void runJob() { boolean isFinished = false; + boolean returnedErrorBlock = false; Throwable thrown = null; try { LOGGER.trace("({}): Executing", operatorChain); @@ -88,6 +89,7 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { } else { isFinished = true; if (result.isErrorBlock()) { + returnedErrorBlock = true; LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(), result.getDataBlock().getExceptions()); } else { @@ -99,11 +101,10 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService { LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e); thrown = e; } finally { - if (isFinished) { - closeOpChain(operatorChain); - } else if (thrown != null) { - // TODO: It would make sense to cancel OpChains if they returned an error-block. 
+ if (returnedErrorBlock || thrown != null) { cancelOpChain(operatorChain, thrown); + } else if (isFinished) { + closeOpChain(operatorChain); } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 0b16a47229..74bd60e10d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -186,4 +186,20 @@ public class MailboxReceiveOperator extends MultiStageOperator { : TransferableBlockUtils.getEndOfStreamTransferableBlock(); return block; } + + @Override + public void close() { + super.close(); + for (MailboxIdentifier sendingMailbox : _sendingMailbox) { + _mailboxService.releaseReceivingMailbox(sendingMailbox); + } + } + + @Override + public void cancel(Throwable t) { + super.cancel(t); + for (MailboxIdentifier sendingMailbox : _sendingMailbox) { + _mailboxService.releaseReceivingMailbox(sendingMailbox); + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 504a0d997b..f4e7976527 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -59,7 +59,8 @@ public class MailboxSendOperator extends MultiStageOperator { @VisibleForTesting interface BlockExchangeFactory { BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations, - RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter); + RelDistribution.Type exchange, 
KeySelector<Object[], Object[]> selector, BlockSplitter splitter, + long deadlineMs); } @VisibleForTesting @@ -70,10 +71,10 @@ public class MailboxSendOperator extends MultiStageOperator { public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, - VirtualServerAddress sendingServer, long jobId, int senderStageId, int receiverStageId) { + VirtualServerAddress sendingServer, long jobId, int senderStageId, int receiverStageId, long deadlineMs) { this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector, server -> toMailboxId(server, jobId, senderStageId, receiverStageId, sendingServer), BlockExchange::getExchange, - jobId, senderStageId, receiverStageId, sendingServer); + jobId, senderStageId, receiverStageId, sendingServer, deadlineMs); } @VisibleForTesting @@ -81,7 +82,7 @@ public class MailboxSendOperator extends MultiStageOperator { MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int senderStageId, - int receiverStageId, VirtualServerAddress serverAddress) { + int receiverStageId, VirtualServerAddress serverAddress, long deadlineMs) { super(jobId, senderStageId, serverAddress); _dataTableBlockBaseOperator = dataTableBlockBaseOperator; @@ -111,7 +112,8 @@ public class MailboxSendOperator extends MultiStageOperator { } BlockSplitter splitter = TransferableBlockUtils::splitBlock; - _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, exchangeType, keySelector, splitter); + _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, exchangeType, keySelector, splitter, + deadlineMs); 
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType), String.format("Exchange type '%s' is not supported yet", exchangeType)); @@ -141,9 +143,6 @@ public class MailboxSendOperator extends MultiStageOperator { transferableBlock = _dataTableBlockBaseOperator.nextBlock(); } } catch (final Exception e) { - // ideally, MailboxSendOperator doesn't ever throw an exception because - // it will just get swallowed, in this scenario at least we can forward - // any upstream exceptions as an error block transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e); try { _exchange.send(transferableBlock); @@ -154,6 +153,18 @@ public class MailboxSendOperator extends MultiStageOperator { return transferableBlock; } + @Override + public void close() { + super.close(); + _exchange.close(); + } + + @Override + public void cancel(Throwable t) { + super.cancel(t); + _exchange.cancel(t); + } + private static JsonMailboxIdentifier toMailboxId( VirtualServer destination, long jobId, int senderStageId, int receiverStageId, VirtualServerAddress sender) { return new JsonMailboxIdentifier( diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index e6f131b33e..03d9ee2d23 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -29,6 +29,8 @@ import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -36,6 +38,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; * exchanging 
data across different servers. */ public abstract class BlockExchange { + private static final Logger LOGGER = LoggerFactory.getLogger(BlockExchange.class); // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override. // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024; @@ -44,10 +47,10 @@ public abstract class BlockExchange { public static BlockExchange getExchange(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> selector, - BlockSplitter splitter) { + BlockSplitter splitter, long deadlineMs) { List<SendingMailbox<TransferableBlock>> sendingMailboxes = new ArrayList<>(); for (MailboxIdentifier mid : destinations) { - sendingMailboxes.add(mailboxService.getSendingMailbox(mid)); + sendingMailboxes.add(mailboxService.getSendingMailbox(mid, deadlineMs)); } switch (exchangeType) { case SINGLETON: @@ -71,15 +74,19 @@ public abstract class BlockExchange { _splitter = splitter; } - public void send(TransferableBlock block) { + public void send(TransferableBlock block) + throws Exception { if (block.isEndOfStreamBlock()) { - _sendingMailboxes.forEach(destination -> sendBlock(destination, block)); + for (SendingMailbox<TransferableBlock> sendingMailbox : _sendingMailboxes) { + sendBlock(sendingMailbox, block); + } return; } route(_sendingMailboxes, block); } - protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, TransferableBlock block) { + protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, TransferableBlock block) + throws Exception { if (block.isEndOfStreamBlock()) { sendingMailbox.send(block); sendingMailbox.complete(); @@ -93,5 +100,17 @@ public abstract class BlockExchange { } } - protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block); + protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) + throws Exception; + + // Called when the OpChain gracefully returns. + // TODO: This is a no-op right now. + public void close() { + } + + public void cancel(Throwable t) { + for (SendingMailbox<TransferableBlock> sendingMailbox : _sendingMailboxes) { + sendingMailbox.cancel(t); + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java index 932f7593b6..e9c44d7502 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java @@ -34,7 +34,8 @@ class BroadcastExchange extends BlockExchange { } @Override - protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) { + protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) + throws Exception { for (SendingMailbox mailbox : destinations) { sendBlock(mailbox, block); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java index 99f0e04f91..b7d5a4a15c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java @@ -43,7 +43,8 @@ class HashExchange extends BlockExchange { } @Override - protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) { + protected void 
route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) + throws Exception { List<Object[]>[] destIdxToRows = new List[destinations.size()]; for (Object[] row : block.getContainer()) { int partition = _keySelector.computeHash(row) % destinations.size(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java index 0073e28bf0..dc015378ca 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java @@ -48,7 +48,8 @@ class RandomExchange extends BlockExchange { } @Override - protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) { + protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) + throws Exception { int destinationIdx = _rand.apply(destinations.size()); sendBlock(destinations.get(destinationIdx), block); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java index 713cfa5ba2..ebe9eb370e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java @@ -35,7 +35,8 @@ class SingletonExchange extends BlockExchange { } @Override - protected void route(List<SendingMailbox<TransferableBlock>> mailbox, TransferableBlock block) { + protected void route(List<SendingMailbox<TransferableBlock>> mailbox, TransferableBlock block) + throws Exception { for (SendingMailbox sendingMailbox : mailbox) { 
sendBlock(sendingMailbox, block); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index dec70bd4e2..8ff1c041d1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -80,7 +80,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, StageMetadata receivingStageMetadata = context.getMetadataMap().get(node.getReceiverStageId()); return new MailboxSendOperator(context.getMailboxService(), nextOperator, receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(), - context.getServer(), context.getRequestId(), node.getStageId(), node.getReceiverStageId()); + context.getServer(), context.getRequestId(), node.getStageId(), node.getReceiverStageId(), + context.getDeadlineMs()); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java index db5b0e028a..dac0f97575 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java @@ -33,18 +33,21 @@ public class PlanRequestContext { protected final MailboxService<TransferableBlock> _mailboxService; protected final long _requestId; protected final int _stageId; + // TODO: Timeout is not needed since deadline is already present. 
private final long _timeoutMs; + private final long _deadlineMs; protected final VirtualServerAddress _server; protected final Map<Integer, StageMetadata> _metadataMap; protected final List<MailboxIdentifier> _receivingMailboxes = new ArrayList<>(); public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId, - long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap) { + long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap) { _mailboxService = mailboxService; _requestId = requestId; _stageId = stageId; _timeoutMs = timeoutMs; + _deadlineMs = deadlineMs; _server = server; _metadataMap = metadataMap; } @@ -61,6 +64,10 @@ public class PlanRequestContext { return _timeoutMs; } + public long getDeadlineMs() { + return _deadlineMs; + } + public VirtualServerAddress getServer() { return _server; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java index 55141cc739..b6194bd15b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java @@ -91,7 +91,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl public static ServerPlanRequestContext build(MailboxService<TransferableBlock> mailboxService, DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, - TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) { + TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList, long deadlineMs) { // Before-visit: construct the ServerPlanRequestContext baseline // Making a unique requestId for leaf stages 
otherwise it causes problem on stats/metrics/tracing. long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) @@ -107,7 +107,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit); pinotQuery.setExplain(false); ServerPlanRequestContext context = - new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, + new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs, new VirtualServerAddress(stagePlan.getServer()), stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java index 35142fe0cf..4403d35e6d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -42,9 +42,9 @@ public class ServerPlanRequestContext extends PlanRequestContext { protected InstanceRequest _instanceRequest; public ServerPlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId, - long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap, PinotQuery pinotQuery, - TableType tableType, TimeBoundaryInfo timeBoundaryInfo) { - super(mailboxService, requestId, stageId, timeoutMs, server, metadataMap); + long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap, + PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo timeBoundaryInfo) { + super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, metadataMap); _pinotQuery = pinotQuery; 
_tableType = tableType; _timeBoundaryInfo = timeBoundaryInfo; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java index 9b7d0426de..7d08439601 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java @@ -21,12 +21,16 @@ package org.apache.pinot.query.mailbox; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import org.apache.commons.collections.MapUtils; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.service.QueryConfig; @@ -76,13 +80,10 @@ public class GrpcMailboxServiceTest { @Test(timeOut = 10_000L) public void testHappyPath() throws Exception { + final long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier( - "happypath", - new VirtualServerAddress("localhost", _mailboxService1.getMailboxPort(), 0), - new VirtualServerAddress("localhost", _mailboxService2.getMailboxPort(), 0), - DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID); - SendingMailbox<TransferableBlock> sendingMailbox = _mailboxService1.getSendingMailbox(mailboxId); + JsonMailboxIdentifier mailboxId = createMailboxId("happypath"); + SendingMailbox<TransferableBlock> 
sendingMailbox = _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); ReceivingMailbox<TransferableBlock> receivingMailbox = _mailboxService2.getReceivingMailbox(mailboxId); CountDownLatch gotData = new CountDownLatch(1); _mail2GotData.set(ignored -> gotData.countDown()); @@ -109,14 +110,10 @@ public class GrpcMailboxServiceTest { @Test(timeOut = 10_000L) public void testGrpcException() throws Exception { + final long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier( - "exception", - new VirtualServerAddress("localhost", _mailboxService1.getMailboxPort(), 0), - new VirtualServerAddress("localhost", _mailboxService2.getMailboxPort(), 0), - DEFAULT_SENDER_STAGE_ID, - DEFAULT_RECEIVER_STAGE_ID); - SendingMailbox<TransferableBlock> sendingMailbox = _mailboxService1.getSendingMailbox(mailboxId); + JsonMailboxIdentifier mailboxId = createMailboxId("exception"); + SendingMailbox<TransferableBlock> sendingMailbox = _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); ReceivingMailbox<TransferableBlock> receivingMailbox = _mailboxService2.getReceivingMailbox(mailboxId); CountDownLatch gotData = new CountDownLatch(1); _mail2GotData.set(ignored -> gotData.countDown()); @@ -134,6 +131,207 @@ public class GrpcMailboxServiceTest { Assert.assertFalse(receivedDataBlock.getExceptions().isEmpty()); } + /** + * When the connection reaches deadline before the EOS block could be sent, the receiving mailbox should return a + * error block. 
+ */ + @Test + public void testGrpcStreamDeadline() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 1_000; + JsonMailboxIdentifier mailboxId = createMailboxId("conndeadline"); + + GrpcSendingMailbox grpcSendingMailbox = + (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); + GrpcReceivingMailbox grpcReceivingMailbox = + (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId); + + CountDownLatch latch = new CountDownLatch(2); + Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() { + @Override + public void accept(MailboxIdentifier mailboxIdentifier) { + latch.countDown(); + } + }; + _mail2GotData.set(callback); + + // Send 1 normal block. + grpcSendingMailbox.send(getTestTransferableBlock()); + + // Latch had started with count=2. We don't send any EOS block and instead wait for connection deadline to + // trigger the next callback. The latch won't await the full wait timeout and instead should return immediately + // as soon as the deadline is hit and MailboxContentStreamObserver#onError is called. + Assert.assertTrue(latch.await(4_000, TimeUnit.SECONDS)); + + // In case of errors, MailboxContentStreamObserver short-circuits and skips returning the normal data-block. + TransferableBlock receivedBlock = grpcReceivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertTrue(receivedBlock.isErrorBlock()); + Map<Integer, String> exceptions = receivedBlock.getDataBlock().getExceptions(); + Assert.assertTrue(MapUtils.isNotEmpty(exceptions)); + String exceptionMessage = exceptions.values().iterator().next(); + Assert.assertTrue(exceptionMessage.contains("CANCELLED")); + + // GrpcReceivingMailbox#cancel shouldn't throw and instead silently swallow exception + grpcReceivingMailbox.cancel(); + } + + /** + * This test ensures that when the buffer in MailboxContentStreamObserver is full: + * + * 1. The sender is not blocked and can complete successfully. + * 2. 
The gotMail callback is called (bufferSize + 1) times. + * 3. The offer to the buffer in MailboxContentStreamObserver times out around the time the query deadline is reached. + * 4. A error-block is returned by a subsequent {@link GrpcReceivingMailbox#receive()} call. + */ + @Test + public void testMailboxContentStreamBufferFull() + throws Exception { + final int bufferSize = MailboxContentStreamObserver.DEFAULT_MAX_PENDING_MAILBOX_CONTENT; + long queryTimeoutMs = 2_000; + long deadlineMs = System.currentTimeMillis() + queryTimeoutMs; + int blocksSent = 20; + JsonMailboxIdentifier mailboxId = createMailboxId("buffer-full"); + + GrpcSendingMailbox grpcSendingMailbox = + (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); + GrpcReceivingMailbox grpcReceivingMailbox = + (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId); + + CountDownLatch bufferSizeLatch = new CountDownLatch(bufferSize); + CountDownLatch bufferSizePlusOneLatch = new CountDownLatch(bufferSize + 1); + CountDownLatch bufferSizePlusTwoLatch = new CountDownLatch(bufferSize + 2); + CountDownLatch bufferSizePlusThreeLatch = new CountDownLatch(bufferSize + 3); + Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() { + @Override + public void accept(MailboxIdentifier mailboxIdentifier) { + bufferSizeLatch.countDown(); + bufferSizePlusOneLatch.countDown(); + bufferSizePlusTwoLatch.countDown(); + bufferSizePlusThreeLatch.countDown(); + } + }; + _mail2GotData.set(callback); + + // Sending mailbox will not be blocked if receiver buffer is full + for (int i = 0; i < blocksSent; i++) { + grpcSendingMailbox.send(getTestTransferableBlock()); + } + grpcSendingMailbox.complete(); + + // Ensure that the buffer is completely filled + Assert.assertTrue(bufferSizeLatch.await(1, TimeUnit.SECONDS)); + // Wait for the buffer offer to fail. 
After it fails, gotMail callback will be called once more in onNext + Assert.assertTrue(bufferSizePlusOneLatch.await(queryTimeoutMs + 1_000, TimeUnit.MILLISECONDS)); + // Since buffer offer fails after the stream deadline has already been reached, + // MailboxContentStreamObserver#onError will be called + Assert.assertTrue(bufferSizePlusTwoLatch.await(1, TimeUnit.SECONDS)); + // gotMail callback will be called (bufferSize + 1) times from onNext and once from onError, for a total of + // (bufferSize + 2) calls. The following latch await ensures that the callback is never called more than that. + Assert.assertFalse(bufferSizePlusThreeLatch.await(1, TimeUnit.SECONDS)); + + // Ensure that a error-block is returned by the receiving mailbox. + TransferableBlock receivedBlock = grpcReceivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertTrue(receivedBlock.isErrorBlock()); + Map<Integer, String> exceptions = receivedBlock.getDataBlock().getExceptions(); + Assert.assertTrue(exceptions.size() > 0); + Assert.assertTrue(exceptions.values().iterator().next().contains("Timed out offering to the receivingBuffer")); + } + + /** + * This test ensures that when a stream is cancelled by the receiver, any future sends by the sender will throw. 
+ */ + @Test + public void testStreamCancellationByReceiver() + throws Exception { + // set a large deadline + long deadlineMs = System.currentTimeMillis() + 120_000; + JsonMailboxIdentifier mailboxId = createMailboxId("recv-cancel"); + + GrpcSendingMailbox grpcSendingMailbox = + (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); + GrpcReceivingMailbox grpcReceivingMailbox = + (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId); + + CountDownLatch receivedDataLatch = new CountDownLatch(1); + Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() { + @Override + public void accept(MailboxIdentifier mailboxIdentifier) { + receivedDataLatch.countDown(); + } + }; + _mail2GotData.set(callback); + + // Send and receive 1 data block to ensure stream is established + grpcSendingMailbox.send(getTestTransferableBlock()); + Assert.assertTrue(receivedDataLatch.await(1, TimeUnit.SECONDS)); + TransferableBlock receivedBlock = grpcReceivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertEquals(receivedBlock.getNumRows(), 1); + + // Receiver issues a cancellation + grpcReceivingMailbox.cancel(); + + // Send from sender will now throw. We await a few milliseconds since cancellation may have a lag in getting + // processed at the other side. 
+ CountDownLatch neverEndingLatch = new CountDownLatch(1); + try { + Assert.assertFalse(neverEndingLatch.await(100, TimeUnit.MILLISECONDS)); + grpcSendingMailbox.send(getTestTransferableBlock()); + Assert.fail("Send call above should have thrown since the stream is cancelled"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Called send when stream is already closed")); + } + } + + @Test + public void testStreamCancellationBySender() + throws Exception { + // set a large deadline + long deadlineMs = System.currentTimeMillis() + 120_000; + JsonMailboxIdentifier mailboxId = createMailboxId("sender-cancel"); + + GrpcSendingMailbox grpcSendingMailbox = + (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, deadlineMs); + GrpcReceivingMailbox grpcReceivingMailbox = + (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId); + + CountDownLatch receivedDataLatch = new CountDownLatch(1); + Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() { + @Override + public void accept(MailboxIdentifier mailboxIdentifier) { + receivedDataLatch.countDown(); + } + }; + _mail2GotData.set(callback); + + // Send and receive 1 data block to ensure stream is established + grpcSendingMailbox.send(getTestTransferableBlock()); + Assert.assertTrue(receivedDataLatch.await(1, TimeUnit.SECONDS)); + TransferableBlock receivedBlock = grpcReceivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertEquals(receivedBlock.getNumRows(), 1); + + // Sender issues a cancellation + grpcSendingMailbox.cancel(new RuntimeException("foo")); + + // receiving mailbox should return a error-block + CountDownLatch neverEndingLatch = new CountDownLatch(1); + Assert.assertFalse(neverEndingLatch.await(100, TimeUnit.MILLISECONDS)); + receivedBlock = grpcReceivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertTrue(receivedBlock.isErrorBlock()); + } + + private JsonMailboxIdentifier createMailboxId(String 
jobId) { + return new JsonMailboxIdentifier( + jobId, + new VirtualServerAddress("localhost", _mailboxService1.getMailboxPort(), 0), + new VirtualServerAddress("localhost", _mailboxService2.getMailboxPort(), 0), + DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID); + } + private TransferableBlock getTestTransferableBlock() { List<Object[]> rows = new ArrayList<>(); rows.add(createRow(0, "test_string")); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java index 0c1351fd04..78edcec092 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java @@ -20,11 +20,13 @@ package org.apache.pinot.query.mailbox; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.pinot.common.datablock.DataBlock; -import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -33,6 +35,12 @@ public class InMemoryMailboxServiceTest { private static final int DEFAULT_SENDER_STAGE_ID = 0; private static final int DEFAULT_RECEIVER_STAGE_ID = 1; + private static final JsonMailboxIdentifier MAILBOX_ID = new JsonMailboxIdentifier( + String.format("%s_%s", 1234, DEFAULT_RECEIVER_STAGE_ID), + new VirtualServerAddress("localhost", 0, 0), + new VirtualServerAddress("localhost", 0, 0), + DEFAULT_SENDER_STAGE_ID, + DEFAULT_RECEIVER_STAGE_ID); private static final DataSchema TEST_DATA_SCHEMA = new 
DataSchema(new String[]{"foo", "bar"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); private static final int NUM_ENTRIES = 5; @@ -40,13 +48,12 @@ public class InMemoryMailboxServiceTest { @Test public void testHappyPath() throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { }); - final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier( - "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new VirtualServerAddress("localhost", 0, 0), - DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID); InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox) mailboxService.getReceivingMailbox( - mailboxId); - InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox) mailboxService.getSendingMailbox(mailboxId); + MAILBOX_ID); + InMemorySendingMailbox sendingMailbox = + (InMemorySendingMailbox) mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs); // Sends are non-blocking as long as channel capacity is not breached for (int i = 0; i < NUM_ENTRIES; i++) { @@ -77,14 +84,15 @@ public class InMemoryMailboxServiceTest { */ @Test public void testNonLocalMailboxId() { + long deadlineMs = System.currentTimeMillis() + 10_000; InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { }); - final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier( + final JsonMailboxIdentifier nonLocalMailboxId = new JsonMailboxIdentifier( "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new VirtualServerAddress("localhost", 1, 0), DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID); // Test getReceivingMailbox try { - mailboxService.getReceivingMailbox(mailboxId); + mailboxService.getReceivingMailbox(nonLocalMailboxId); Assert.fail("Method call above should have failed"); } catch (IllegalStateException e) { 
Assert.assertTrue(e.getMessage().contains("non-local transport")); @@ -92,16 +100,109 @@ public class InMemoryMailboxServiceTest { // Test getSendingMailbox try { - mailboxService.getSendingMailbox(mailboxId); + mailboxService.getSendingMailbox(nonLocalMailboxId, deadlineMs); Assert.fail("Method call above should have failed"); } catch (IllegalStateException e) { Assert.assertTrue(e.getMessage().contains("non-local transport")); } } + @Test + public void testInMemoryStreamCancellationByReceiver() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; + InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { }); + + SendingMailbox<TransferableBlock> sendingMailbox = mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs); + ReceivingMailbox<TransferableBlock> receivingMailbox = mailboxService.getReceivingMailbox(MAILBOX_ID); + + // Send and receive one data block + sendingMailbox.send(getTestTransferableBlock(0, false)); + TransferableBlock receivedBlock = receivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertEquals(receivedBlock.getNumRows(), 1); + + receivingMailbox.cancel(); + + // After the stream is cancelled, sender will start seeing errors + try { + sendingMailbox.send(getTestTransferableBlock(1, false)); + Assert.fail("Method call above should have failed"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("cancelled InMemory")); + } + + // Cancel is idempotent for both sending and receiving mailbox so safe to call multiple times + receivingMailbox.cancel(); + sendingMailbox.cancel(new RuntimeException("foo")); + } + + @Test + public void testInMemoryStreamCancellationBySender() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; + InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { }); + + SendingMailbox<TransferableBlock> sendingMailbox = 
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs); + ReceivingMailbox<TransferableBlock> receivingMailbox = mailboxService.getReceivingMailbox(MAILBOX_ID); + + // Send and receive one data block + sendingMailbox.send(getTestTransferableBlock(0, false)); + TransferableBlock receivedBlock = receivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertEquals(receivedBlock.getNumRows(), 1); + + sendingMailbox.cancel(new RuntimeException("foo")); + + // After the stream is cancelled, receiver will get error-blocks + receivedBlock = receivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertTrue(receivedBlock.isErrorBlock()); + + // Cancel is idempotent for both sending and receiving mailbox so safe to call multiple times + sendingMailbox.cancel(new RuntimeException("foo")); + receivingMailbox.cancel(); + } + + @Test + public void testInMemoryStreamTimeOut() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 1000; + InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0, ignored -> { }); + + SendingMailbox<TransferableBlock> sendingMailbox = mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs); + ReceivingMailbox<TransferableBlock> receivingMailbox = mailboxService.getReceivingMailbox(MAILBOX_ID); + + // Send and receive one data block + sendingMailbox.send(getTestTransferableBlock(0, false)); + TransferableBlock receivedBlock = receivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertEquals(receivedBlock.getNumRows(), 1); + + CountDownLatch neverEndingLatch = new CountDownLatch(1); + Assert.assertFalse(neverEndingLatch.await(1, TimeUnit.SECONDS)); + + // Sends for the mailbox will throw + try { + sendingMailbox.send(getTestTransferableBlock(0, false)); + Assert.fail("Method call above should have failed"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Deadline")); + } + + // Receiver will receive error-blocks after stream 
timeout + receivedBlock = receivingMailbox.receive(); + Assert.assertNotNull(receivedBlock); + Assert.assertTrue(receivedBlock.isErrorBlock()); + + // Cancel will be a no-op and will not throw. + sendingMailbox.cancel(new RuntimeException("foo")); + receivingMailbox.cancel(); + } + private TransferableBlock getTestTransferableBlock(int index, boolean isEndOfStream) { if (isEndOfStream) { - return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } List<Object[]> rows = new ArrayList<>(index); rows.add(new Object[]{index, "test_data"}); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java index 01232aa03e..89d5e722b3 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java @@ -45,11 +45,11 @@ public class MultiplexingMailboxServiceTest { Mockito.doReturn(1000).when(grpcMailboxService).getMailboxPort(); Mockito.doReturn(1000).when(inMemoryMailboxService).getMailboxPort(); Mockito.doReturn(Mockito.mock(InMemorySendingMailbox.class)).when(inMemoryMailboxService).getSendingMailbox( - Mockito.any()); + Mockito.any(), Mockito.anyLong()); Mockito.doReturn(Mockito.mock(InMemoryReceivingMailbox.class)).when(inMemoryMailboxService).getReceivingMailbox( Mockito.any()); Mockito.doReturn(Mockito.mock(GrpcSendingMailbox.class)).when(grpcMailboxService).getSendingMailbox( - Mockito.any()); + Mockito.any(), Mockito.anyLong()); Mockito.doReturn(Mockito.mock(GrpcReceivingMailbox.class)).when(grpcMailboxService).getReceivingMailbox( Mockito.any()); @@ -66,8 +66,8 @@ public class MultiplexingMailboxServiceTest { Assert.assertEquals("localhost", multiplexService.getHostname()); 
Assert.assertEquals(1000, multiplexService.getMailboxPort()); - Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID) instanceof InMemorySendingMailbox); - Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID) instanceof GrpcSendingMailbox); + Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID, -1) instanceof InMemorySendingMailbox); + Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID, -1) instanceof GrpcSendingMailbox); Assert.assertTrue(multiplexService.getReceivingMailbox(LOCAL_MAILBOX_ID) instanceof InMemoryReceivingMailbox); Assert.assertTrue(multiplexService.getReceivingMailbox(NON_LOCAL_MAILBOX_ID) instanceof GrpcReceivingMailbox); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index aea548607a..4ba8090e09 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -183,7 +183,7 @@ public class OpChainSchedulerServiceTest { } @Test - public void shouldCallCloseOnOperatorsThatReturnErrorBlock() + public void shouldCallCancelOnOperatorsThatReturnErrorBlock() throws InterruptedException { initExecutor(1); OpChain opChain = getChain(_operatorA); @@ -196,7 +196,7 @@ public class OpChainSchedulerServiceTest { Mockito.doAnswer(inv -> { latch.countDown(); return null; - }).when(_operatorA).close(); + }).when(_operatorA).cancel(Mockito.any()); scheduler.startAsync().awaitRunning(); scheduler.register(opChain); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 
b916b5cc5e..ea7df49829 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -64,7 +64,8 @@ public class MailboxSendOperatorTest { @BeforeMethod public void setUp() { _mocks = MockitoAnnotations.openMocks(this); - Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())) + Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.anyLong())) .thenReturn(_exchange); Mockito.when(_server.getHostname()).thenReturn("mock"); @@ -79,13 +80,15 @@ public class MailboxSendOperatorTest { } @Test - public void shouldSwallowNoOpBlockFromUpstream() { + public void shouldSwallowNoOpBlockFromUpstream() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; // Given: MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server)); + new VirtualServerAddress(_server), deadlineMs); Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); // When: @@ -97,13 +100,15 @@ public class MailboxSendOperatorTest { } @Test - public void shouldSendErrorBlock() { + public void shouldSendErrorBlock() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; // Given: MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, 
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server)); + new VirtualServerAddress(_server), deadlineMs); TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")); Mockito.when(_input.nextBlock()).thenReturn(errorBlock); @@ -116,13 +121,15 @@ public class MailboxSendOperatorTest { } @Test - public void shouldSendErrorBlockWhenInputThrows() { + public void shouldSendErrorBlockWhenInputThrows() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; // Given: MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server)); + new VirtualServerAddress(_server), deadlineMs); Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!")); ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class); @@ -136,13 +143,15 @@ public class MailboxSendOperatorTest { } @Test - public void shouldSendEosBlock() { + public void shouldSendEosBlock() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; // Given: MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server)); + new VirtualServerAddress(_server), deadlineMs); TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(); 
Mockito.when(_input.nextBlock()).thenReturn(eosBlock); @@ -155,13 +164,15 @@ public class MailboxSendOperatorTest { } @Test - public void shouldSendDataBlock() { + public void shouldSendDataBlock() + throws Exception { + long deadlineMs = System.currentTimeMillis() + 10_000; // Given: MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), RelDistribution.Type.HASH_DISTRIBUTED, _selector, server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server)); + new VirtualServerAddress(_server), deadlineMs); TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); Mockito.when(_input.nextBlock()).thenReturn(dataBlock) .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java index 033f010a8c..6afdf89a03 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java @@ -58,7 +58,8 @@ public class BlockExchangeTest { } @Test - public void shouldSendEosBlockToAllDestinations() { + public void shouldSendEosBlockToAllDestinations() + throws Exception { // Given: List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2); BlockExchange exchange = new TestBlockExchange(destinations); @@ -78,7 +79,8 @@ public class BlockExchangeTest { } @Test - public void shouldSendDataBlocksOnlyToTargetDestination() { + public void shouldSendDataBlocksOnlyToTargetDestination() + throws Exception { // Given: 
List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1); BlockExchange exchange = new TestBlockExchange(destinations); @@ -97,7 +99,8 @@ public class BlockExchangeTest { } @Test - public void shouldSplitBlocks() { + public void shouldSplitBlocks() + throws Exception { // Given: List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1); @@ -138,7 +141,8 @@ public class BlockExchangeTest { } @Override - protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) { + protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) + throws Exception { for (SendingMailbox mailbox : destinations) { sendBlock(mailbox, block); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java index b39d8e43fb..f815e00b78 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java @@ -57,7 +57,8 @@ public class BroadcastExchangeTest { } @Test - public void shouldBroadcast() { + public void shouldBroadcast() + throws Exception { // Given: ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java index 50b8b4bdc9..1e140553ca 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java +++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java @@ -63,7 +63,8 @@ public class HashExchangeTest { } @Test - public void shouldSplitAndRouteBlocksBasedOnPartitionKey() { + public void shouldSplitAndRouteBlocksBasedOnPartitionKey() + throws Exception { // Given: TestSelector selector = new TestSelector(Iterators.forArray(2, 0, 1)); Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(new Object[]{0}, new Object[]{1}, new Object[]{2})); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java index ab97ea6196..aca4180174 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java @@ -56,7 +56,8 @@ public class RandomExchangeTest { } @Test - public void shouldRouteRandomly() { + public void shouldRouteRandomly() + throws Exception { // Given: ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java index 09855be2a4..3a964494ec 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java @@ -54,7 +54,8 @@ public class SingletonExchangeTest { } @Test - public void shouldRouteSingleton() { + public void shouldRouteSingleton() + throws Exception { // Given: ImmutableList<SendingMailbox<TransferableBlock>> 
destinations = ImmutableList.of(_mailbox1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org