siddharthteotia commented on code in PR #9064: URL: https://github.com/apache/pinot/pull/9064#discussion_r927808971
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -101,25 +107,72 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ BaseDataBlock dataBlock; try { DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null); - // this works because default DataTableImplV3 will have a version number at beginning, - // which maps to ROW type of version 3. - dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes())); + if (!dataTable.getExceptions().isEmpty()) { + // if contains exception, directly return a metadata block with the exceptions. + dataBlock = DataBlockUtils.getErrorDataBlock(dataTable.getExceptions()); + } else { + // this works because default DataTableImplV3 will have a version number at beginning: + // the new DataBlock encodes lower 16 bites as version and upper 16 bites as type (ROW, COLUMNAR, METADATA) + dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes())); + } } catch (IOException e) { - throw new RuntimeException("Unable to convert byte buffer", e); + dataBlock = DataBlockUtils.getErrorDataBlock(e); } MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId()); MailboxSendOperator mailboxSendOperator = - new MailboxSendOperator(_mailboxService, dataBlock, receivingStageMetadata.getServerInstances(), - sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostname, _port, - serverQueryRequest.getRequestId(), sendNode.getStageId()); + new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(), + new LeafStageTransferableBlockOperator(dataBlock, sendNode.getDataSchema()), + receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), + sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequest.getRequestId(), + 
sendNode.getStageId()); mailboxSendOperator.nextBlock(); + if (dataBlock.getExceptions().isEmpty()) { + mailboxSendOperator.nextBlock(); + } } else { _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, executorService); } } + private static class LeafStageTransferableBlockOperator extends BaseOperator<TransferableBlock> { + private static final String EXPLAIN_NAME = "FAKE_TRANSFER_OPERATOR"; Review Comment: Why do we need this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org