walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014154848

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -149,7 +150,23 @@ public BaseDataBlock.Type getType() {
    * @return whether this block is the end of stream.
    */
   public boolean isEndOfStreamBlock() {
-    return _type == BaseDataBlock.Type.METADATA;
+    if (_isErrorBlock) {
+      return true;
+    } else if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == MetadataBlock.MetadataBlockType.EOS;
+  }
+
+  public boolean isNoOpBlock() {
+    if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == MetadataBlock.MetadataBlockType.NOOP;
   }
 
   /**

Review Comment:
   Let's also change `isErrorBlock()`. It no longer needs a special boolean flag, since the error is saved in the metadata block anyway.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -91,6 +93,7 @@ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema d
     }
 
     _resultSchema = dataSchema;
+    _readyToConstruct = false;
     _isCumulativeBlockConstructed = false;

Review Comment:
   We can remove `_isCumulativeBlockConstructed`. In the current context it means `!_readyToConstruct`.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
         throw new RuntimeException("Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   `transferableBlock.isNoOpBlock()` checks `metadataBlock.getType() == MetadataBlock.MetadataBlockType.NOOP`.
   However, the metadata block can be null, which would throw an NPE, no? Should we add a null check such as `transferableBlock.getDataBlock() != null`? Previously the null check was not necessary because the method only looked at the `BaseDataBlock.Type _type` member variable, which cannot be null in `TransferableBlock`.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -99,9 +98,13 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     // Build JOIN hash table
     buildBroadcastHashTable();
+
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (!_isHashTableBuilt) {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
     }
+

Review Comment:
   nit: can we pull the `_isHashTableBuilt` check out of the private method and into here? It makes the logic cleaner.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();

Review Comment:
   nit: can we pull all the `_readyToConstruct` checks up from the private methods into here? It makes the logic cleaner.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -111,37 +114,46 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _timeout) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
-    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
-    boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + _timeout;
-    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
-      hasOpenedMailbox = false;
-      for (ServerInstance sendingInstance : _sendingStageInstances) {
-        try {
-          ReceivingMailbox<TransferableBlock> receivingMailbox =
-              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
-          // TODO this is not threadsafe.
-          // make sure only one thread is checking receiving mailbox and calling receive() then close()
-          if (!receivingMailbox.isClosed()) {
-            hasOpenedMailbox = true;
-            TransferableBlock transferableBlock = receivingMailbox.receive();
-            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
-              // Return the block only if it has some valid data
-              return transferableBlock;
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosCount = 0;
+
+    for (int i = 0; i < _sendingStageInstances.size(); i++) {
+      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+
+          // this is blocking for 100ms and may return null
+          TransferableBlock block = mailbox.receive();
+          if (block != null) {
+            if (!block.isEndOfStreamBlock()) {
+              return block;
             }
+            eosCount++;
           }
-        } catch (Exception e) {
-          LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
         }
+      } catch (Exception e) {
+        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
       }
     }
-    if (System.nanoTime() >= timeoutWatermark) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
-    }
+
+    // if we opened at least one mailbox, but still got to this point, then that means
+    // all the mailboxes we opened returned null but were not yet closed - early terminate
+    // with a noop block. Otherwise, we have exhausted all data from all mailboxes and can
+    // return EOS
+    return openMailboxCount > 0 && (openMailboxCount != eosCount)

Review Comment:
   This condition is a bit hard for me to validate. Can't we just do `openMailboxCount > 0`? IIUC, the second clause only matters when every mailbox that was still open returned its EOS in this pass: it lets the operator return EOS right away instead of spending one more `getNextBlock()` call that would only return an EOS, yes?
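The last question is easier to check with a toy model. The sketch below is a self-contained simulation, not Pinot code: `RoundRobinPollSketch`, `FakeMailbox`, `Kind`, `drain()` and `scenario()` are hypothetical stand-ins, and it assumes a mailbox reports `isClosed()` once its EOS has been received. It mirrors the round-robin pass from the diff and can switch between the PR's condition and the simpler `openMailboxCount > 0`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinPollSketch {
  // Hypothetical stand-ins for the kinds of TransferableBlock the operator can return.
  enum Kind { DATA, NOOP, EOS }

  // Hypothetical mailbox: each receive() returns the next scripted item; a null entry models a
  // receive() that timed out with nothing available. Assumption: the mailbox reports
  // isClosed() once its EOS has been handed out.
  static final class FakeMailbox {
    private final List<Kind> _script;
    private int _pos = 0;
    private boolean _closed = false;

    FakeMailbox(List<Kind> script) {
      _script = script;
    }

    boolean isClosed() {
      return _closed;
    }

    Kind receive() {
      if (_pos >= _script.size()) {
        return null;
      }
      Kind block = _script.get(_pos++);
      if (block == Kind.EOS) {
        _closed = true;
      }
      return block;
    }
  }

  private final List<FakeMailbox> _mailboxes;
  private final boolean _useEosShortcut; // true: PR's condition; false: just `openMailboxCount > 0`
  private int _serverIdx = 0;

  RoundRobinPollSketch(List<FakeMailbox> mailboxes, boolean useEosShortcut) {
    _mailboxes = mailboxes;
    _useEosShortcut = useEosShortcut;
  }

  // Mirrors the loop in the diff: one round-robin pass over the mailboxes per call.
  Kind getNextBlock() {
    int startingIdx = _serverIdx;
    int openMailboxCount = 0;
    int eosCount = 0;
    for (int i = 0; i < _mailboxes.size(); i++) {
      _serverIdx = (startingIdx + i) % _mailboxes.size();
      FakeMailbox mailbox = _mailboxes.get(_serverIdx);
      if (!mailbox.isClosed()) {
        openMailboxCount++;
        Kind block = mailbox.receive();
        if (block != null) {
          if (block != Kind.EOS) {
            return block;
          }
          eosCount++;
        }
      }
    }
    boolean mayHaveMoreData = _useEosShortcut
        ? openMailboxCount > 0 && openMailboxCount != eosCount
        : openMailboxCount > 0;
    return mayHaveMoreData ? Kind.NOOP : Kind.EOS;
  }

  // Calls getNextBlock() until EOS (capped at 100 calls) and records every result.
  static List<Kind> drain(List<FakeMailbox> mailboxes, boolean useEosShortcut) {
    RoundRobinPollSketch poller = new RoundRobinPollSketch(mailboxes, useEosShortcut);
    List<Kind> results = new ArrayList<>();
    for (int call = 0; call < 100; call++) {
      Kind block = poller.getNextBlock();
      results.add(block);
      if (block == Kind.EOS) {
        break;
      }
    }
    return results;
  }

  // Mailbox A: one data block, then EOS. Mailbox B: nothing on its first receive(), then EOS.
  static List<FakeMailbox> scenario() {
    return Arrays.asList(
        new FakeMailbox(Arrays.asList(Kind.DATA, Kind.EOS)),
        new FakeMailbox(Arrays.asList(null, Kind.EOS)));
  }

  public static void main(String[] args) {
    System.out.println("PR condition:              " + drain(scenario(), true));
    System.out.println("openMailboxCount > 0 only: " + drain(scenario(), false));
  }
}
```

In this model the PR's condition drains the scenario in three calls (`[DATA, NOOP, EOS]`), while `openMailboxCount > 0` alone takes four (`[DATA, NOOP, NOOP, EOS]`), because the pass in which the last open mailbox delivers its EOS still returns a NOOP. So, at least under these assumptions, the extra clause only saves that one trailing call; both variants end in EOS.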
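For the `TransferableBlock` comments (reading `isErrorBlock()` from the metadata block instead of a separate flag, and guarding against a null data block), a minimal self-contained sketch follows. It is not Pinot's implementation: `BlockKindSketch`, `Block`, `MetadataBlock`, `metadataOrNull()` and the factory methods are hypothetical, and it assumes an error block is a metadata block carrying a non-empty exception map.

```java
import java.util.Collections;
import java.util.Map;

public class BlockKindSketch {
  // Hypothetical, simplified stand-ins; Pinot's TransferableBlock and MetadataBlock have richer APIs.
  enum BlockType { ROW, METADATA }

  enum MetadataType { EOS, NOOP }

  static final class MetadataBlock {
    final MetadataType _type;
    // Assumption: an error block is a metadata block whose exception map is non-empty.
    final Map<Integer, String> _exceptions;

    MetadataBlock(MetadataType type, Map<Integer, String> exceptions) {
      _type = type;
      _exceptions = exceptions;
    }
  }

  static final class Block {
    final BlockType _type;
    // May be null, e.g. if the underlying data block has not been built yet (assumption).
    final Object _dataBlock;

    Block(BlockType type, Object dataBlock) {
      _type = type;
      _dataBlock = dataBlock;
    }

    // Returns the metadata block, or null if this is not a metadata block or the data block is
    // missing. `instanceof` is false for null, so this doubles as the null check.
    private MetadataBlock metadataOrNull() {
      if (_type != BlockType.METADATA || !(_dataBlock instanceof MetadataBlock)) {
        return null;
      }
      return (MetadataBlock) _dataBlock;
    }

    // No separate boolean flag: the error state is read from the metadata block itself.
    boolean isErrorBlock() {
      MetadataBlock metadata = metadataOrNull();
      return metadata != null && !metadata._exceptions.isEmpty();
    }

    // As in the diff, an error block also counts as end of stream.
    boolean isEndOfStreamBlock() {
      MetadataBlock metadata = metadataOrNull();
      return metadata != null && (isErrorBlock() || metadata._type == MetadataType.EOS);
    }

    boolean isNoOpBlock() {
      MetadataBlock metadata = metadataOrNull();
      return metadata != null && !isErrorBlock() && metadata._type == MetadataType.NOOP;
    }
  }

  static Block eosBlock() {
    return new Block(BlockType.METADATA, new MetadataBlock(MetadataType.EOS, Collections.emptyMap()));
  }

  static Block noOpBlock() {
    return new Block(BlockType.METADATA, new MetadataBlock(MetadataType.NOOP, Collections.emptyMap()));
  }

  // Error code 1 is an arbitrary placeholder, not a real Pinot error code.
  static Block errorBlock(String message) {
    return new Block(BlockType.METADATA,
        new MetadataBlock(MetadataType.EOS, Collections.singletonMap(1, message)));
  }

  static Block unbuiltMetadataBlock() {
    return new Block(BlockType.METADATA, null);
  }

  static String describe(Block block) {
    return "eos=" + block.isEndOfStreamBlock() + " noop=" + block.isNoOpBlock() + " error=" + block.isErrorBlock();
  }

  public static void main(String[] args) {
    System.out.println("eos:     " + describe(eosBlock()));
    System.out.println("noop:    " + describe(noOpBlock()));
    System.out.println("error:   " + describe(errorBlock("hypothetical failure")));
    System.out.println("unbuilt: " + describe(unbuiltMetadataBlock()));
  }
}
```

In this sketch a `METADATA` block whose data block is missing is reported as neither EOS, NOOP nor error instead of throwing an NPE; whether that is the right fallback, or whether the block should be materialized first, is a design decision for the PR.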