agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014386219


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -111,37 +114,46 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _timeout) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
-    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
-    boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + _timeout;
-    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
-      hasOpenedMailbox = false;
-      for (ServerInstance sendingInstance : _sendingStageInstances) {
-        try {
-          ReceivingMailbox<TransferableBlock> receivingMailbox =
-              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
-          // TODO this is not threadsafe.
-          // make sure only one thread is checking receiving mailbox and calling receive() then close()
-          if (!receivingMailbox.isClosed()) {
-            hasOpenedMailbox = true;
-            TransferableBlock transferableBlock = receivingMailbox.receive();
-            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
-              // Return the block only if it has some valid data
-              return transferableBlock;
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosCount = 0;
+
+    for (int i = 0; i < _sendingStageInstances.size(); i++) {
+      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+
+          // this is blocking for 100ms and may return null
+          TransferableBlock block = mailbox.receive();
+          if (block != null) {
+            if (!block.isEndOfStreamBlock()) {
+              return block;
             }
+            eosCount++;
           }
-        } catch (Exception e) {
-          LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
         }
+      } catch (Exception e) {
+        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
       }
     }
-    if (System.nanoTime() >= timeoutWatermark) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
-    }
+
+    // if we opened at least one mailbox, but still got to this point, then that means
+    // all the mailboxes we opened returned null but were not yet closed - early terminate
+    // with a noop block. Otherwise, we have exhausted all data from all mailboxes and can
+    // return EOS
+    return openMailboxCount > 0 && (openMailboxCount != eosCount)

Review Comment:
   Yeah, this condition isn't necessary; it's technically an optimization to
   avoid needing another call. I'll remove it. (At first I thought it was
   necessary, but that turned out to be a different bug I was tracking down.)
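
   To make the "avoid needing another call" point concrete, here is a minimal,
   self-contained sketch of the round-robin polling pattern from the diff. It is
   not the Pinot code: `RoundRobinPoller`, `mailbox`, `drain`, the string
   markers, and the `shortCircuit` flag are all hypothetical names made up for
   illustration, and the reading of the truncated `return` line is an
   assumption. Unlike the diff, the sketch polls without blocking and moves the
   cursor past a mailbox that yielded data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the operator in the diff; none of these names are Pinot APIs.
class RoundRobinPoller {
  static final String EOS = "EOS";   // end-of-stream marker for one mailbox
  static final String NOOP = "NOOP"; // "nothing ready yet, call again"

  private final List<Queue<String>> _mailboxes;
  private final boolean[] _finished;   // true once a mailbox has delivered EOS
  private final boolean _shortCircuit; // toggles the extra condition under review
  private int _idx = 0;                // round-robin cursor, kept across calls

  RoundRobinPoller(List<Queue<String>> mailboxes, boolean shortCircuit) {
    _mailboxes = mailboxes;
    _finished = new boolean[mailboxes.size()];
    _shortCircuit = shortCircuit;
  }

  // Convenience factory: a mailbox is modeled as a queue of pre-loaded blocks.
  static Queue<String> mailbox(String... blocks) {
    return new ArrayDeque<>(Arrays.asList(blocks));
  }

  String next() {
    int n = _mailboxes.size();
    int start = _idx;
    int open = 0;
    int eos = 0;
    for (int i = 0; i < n; i++) {
      _idx = (start + i) % n;
      if (_finished[_idx]) {
        continue; // plays the role of a closed mailbox
      }
      open++;
      String block = _mailboxes.get(_idx).poll(); // null when nothing is buffered
      if (block == null) {
        continue;
      }
      if (!EOS.equals(block)) {
        _idx = (_idx + 1) % n; // sketch choice: start the next call at the following mailbox
        return block;
      }
      _finished[_idx] = true;
      eos++;
    }
    if (_shortCircuit) {
      // Assumed reading of the reviewed condition: if every open mailbox just hit EOS, finish now.
      return (open > 0 && open != eos) ? NOOP : EOS;
    }
    // Simpler variant: report NOOP and let the following call discover that nothing is open.
    return open > 0 ? NOOP : EOS;
  }

  // Calls next() until EOS and records every returned block (assumes each mailbox sends EOS).
  static List<String> drain(RoundRobinPoller poller) {
    List<String> out = new ArrayList<>();
    String block;
    do {
      block = poller.next();
      out.add(block);
    } while (!EOS.equals(block));
    return out;
  }
}
```

   With two mailboxes that each hold one data block followed by EOS, draining
   with `shortCircuit = true` yields `[a, b, EOS]`, while `shortCircuit = false`
   yields `[a, b, NOOP, EOS]`. Both variants terminate with the same data; the
   extra condition only saves the one additional call.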



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

