agavra commented on code in PR #9711: URL: https://github.com/apache/pinot/pull/9711#discussion_r1014386219
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java: ########## @@ -111,37 +114,46 @@ public String toExplainString() { protected TransferableBlock getNextBlock() { if (_upstreamErrorBlock != null) { return _upstreamErrorBlock; + } else if (System.nanoTime() >= _timeout) { + LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances); + return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); } - // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data. - boolean hasOpenedMailbox = true; - long timeoutWatermark = System.nanoTime() + _timeout; - while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) { - hasOpenedMailbox = false; - for (ServerInstance sendingInstance : _sendingStageInstances) { - try { - ReceivingMailbox<TransferableBlock> receivingMailbox = - _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance)); - // TODO this is not threadsafe. 
- // make sure only one thread is checking receiving mailbox and calling receive() then close() - if (!receivingMailbox.isClosed()) { - hasOpenedMailbox = true; - TransferableBlock transferableBlock = receivingMailbox.receive(); - if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) { - // Return the block only if it has some valid data - return transferableBlock; + + int startingIdx = _serverIdx; + int openMailboxCount = 0; + int eosCount = 0; + + for (int i = 0; i < _sendingStageInstances.size(); i++) { + // this implements a round-robin mailbox iterator so we don't starve any mailboxes + _serverIdx = (startingIdx + i) % _sendingStageInstances.size(); + + ServerInstance server = _sendingStageInstances.get(_serverIdx); + try { + ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server)); + if (!mailbox.isClosed()) { + openMailboxCount++; + + // this is blocking for 100ms and may return null + TransferableBlock block = mailbox.receive(); + if (block != null) { + if (!block.isEndOfStreamBlock()) { + return block; } + eosCount++; } - } catch (Exception e) { - LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e); } + } catch (Exception e) { + LOGGER.error(String.format("Error receiving data from mailbox %s", server), e); } } - if (System.nanoTime() >= timeoutWatermark) { - LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances); - return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); - } else { - return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema); - } + + // if we opened at least one mailbox, but still got to this point, then that means + // all the mailboxes we opened returned null but were not yet closed - early terminate + // with a noop block. 
Otherwise, we have exhausted all data from all mailboxes and can + // return EOS + return openMailboxCount > 0 && (openMailboxCount != eosCount) Review Comment: Yeah, this condition isn't necessary; it's technically an optimization to avoid needing another call. I'll remove it. (At first I thought it was necessary, but the problem I was figuring out was actually a different bug.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org