ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128186807


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -70,29 +79,37 @@ public Consumer<MailboxIdentifier> 
init(MailboxContentStreamObserver streamObser
    *  2. If the received block from the sender is a data-block with 0 rows.
    * </p>
    */
+  @Nullable
   @Override
-  public TransferableBlock receive()
-      throws Exception {
+  public TransferableBlock receive() throws Exception {
     if (!waitForInitialize()) {
       return null;
     }
     MailboxContent mailboxContent = _contentStreamObserver.poll();
-    _totalMsgReceived.incrementAndGet();
     return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
   }
 
   @Override
   public boolean isInitialized() {
-    return _initializationLatch.getCount() <= 0;
+    return _initializationLatch.getCount() == 0;
   }
 
   @Override
   public boolean isClosed() {
-    return isInitialized() && _contentStreamObserver.isCompleted();
+    return isInitialized() && _contentStreamObserver.hasConsumedAllData();
   }
 
   @Override
-  public void cancel(Throwable e) {
+  public void cancel() {
+    if (isInitialized()) {
+      try {
+        _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
+      } catch (Exception e) {
+        // TODO: This can happen if the call is already closed. Consider 
removing this log altogether or find a way
+        //  to check if the stream is already closed.
+        LOGGER.info("Tried to cancel receiving mailbox", e);

Review Comment:
   This is not related to gRPC but our business logic. There can be concurrent 
cancellations/terminations of the stream so this can always happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to