walterddr commented on code in PR #10971:
URL: https://github.com/apache/pinot/pull/10971#discussion_r1241284837
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -189,73 +191,60 @@ int submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeout
   @VisibleForTesting
   public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, int reduceStageId,
-      long timeoutMs,
-      MailboxService mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled) {
+      long timeoutMs, MailboxService mailboxService, OpChainSchedulerService scheduler,
+      Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled) {
     DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId);
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot();
+    reduceNode.setExchangeType(PinotRelExchangeType.PIPELINE_BREAKER);
     VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0);
     StageMetadata brokerStageMetadata = new StageMetadata.Builder()
         .setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
         .addCustomProperties(reduceStagePlanFragment.getCustomProperties())
         .build();
-    PhysicalPlanContext planContext = new PhysicalPlanContext(mailboxService, requestId, reduceStageId,
-        System.currentTimeMillis() + timeoutMs, server, brokerStageMetadata, null, traceEnabled);
-    OpChainExecutionContext context = new OpChainExecutionContext(planContext);
-    MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId());
-    List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, dispatchableSubPlan,
-            context.getStats());
+    DistributedStagePlan reducerStagePlan = new DistributedStagePlan(0, server, reduceNode, brokerStageMetadata);
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(scheduler, mailboxService, reducerStagePlan,
+            System.currentTimeMillis() + timeoutMs, requestId, traceEnabled);
+    if (pipelineBreakerResult == null) {
+      throw new RuntimeException("Broker reducer error during query execution!");
+    }
+    collectStats(dispatchableSubPlan, pipelineBreakerResult.getOpChainStats(), statsAggregatorMap);
+    List<TransferableBlock> resultDataBlocks = pipelineBreakerResult.getResultMap().get(0);
     return toResultTable(resultDataBlocks, dispatchableSubPlan.getQueryResultFields(),
         dispatchableSubPlan.getQueryStageList().get(0).getPlanFragment().getFragmentRoot().getDataSchema());
   }

-  private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
-      @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap,
-      DispatchableSubPlan dispatchableSubPlan,
-      OpChainStats stats) {
-    List<DataBlock> resultDataBlocks = new ArrayList<>();
-    TransferableBlock transferableBlock;
-    long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
-    while (System.nanoTime() < timeoutWatermark) {
-      transferableBlock = mailboxReceiveOperator.nextBlock();
-      if (TransferableBlockUtils.isEndOfStream(transferableBlock) && transferableBlock.isErrorBlock()) {
-        // TODO: we only received bubble up error from the execution stage tree.
-        // TODO: query dispatch should also send cancel signal to the rest of the execution stage tree.
-        throw new RuntimeException(
-            "Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions());
-      }
-      if (transferableBlock.isNoOpBlock()) {
-        continue;
-      } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (executionStatsAggregatorMap != null) {
-          for (Map.Entry<String, OperatorStats> entry : stats.getOperatorStatsMap().entrySet()) {
-            LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
-                OperatorUtils.operatorStatsToJson(entry.getValue()));
-            OperatorStats operatorStats = entry.getValue();
-            ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
-            ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
-            rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
-            if (stageStatsAggregator != null) {
-              if (dispatchableSubPlan != null) {
-                OperatorUtils.recordTableName(operatorStats,
-                    dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
-              }
-              stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
-            }
+  private static void collectStats(DispatchableSubPlan dispatchableSubPlan, @Nullable OpChainStats opChainStats,
+      @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
+    if (executionStatsAggregatorMap != null && opChainStats != null) {
+      for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
+        LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
+            OperatorUtils.operatorStatsToJson(entry.getValue()));
+        OperatorStats operatorStats = entry.getValue();
+        ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
+        ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
+        rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+        if (stageStatsAggregator != null) {
+          if (dispatchableSubPlan != null) {
+            OperatorUtils.recordTableName(operatorStats,
+                dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
          }
+          stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
        }
-        return resultDataBlocks;
      }
-      resultDataBlocks.add(transferableBlock.getDataBlock());
    }
-    throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
  }

-  private static ResultTable toResultTable(List<DataBlock> queryResult, List<Pair<Integer, String>> fields,
+  private static ResultTable toResultTable(List<TransferableBlock> queryResult, List<Pair<Integer, String>> fields,
       DataSchema sourceSchema) {
     List<Object[]> resultRows = new ArrayList<>();
     DataSchema resultSchema = toResultSchema(sourceSchema, fields);
-    for (DataBlock dataBlock : queryResult) {
+    for (TransferableBlock transferableBlock : queryResult) {
+      if (transferableBlock.isErrorBlock()) {

Review Comment:
   It is impossible for the pipeline breaker to produce multiple successful blocks followed by an error: when an error is produced, the result list is cleared, so the result can only contain exactly one error block, or zero or more non-error data blocks.
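To make that invariant concrete, here is a minimal, self-contained sketch of the accumulation rule the comment describes. The class and method names (PipelineBreakerResultSketch, Block, accept) are hypothetical and this is not the actual PipelineBreakerOperator code; it only illustrates clearing previously collected data blocks once an error block arrives.

import java.util.ArrayList;
import java.util.List;

// Sketch of the invariant: the result list ends up holding either exactly one
// error block or only non-error data blocks, never a mix.
public class PipelineBreakerResultSketch {

  // Stand-in for Pinot's TransferableBlock, reduced to a payload and an error flag.
  static final class Block {
    final String payload;
    final boolean error;

    Block(String payload, boolean error) {
      this.payload = payload;
      this.error = error;
    }
  }

  private final List<Block> _result = new ArrayList<>();
  private boolean _errored = false;

  void accept(Block block) {
    if (_errored) {
      return; // once errored, ignore any further blocks
    }
    if (block.error) {
      _result.clear();    // drop all data blocks accumulated so far
      _result.add(block); // keep only the error block
      _errored = true;
    } else {
      _result.add(block);
    }
  }

  List<Block> getResult() {
    return _result;
  }

  public static void main(String[] args) {
    PipelineBreakerResultSketch sketch = new PipelineBreakerResultSketch();
    sketch.accept(new Block("data-1", false));
    sketch.accept(new Block("data-2", false));
    sketch.accept(new Block("boom", true));
    System.out.println(sketch.getResult().size());       // 1: the data blocks were cleared
    System.out.println(sketch.getResult().get(0).error); // true
  }
}

Under that invariant, a consumer such as toResultTable can throw as soon as it encounters an error block, since no valid data blocks can accompany it in the same result list.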