walterddr commented on code in PR #9064:
URL: https://github.com/apache/pinot/pull/9064#discussion_r927823286


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,25 +107,72 @@ public void processQuery(DistributedStagePlan 
distributedStagePlan, ExecutorServ
       BaseDataBlock dataBlock;
       try {
         DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, 
executorService, null);
-        // this works because default DataTableImplV3 will have a version 
number at beginning,
-        // which maps to ROW type of version 3.
-        dataBlock = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+        if (!dataTable.getExceptions().isEmpty()) {
+          // if contains exception, directly return a metadata block with the 
exceptions.
+          dataBlock = 
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+        } else {
+          // this works because default DataTableImplV3 will have a version 
number at beginning:
+          // the new DataBlock encodes lower 16 bites as version and upper 16 
bites as type (ROW, COLUMNAR, METADATA)
+          dataBlock = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+        }
       } catch (IOException e) {
-        throw new RuntimeException("Unable to convert byte buffer", e);
+        dataBlock = DataBlockUtils.getErrorDataBlock(e);
       }
 
       MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = 
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
       MailboxSendOperator mailboxSendOperator =
-          new MailboxSendOperator(_mailboxService, dataBlock, 
receivingStageMetadata.getServerInstances(),
-              sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), 
_hostname, _port,
-              serverQueryRequest.getRequestId(), sendNode.getStageId());
+          new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
+              new LeafStageTransferableBlockOperator(dataBlock, 
sendNode.getDataSchema()),
+              receivingStageMetadata.getServerInstances(), 
sendNode.getExchangeType(),
+              sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequest.getRequestId(),
+              sendNode.getStageId());
       mailboxSendOperator.nextBlock();
+      if (dataBlock.getExceptions().isEmpty()) {
+        mailboxSendOperator.nextBlock();
+      }
     } else {
       _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, 
executorService);
     }
   }
 
+  private static class LeafStageTransferableBlockOperator extends 
BaseOperator<TransferableBlock> {
+    private static final String EXPLAIN_NAME = "FAKE_TRANSFER_OPERATOR";

Review Comment:
   because leaf stage only returns one DataTable. we need to create the logic 
for `nextBlock()`
   - if the datatable is not empty and contains no error, then send in a 
datablock, the next time `getBlock()` is called. it should send the metadata of 
the datablock
   - if the datatable contains error, the send the error as metadata block and 
nothing should be return if `getBlock()` is called again
   
   previously this was implemented in mailbox and it was a bit tedious to 
maintain both side so I factor it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to