This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new f7c281f2dc [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)
f7c281f2dc is described below

commit f7c281f2dc5d76ed787e0aeebbc6c629c8cec50e
Author: Yao Liu <y...@startree.ai>
AuthorDate: Wed Dec 14 11:15:36 2022 -0800

    [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)

    * error out when receiving buffer full
    * fix race condition
---
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  1 -
 .../channel/MailboxContentStreamObserver.java      | 47 +++++++++++++++++++---
 .../channel/MailboxStatusStreamObserver.java       |  1 -
 3 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index bb6dce4b76..ce0152f1e6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -94,7 +94,6 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
  return isInitialized() && _contentStreamObserver.isCompleted();
  }
- // TODO: fix busy wait. This should be guarded by timeout.
  private boolean waitForInitialize()
  throws Exception {
  if (_initializationLatch.getCount() > 0) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 1b6c8a8130..238c3cf60c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -23,6 +23,8 @@ import io.grpc.stub.StreamObserver;
  import java.io.IOException;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.function.Consumer;
  import org.apache.pinot.common.proto.Mailbox;
  import org.apache.pinot.query.mailbox.GrpcMailboxService;
@@ -43,6 +45,14 @@ import org.slf4j.LoggerFactory;
  */
  public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+
+ private static Mailbox.MailboxContent createErrorContent(Throwable e)
+ throws IOException {
+ return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+ TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
+ }
+
  private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
  private final GrpcMailboxService _mailboxService;
  private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
@@ -50,6 +60,9 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
  private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+
+ ReadWriteLock _errorLock = new ReentrantReadWriteLock();
+ private Mailbox.MailboxContent _errorContent = null; // Guarded by _errorLock.
  private StringMailboxIdentifier _mailboxId;
  private Consumer<MailboxIdentifier> _gotMailCallback;
@@ -73,6 +86,14 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
  * to indicate when to call this method.
  */
  public Mailbox.MailboxContent poll() {
+ try {
+ _errorLock.readLock().lock();
+ if (_errorContent != null) {
+ return _errorContent;
+ }
+ } finally {
+ _errorLock.readLock().unlock();
+ }
  if (isCompleted()) {
  return null;
  }
@@ -93,7 +114,22 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
  if (!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
  // when the receiving end receives a message put it in the mailbox queue.
- _receivingBuffer.offer(mailboxContent);
+ // TODO: pass a timeout to _receivingBuffer.
+ if (!_receivingBuffer.offer(mailboxContent)) {
+ // TODO: close the stream.
+ RuntimeException e = new RuntimeException("Mailbox receivingBuffer is full:" + _mailboxId);
+ LOGGER.error(e.getMessage());
+ try {
+ _errorLock.writeLock().lock();
+ _errorContent = createErrorContent(e);
+ } catch (IOException ioe) {
+ e = new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+ LOGGER.error(e.getMessage());
+ throw e;
+ } finally {
+ _errorLock.writeLock().unlock();
+ }
+ }
  _gotMailCallback.accept(_mailboxId);
  if (_isEnabledFeedback) {
@@ -116,14 +152,15 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
  @Override
  public void onError(Throwable e) {
  try {
- _receivingBuffer.offer(Mailbox.MailboxContent.newBuilder()
- .setPayload(ByteString.copyFrom(
- TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
- .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build());
+ _errorLock.writeLock().lock();
+ _errorContent = createErrorContent(e);
  _gotMailCallback.accept(_mailboxId);
+ // TODO: close the stream.
  throw new RuntimeException(e);
  } catch (IOException ioe) {
  throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+ } finally {
+ _errorLock.writeLock().unlock();
  }
  }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index e482583bd3..b758f0793d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
  public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> {
  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
  private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
- private static final long DEFAULT_MAILBOX_POLL_TIMEOUT_MS = 1000L;
  private final AtomicInteger _bufferSize = new AtomicInteger(5);
  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org