walterddr commented on code in PR #9064: URL: https://github.com/apache/pinot/pull/9064#discussion_r927823286
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,25 +107,72 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ
       BaseDataBlock dataBlock;
       try {
         DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
-        // this works because default DataTableImplV3 will have a version number at beginning,
-        // which maps to ROW type of version 3.
-        dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+        if (!dataTable.getExceptions().isEmpty()) {
+          // if contains exception, directly return a metadata block with the exceptions.
+          dataBlock = DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+        } else {
+          // this works because default DataTableImplV3 will have a version number at beginning:
+          // the new DataBlock encodes lower 16 bites as version and upper 16 bites as type (ROW, COLUMNAR, METADATA)
+          dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+        }
       } catch (IOException e) {
-        throw new RuntimeException("Unable to convert byte buffer", e);
+        dataBlock = DataBlockUtils.getErrorDataBlock(e);
       }
       MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
       MailboxSendOperator mailboxSendOperator =
-          new MailboxSendOperator(_mailboxService, dataBlock, receivingStageMetadata.getServerInstances(),
-              sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostname, _port,
-              serverQueryRequest.getRequestId(), sendNode.getStageId());
+          new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
+              new LeafStageTransferableBlockOperator(dataBlock, sendNode.getDataSchema()),
+              receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
+              sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequest.getRequestId(),
+              sendNode.getStageId());
       mailboxSendOperator.nextBlock();
+      if (dataBlock.getExceptions().isEmpty()) {
+        mailboxSendOperator.nextBlock();
+      }
     } else {
       _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, executorService);
     }
   }
+  private static class LeafStageTransferableBlockOperator extends BaseOperator<TransferableBlock> {
+    private static final String EXPLAIN_NAME = "FAKE_TRANSFER_OPERATOR";

Review Comment:
   Because the leaf stage returns only one DataTable, we need to implement the logic for `nextBlock()` ourselves:
   - If the DataTable is not empty and contains no error, send it as a data block; the next time `getBlock()` is called, it should send the metadata of the data block.
   - If the DataTable contains an error, send the error as a metadata block; nothing should be returned if `getBlock()` is called again.

   Previously this was implemented in the mailbox, and it was tedious to maintain both sides, so I factored it out.
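
   The two cases described in the comment can be sketched as a small one-shot operator. This is only an illustration under stated assumptions: `Block`, `Kind`, `OneShotLeafOperator`, and `drain` are hypothetical stand-ins invented for this sketch, not Pinot's `TransferableBlock` or `BaseOperator`, and the real `LeafStageTransferableBlockOperator` in the PR may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the leaf-stage behavior; not Pinot code.
class LeafStageSketch {
  // Stand-in for the block types a leaf stage can emit.
  enum Kind { DATA, END_OF_STREAM, ERROR }

  // Stand-in for a transferable block: a kind plus an opaque payload.
  static final class Block {
    final Kind kind;
    final String payload;

    Block(Kind kind, String payload) {
      this.kind = kind;
      this.payload = payload;
    }
  }

  // Wraps the single result a leaf stage produces and replays it as a short stream.
  static final class OneShotLeafOperator {
    private final Block _result;
    private int _calls = 0;

    OneShotLeafOperator(Block result) {
      _result = result;
    }

    Block nextBlock() {
      _calls++;
      if (_result.kind == Kind.ERROR) {
        // Error case: emit the error block once, then nothing.
        return _calls == 1 ? _result : null;
      }
      if (_calls == 1) {
        // First call: the data block itself.
        return _result;
      }
      if (_calls == 2) {
        // Second call: a metadata block marking the end of the stream.
        return new Block(Kind.END_OF_STREAM, "");
      }
      return null;
    }
  }

  // Pulls blocks until the operator returns null and records their kinds.
  static List<Kind> drain(OneShotLeafOperator op) {
    List<Kind> seen = new ArrayList<>();
    for (Block b = op.nextBlock(); b != null; b = op.nextBlock()) {
      seen.add(b.kind);
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(drain(new OneShotLeafOperator(new Block(Kind.DATA, "rows"))));
    System.out.println(drain(new OneShotLeafOperator(new Block(Kind.ERROR, "boom"))));
  }
}
```

   In this model the success path yields two blocks and the error path yields one, which lines up with the diff calling `mailboxSendOperator.nextBlock()` a second time only when `dataBlock.getExceptions()` is empty.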