walterddr commented on code in PR #10971:
URL: https://github.com/apache/pinot/pull/10971#discussion_r1241284837


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -189,73 +191,60 @@ int submit(long requestId, DispatchableSubPlan 
dispatchableSubPlan, long timeout
 
   @VisibleForTesting
   public static ResultTable runReducer(long requestId, DispatchableSubPlan 
dispatchableSubPlan, int reduceStageId,
-      long timeoutMs,
-      MailboxService mailboxService, Map<Integer, ExecutionStatsAggregator> 
statsAggregatorMap, boolean traceEnabled) {
+      long timeoutMs, MailboxService mailboxService, OpChainSchedulerService 
scheduler,
+      Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean 
traceEnabled) {
     DispatchablePlanFragment reduceStagePlanFragment = 
dispatchableSubPlan.getQueryStageList().get(reduceStageId);
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
reduceStagePlanFragment.getPlanFragment().getFragmentRoot();
+    reduceNode.setExchangeType(PinotRelExchangeType.PIPELINE_BREAKER);
     VirtualServerAddress server = new 
VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0);
     StageMetadata brokerStageMetadata = new StageMetadata.Builder()
         .setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
         .addCustomProperties(reduceStagePlanFragment.getCustomProperties())
         .build();
-    PhysicalPlanContext planContext = new PhysicalPlanContext(mailboxService, 
requestId, reduceStageId,
-        System.currentTimeMillis() + timeoutMs, server, brokerStageMetadata, 
null, traceEnabled);
-    OpChainExecutionContext context = new OpChainExecutionContext(planContext);
-    MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(context, reduceNode.getSenderStageId());
-    List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, 
statsAggregatorMap, dispatchableSubPlan,
-            context.getStats());
+    DistributedStagePlan reducerStagePlan = new DistributedStagePlan(0, 
server, reduceNode, brokerStageMetadata);
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(scheduler, 
mailboxService, reducerStagePlan,
+            System.currentTimeMillis() + timeoutMs, requestId, traceEnabled);
+    if (pipelineBreakerResult == null) {
+      throw new RuntimeException("Broker reducer error during query 
execution!");
+    }
+    collectStats(dispatchableSubPlan, pipelineBreakerResult.getOpChainStats(), 
statsAggregatorMap);
+    List<TransferableBlock> resultDataBlocks = 
pipelineBreakerResult.getResultMap().get(0);
     return toResultTable(resultDataBlocks, 
dispatchableSubPlan.getQueryResultFields(),
         
dispatchableSubPlan.getQueryStageList().get(0).getPlanFragment().getFragmentRoot().getDataSchema());
   }
 
-  private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator 
mailboxReceiveOperator, long timeoutMs,
-      @Nullable Map<Integer, ExecutionStatsAggregator> 
executionStatsAggregatorMap,
-      DispatchableSubPlan dispatchableSubPlan,
-      OpChainStats stats) {
-    List<DataBlock> resultDataBlocks = new ArrayList<>();
-    TransferableBlock transferableBlock;
-    long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
-    while (System.nanoTime() < timeoutWatermark) {
-      transferableBlock = mailboxReceiveOperator.nextBlock();
-      if (TransferableBlockUtils.isEndOfStream(transferableBlock) && 
transferableBlock.isErrorBlock()) {
-        // TODO: we only received bubble up error from the execution stage 
tree.
-        // TODO: query dispatch should also send cancel signal to the rest of 
the execution stage tree.
-        throw new RuntimeException(
-            "Received error query execution result block: " + 
transferableBlock.getDataBlock().getExceptions());
-      }
-      if (transferableBlock.isNoOpBlock()) {
-        continue;
-      } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (executionStatsAggregatorMap != null) {
-          for (Map.Entry<String, OperatorStats> entry : 
stats.getOperatorStatsMap().entrySet()) {
-            LOGGER.info("Broker Query Execution Stats - OperatorId: {}, 
OperatorStats: {}", entry.getKey(),
-                OperatorUtils.operatorStatsToJson(entry.getValue()));
-            OperatorStats operatorStats = entry.getValue();
-            ExecutionStatsAggregator rootStatsAggregator = 
executionStatsAggregatorMap.get(0);
-            ExecutionStatsAggregator stageStatsAggregator = 
executionStatsAggregatorMap.get(operatorStats.getStageId());
-            rootStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
-            if (stageStatsAggregator != null) {
-              if (dispatchableSubPlan != null) {
-                OperatorUtils.recordTableName(operatorStats,
-                    
dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
-              }
-              stageStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
-            }
+  private static void collectStats(DispatchableSubPlan dispatchableSubPlan, 
@Nullable OpChainStats opChainStats,
+      @Nullable Map<Integer, ExecutionStatsAggregator> 
executionStatsAggregatorMap) {
+    if (executionStatsAggregatorMap != null && opChainStats != null) {
+      for (Map.Entry<String, OperatorStats> entry : 
opChainStats.getOperatorStatsMap().entrySet()) {
+        LOGGER.info("Broker Query Execution Stats - OperatorId: {}, 
OperatorStats: {}", entry.getKey(),
+            OperatorUtils.operatorStatsToJson(entry.getValue()));
+        OperatorStats operatorStats = entry.getValue();
+        ExecutionStatsAggregator rootStatsAggregator = 
executionStatsAggregatorMap.get(0);
+        ExecutionStatsAggregator stageStatsAggregator = 
executionStatsAggregatorMap.get(operatorStats.getStageId());
+        rootStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
+        if (stageStatsAggregator != null) {
+          if (dispatchableSubPlan != null) {
+            OperatorUtils.recordTableName(operatorStats,
+                
dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
           }
+          stageStatsAggregator.aggregate(null, 
entry.getValue().getExecutionStats(), new HashMap<>());
         }
-        return resultDataBlocks;
       }
-      resultDataBlocks.add(transferableBlock.getDataBlock());
     }
-    throw new RuntimeException("Timed out while receiving from mailbox: " + 
QueryException.EXECUTION_TIMEOUT_ERROR);
   }
 
-  private static ResultTable toResultTable(List<DataBlock> queryResult, 
List<Pair<Integer, String>> fields,
+  private static ResultTable toResultTable(List<TransferableBlock> 
queryResult, List<Pair<Integer, String>> fields,
       DataSchema sourceSchema) {
     List<Object[]> resultRows = new ArrayList<>();
     DataSchema resultSchema = toResultSchema(sourceSchema, fields);
-    for (DataBlock dataBlock : queryResult) {
+    for (TransferableBlock transferableBlock : queryResult) {
+      if (transferableBlock.isErrorBlock()) {

Review Comment:
   It is impossible for the pipeline breaker to produce multiple successful 
blocks followed by an error block: when an error is produced, the result array 
is cleared, so it can only contain either exactly 1 error block or >=0 
non-error data blocks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to