walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014154848


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -149,7 +150,23 @@ public BaseDataBlock.Type getType() {
    * @return whether this block is the end of stream.
    */
   public boolean isEndOfStreamBlock() {
-    return _type == BaseDataBlock.Type.METADATA;
+    if (_isErrorBlock) {
+      return true;
+    } else if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == MetadataBlock.MetadataBlockType.EOS;
+  }
+
+  public boolean isNoOpBlock() {
+    if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == MetadataBlock.MetadataBlockType.NOOP;
   }
 
   /**

Review Comment:
   Let's also change `isErrorBlock()`: we no longer need a special boolean flag, since the error state is saved in the metadata block anyway.
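
   A minimal sketch of what this could look like. The classes below are hypothetical, simplified stand-ins for `TransferableBlock` and `MetadataBlock`, and it assumes the metadata block type can represent an error (an `ERROR` value alongside `EOS` and `NOOP`):

```java
// Hypothetical, simplified stand-ins for the types discussed in this review.
public class ErrorBlockSketch {
  enum BlockType { ROW, COLUMNAR, METADATA }

  // Assumption: the metadata block type can represent an error.
  enum MetadataBlockType { EOS, ERROR, NOOP }

  static final class Block {
    private final BlockType type;
    private final MetadataBlockType metadataType; // null for data blocks

    Block(BlockType type, MetadataBlockType metadataType) {
      this.type = type;
      this.metadataType = metadataType;
    }

    // Derived from the metadata type, so there is no separate boolean to keep in sync.
    boolean isErrorBlock() {
      return type == BlockType.METADATA && metadataType == MetadataBlockType.ERROR;
    }

    // As in the diff above, an error block also ends the stream.
    boolean isEndOfStreamBlock() {
      return isErrorBlock()
          || (type == BlockType.METADATA && metadataType == MetadataBlockType.EOS);
    }
  }
}
```

   With this shape, `isEndOfStreamBlock()` and `isErrorBlock()` read the same source of truth.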



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -91,6 +93,7 @@ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema d
     }
     _resultSchema = dataSchema;
 
+    _readyToConstruct = false;
     _isCumulativeBlockConstructed = false;

Review Comment:
   We can remove `_isCumulativeBlockConstructed`; in the current context it means `!_readyToConstruct`.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   `transferableBlock.isNoOpBlock()` checks `metadataBlock.getType() == MetadataBlock.MetadataBlockType.NOOP`. However, `metadataBlock` can be null, which would throw an NPE, no?
   
   Should we add a null check, e.g. `transferableBlock.getDataBlock() != null`?
   
   Previously the null check was not necessary because the method only looked at the `BaseDataBlock.Type _type` member variable, which cannot be null in `TransferableBlock`.
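
   If the data block can indeed be null here, a guard along these lines would avoid the NPE. This is only a sketch: the types are hypothetical, simplified stand-ins, and the check is written as a static helper rather than as the real `TransferableBlock` method:

```java
// Hypothetical, simplified stand-ins for the types discussed in this review.
public class NullSafeNoOpSketch {
  enum BlockType { ROW, COLUMNAR, METADATA }

  enum MetadataBlockType { EOS, NOOP }

  static final class MetadataBlock {
    private final MetadataBlockType type;

    MetadataBlock(MetadataBlockType type) {
      this.type = type;
    }

    MetadataBlockType getType() {
      return type;
    }
  }

  // Returns false instead of throwing when the data block is missing.
  static boolean isNoOpBlock(BlockType type, MetadataBlock dataBlock) {
    if (type != BlockType.METADATA || dataBlock == null) {
      return false;
    }
    return dataBlock.getType() == MetadataBlockType.NOOP;
  }
}
```

   Whether a missing data block should count as "not a no-op" or be rejected earlier is a separate design choice.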



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -99,9 +98,13 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     // Build JOIN hash table
     buildBroadcastHashTable();
+
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (!_isHashTableBuilt) {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
     }
+

Review Comment:
   nit: can we pull the `_isHashTableBuilt` boolean check out of the private method into here? It makes the logic cleaner.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();

Review Comment:
   nit: can we pull all the `_readyToConstruct` boolean checks up from the private methods to here? It makes the logic cleaner.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -111,37 +114,46 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _timeout) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
-    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
-    boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + _timeout;
-    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
-      hasOpenedMailbox = false;
-      for (ServerInstance sendingInstance : _sendingStageInstances) {
-        try {
-          ReceivingMailbox<TransferableBlock> receivingMailbox =
-              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
-          // TODO this is not threadsafe.
-          // make sure only one thread is checking receiving mailbox and calling receive() then close()
-          if (!receivingMailbox.isClosed()) {
-            hasOpenedMailbox = true;
-            TransferableBlock transferableBlock = receivingMailbox.receive();
-            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
-              // Return the block only if it has some valid data
-              return transferableBlock;
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosCount = 0;
+
+    for (int i = 0; i < _sendingStageInstances.size(); i++) {
+      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+
+          // this is blocking for 100ms and may return null
+          TransferableBlock block = mailbox.receive();
+          if (block != null) {
+            if (!block.isEndOfStreamBlock()) {
+              return block;
             }
+            eosCount++;
           }
-        } catch (Exception e) {
-          LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
         }
+      } catch (Exception e) {
+        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
       }
     }
-    if (System.nanoTime() >= timeoutWatermark) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
-    }
+
+    // if we opened at least one mailbox, but still got to this point, then that means
+    // all the mailboxes we opened returned null but were not yet closed - early terminate
+    // with a noop block. Otherwise, we have exhausted all data from all mailboxes and can
+    // return EOS
+    return openMailboxCount > 0 && (openMailboxCount != eosCount)

Review Comment:
   This condition is a bit hard for me to validate. Can't we just use `openMailboxCount > 0`?
   IIUC, the `openMailboxCount != eosCount` clause only matters when every mailbox that is still open has just delivered its EOS: it saves one more call to `getNextBlock()` that would only return an EOS, yes?
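
   To make the comparison concrete, here is a small sketch of the two variants. The NOOP/EOS outcomes are taken from the code comment in the diff above (the full return statement is cut off in this hunk), and the class, enum and method names are made up for illustration:

```java
// Compares the condition in the diff with the simplified one suggested above.
public class MailboxReturnSketch {
  enum Result { NOOP, EOS }

  // Condition as written in the diff.
  static Result current(int openMailboxCount, int eosCount) {
    return openMailboxCount > 0 && openMailboxCount != eosCount ? Result.NOOP : Result.EOS;
  }

  // Simplified condition: only look at whether any mailbox was still open.
  static Result simplified(int openMailboxCount) {
    return openMailboxCount > 0 ? Result.NOOP : Result.EOS;
  }

  public static void main(String[] args) {
    int[][] cases = {{0, 0}, {2, 0}, {2, 1}, {2, 2}};
    for (int[] c : cases) {
      System.out.println("open=" + c[0] + " eos=" + c[1]
          + " current=" + current(c[0], c[1])
          + " simplified=" + simplified(c[0]));
    }
  }
}
```

   The two only differ when `openMailboxCount == eosCount > 0`: the current version returns EOS immediately, while the simplified one returns a NOOP first and, assuming those mailboxes are reported as closed on the next call, EOS one call later.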



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

