walterddr commented on code in PR #10170: URL: https://github.com/apache/pinot/pull/10170#discussion_r1088441421
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java: ########## @@ -94,6 +91,10 @@ public boolean isClosed() { return isInitialized() && _contentStreamObserver.isCompleted(); } + @Override + public void cancel(Throwable e) { Review Comment: shouldn't we clean up the `_contentStreamObserver` in this method? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java: ########## @@ -66,4 +66,8 @@ public String toString() { public void close() { _root.close(); } + + public void cancel(Throwable e) { Review Comment: please add javadoc explaining the difference between cancel and close API ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java: ########## @@ -30,31 +30,18 @@ * {@code MailboxStatusStreamObserver} is used by the SendingMailbox to send data over the wire. * * <p>Once {@link org.apache.pinot.query.mailbox.GrpcSendingMailbox#init()} is called, one instances of this class is - * created based on the opened GRPC connection returned {@link StreamObserver}. From this point, the sending mailbox - * can use the {@link MailboxStatusStreamObserver#send(Mailbox.MailboxContent)} API to send data packet to the receiving + * created based on the opened GRPC connection returned {@link StreamObserver}. * end. */ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class); private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5; private final AtomicInteger _bufferSize = new AtomicInteger(5); - private final AtomicBoolean _isCompleted = new AtomicBoolean(false); - private StreamObserver<Mailbox.MailboxContent> _mailboxContentStreamObserver; + private CountDownLatch _finishLatch; - public MailboxStatusStreamObserver() { - } - - public void init(StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver) { - _mailboxContentStreamObserver = mailboxContentStreamObserver; - } - - public void send(Mailbox.MailboxContent mailboxContent) { - _mailboxContentStreamObserver.onNext(mailboxContent); - } - - public void complete() { - _mailboxContentStreamObserver.onCompleted(); Review Comment: nice refactoring. i dont think we need these wrapper anytime soon. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java: ########## @@ -46,4 +49,8 @@ void send(T data) * Complete delivery of the current mailbox. */ void complete(); + + void waitForFinish(long timeout, TimeUnit unit) throws InterruptedException; Review Comment: i dont see `waitForFinish` being called in any place. is it correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org