This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b670c552f5 Fix pipeline breaker error handling (#11411)
b670c552f5 is described below

commit b670c552f5b8ab8cbf6d1eaf79c79a0d926c7ebe
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Aug 23 11:43:45 2023 -0700

    Fix pipeline breaker error handling (#11411)
---
 .../apache/pinot/query/mailbox/MailboxService.java |   2 -
 .../apache/pinot/query/runtime/QueryRunner.java    |  56 ++++++----
 .../plan/pipeline/PipelineBreakerExecutor.java     |  56 +++++-----
 .../plan/pipeline/PipelineBreakerOperator.java     | 116 +++++++++------------
 .../plan/pipeline/PipelineBreakerResult.java       |   9 +-
 .../query/service/dispatch/QueryDispatcher.java    |  12 +--
 .../pinot/query/runtime/QueryRunnerTest.java       |  46 ++++----
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  10 +-
 .../plan/pipeline/PipelineBreakerExecutorTest.java |  28 ++---
 9 files changed, 166 insertions(+), 169 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index c95c7c3f23..7c171f1bf4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -100,7 +99,6 @@ public class MailboxService {
    * not open the underlying channel or acquire any additional resources. 
Instead, it will initialize lazily when the
    * data is sent for the first time.
    */
-  @VisibleForTesting
   public SendingMailbox getSendingMailbox(String hostname, int port, String 
mailboxId, long deadlineMs) {
     if (_hostname.equals(hostname) && _port == port) {
       return new InMemorySendingMailbox(mailboxId, this, deadlineMs);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index edcd0a0fdc..b66c71de48 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -36,9 +36,12 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import 
org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -144,34 +147,47 @@ public class QueryRunner {
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
 
     // run pre-stage execution for all pipeline breakers
-    PipelineBreakerResult pipelineBreakerResult = 
PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
-        _mailboxService, distributedStagePlan, deadlineMs, requestId, 
isTraceEnabled);
+    PipelineBreakerResult pipelineBreakerResult =
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, 
_mailboxService, distributedStagePlan, deadlineMs,
+            requestId, isTraceEnabled);
+
+    // Send error block to all the receivers if pipeline breaker fails
+    if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() 
!= null) {
+      TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+      LOGGER.error("Error executing pipeline breaker for request: {}, stage: 
{}, sending error block: {}", requestId,
+          distributedStagePlan.getStageId(), 
errorBlock.getDataBlock().getExceptions());
+      int receiverStageId = ((MailboxSendNode) 
distributedStagePlan.getStageRoot()).getReceiverStageId();
+      MailboxMetadata mailboxMetadata = 
distributedStagePlan.getStageMetadata().getWorkerMetadataList()
+          
.get(distributedStagePlan.getServer().workerId()).getMailBoxInfosMap().get(receiverStageId);
+      List<String> mailboxIds = MailboxIdUtils.toMailboxIds(requestId, 
mailboxMetadata);
+      for (int i = 0; i < mailboxIds.size(); i++) {
+        try {
+          
_mailboxService.getSendingMailbox(mailboxMetadata.getVirtualAddress(i).hostname(),
+              mailboxMetadata.getVirtualAddress(i).port(), mailboxIds.get(i), 
deadlineMs).send(errorBlock);
+        } catch (TimeoutException e) {
+          LOGGER.warn("Timed out sending error block to mailbox: {} for 
request: {}, stage: {}", mailboxIds.get(i),
+              requestId, distributedStagePlan.getStageId(), e);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception sending error block to mailbox: {} 
for request: {}, stage: {}",
+              mailboxIds.get(i), requestId, distributedStagePlan.getStageId(), 
e);
+        }
+      }
+      return;
+    }
 
     // Set Join Overflow configs to StageMetadata from request
     setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap);
 
     // run OpChain
+    OpChain opChain;
     if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
-      try {
-        OpChain rootOperator = compileLeafStage(requestId, 
distributedStagePlan, requestMetadataMap,
-            pipelineBreakerResult, deadlineMs, isTraceEnabled);
-        _scheduler.register(rootOperator);
-      } catch (Exception e) {
-        LOGGER.error("Error executing leaf stage for: {}:{}", requestId, 
distributedStagePlan.getStageId(), e);
-        _scheduler.cancel(requestId);
-        throw e;
-      }
+      opChain = compileLeafStage(requestId, distributedStagePlan, 
requestMetadataMap, pipelineBreakerResult, deadlineMs,
+          isTraceEnabled);
     } else {
-      try {
-        OpChain rootOperator = compileIntermediateStage(requestId, 
distributedStagePlan, requestMetadataMap,
-            pipelineBreakerResult, deadlineMs, isTraceEnabled);
-        _scheduler.register(rootOperator);
-      } catch (Exception e) {
-        LOGGER.error("Error executing intermediate stage for: {}:{}", 
requestId, distributedStagePlan.getStageId(), e);
-        _scheduler.cancel(requestId);
-        throw e;
-      }
+      opChain = compileIntermediateStage(requestId, distributedStagePlan, 
requestMetadataMap, pipelineBreakerResult,
+          deadlineMs, isTraceEnabled);
     }
+    _scheduler.register(opChain);
   }
 
   private void setJoinOverflowConfigs(DistributedStagePlan 
distributedStagePlan,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 6149b758fe..69663f5d33 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -18,13 +18,13 @@
  */
 package org.apache.pinot.query.runtime.plan.pipeline;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
@@ -45,11 +45,11 @@ import org.slf4j.LoggerFactory;
  * Utility class to run pipeline breaker execution and collects the results.
  */
 public class PipelineBreakerExecutor {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
   private PipelineBreakerExecutor() {
-    // do not instantiate.
   }
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
+
   /**
    * Execute a pipeline breaker and collect the results (synchronously). 
Currently, pipeline breaker executor can only
    *    execute mailbox receive pipeline breaker.
@@ -64,10 +64,10 @@ public class PipelineBreakerExecutor {
    *   - If exception occurs, exception block will be wrapped in {@link 
TransferableBlock} and assigned to each PB node.
    *   - Normal stats will be attached to each PB node and downstream 
execution should return with stats attached.
    */
-  public static PipelineBreakerResult executePipelineBreakers(
-      OpChainSchedulerService scheduler,
-      MailboxService mailboxService, DistributedStagePlan 
distributedStagePlan, long deadlineMs,
-      long requestId, boolean isTraceEnabled) {
+  @Nullable
+  public static PipelineBreakerResult 
executePipelineBreakers(OpChainSchedulerService scheduler,
+      MailboxService mailboxService, DistributedStagePlan 
distributedStagePlan, long deadlineMs, long requestId,
+      boolean isTraceEnabled) {
     PipelineBreakerContext pipelineBreakerContext = new 
PipelineBreakerContext();
     PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), 
pipelineBreakerContext);
     if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
@@ -76,31 +76,25 @@ public class PipelineBreakerExecutor {
         // TODO: This PlanRequestContext needs to indicate it is a pre-stage 
opChain and only listens to pre-stage
         //     OpChain receive-mail callbacks.
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query 
information
-        OpChainExecutionContext opChainContext = new 
OpChainExecutionContext(mailboxService, requestId,
-            stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(), 
deadlineMs,
-            distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
+        OpChainExecutionContext opChainContext =
+            new OpChainExecutionContext(mailboxService, requestId, 
stageRoot.getPlanFragmentId(),
+                distributedStagePlan.getServer(), deadlineMs, 
distributedStagePlan.getStageMetadata(), null,
+                isTraceEnabled);
         PhysicalPlanContext physicalPlanContext = new 
PhysicalPlanContext(opChainContext, null);
         return PipelineBreakerExecutor.execute(scheduler, 
pipelineBreakerContext, physicalPlanContext);
       } catch (Exception e) {
-        LOGGER.error("Unable to create pipeline breaker results for Req: " + 
requestId + ", Stage: "
-            + distributedStagePlan.getStageId(), e);
-        // Create all error blocks for all pipeline breaker nodes.
-        TransferableBlock errorBlock = 
TransferableBlockUtils.getErrorTransferableBlock(e);
-        Map<Integer, List<TransferableBlock>> resultMap = new HashMap<>();
-        for (int key : pipelineBreakerContext.getNodeIdMap().values()) {
-          if (pipelineBreakerContext.getPipelineBreakerMap().containsKey(key)) 
{
-            resultMap.put(key, Collections.singletonList(errorBlock));
-          }
-        }
-        return new 
PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap, null);
+        LOGGER.error("Caught exception executing pipeline breaker for request: 
{}, stage: {}", requestId,
+            distributedStagePlan.getStageId(), e);
+        return new 
PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), 
Collections.emptyMap(),
+            TransferableBlockUtils.getErrorTransferableBlock(e), null);
       }
     } else {
       return null;
     }
   }
 
-  private static PipelineBreakerResult execute(OpChainSchedulerService 
scheduler,
-      PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext)
+  private static PipelineBreakerResult execute(OpChainSchedulerService 
scheduler, PipelineBreakerContext context,
+      PhysicalPlanContext physicalPlanContext)
       throws Exception {
     Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new 
HashMap<>();
     for (Map.Entry<Integer, PlanNode> e : 
context.getPipelineBreakerMap().entrySet()) {
@@ -119,18 +113,20 @@ public class PipelineBreakerExecutor {
       PipelineBreakerContext context, Map<Integer, 
Operator<TransferableBlock>> pipelineWorkerMap,
       PhysicalPlanContext physicalPlanContext)
       throws Exception {
-    PipelineBreakerOperator pipelineBreakerOperator = new 
PipelineBreakerOperator(
-        physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap);
+    PipelineBreakerOperator pipelineBreakerOperator =
+        new 
PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(), 
pipelineWorkerMap);
     CountDownLatch latch = new CountDownLatch(1);
-    OpChain pipelineBreakerOpChain = new 
OpChain(physicalPlanContext.getOpChainExecutionContext(),
-        pipelineBreakerOperator, physicalPlanContext.getReceivingMailboxIds(), 
(id) -> latch.countDown());
+    OpChain pipelineBreakerOpChain =
+        new OpChain(physicalPlanContext.getOpChainExecutionContext(), 
pipelineBreakerOperator,
+            physicalPlanContext.getReceivingMailboxIds(), (id) -> 
latch.countDown());
     scheduler.register(pipelineBreakerOpChain);
     long timeoutMs = physicalPlanContext.getDeadlineMs() - 
System.currentTimeMillis();
     if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
       return new PipelineBreakerResult(context.getNodeIdMap(), 
pipelineBreakerOperator.getResultMap(),
-          pipelineBreakerOpChain.getStats());
+          pipelineBreakerOperator.getErrorBlock(), 
pipelineBreakerOpChain.getStats());
     } else {
-      throw new IOException("Exception occur when awaiting breaker results!");
+      throw new TimeoutException(
+          String.format("Timed out waiting for pipeline breaker results after: 
%dms", timeoutMs));
     }
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 1aab124135..9fe2588827 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -18,41 +18,34 @@
  */
 package org.apache.pinot.query.runtime.plan.pipeline;
 
-import com.google.common.collect.ImmutableSet;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import javax.annotation.Nullable;
-import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 class PipelineBreakerOperator extends MultiStageOperator {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipelineBreakerOperator.class);
   private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
-  private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> 
_workerEntries;
-  private final Map<Integer, List<TransferableBlock>> _resultMap;
-  private final ImmutableSet<Integer> _expectedKeySet;
-  private TransferableBlock _finalBlock;
 
-  public PipelineBreakerOperator(OpChainExecutionContext context,
-      Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) {
+  private final Map<Integer, Operator<TransferableBlock>> _workerMap;
+
+  private Map<Integer, List<TransferableBlock>> _resultMap;
+  private TransferableBlock _errorBlock;
+
+  public PipelineBreakerOperator(OpChainExecutionContext context, Map<Integer, 
Operator<TransferableBlock>> workerMap) {
     super(context);
+    _workerMap = workerMap;
     _resultMap = new HashMap<>();
-    _expectedKeySet = ImmutableSet.copyOf(pipelineWorkerMap.keySet());
-    _workerEntries = new ArrayDeque<>();
-    _workerEntries.addAll(pipelineWorkerMap.entrySet());
-    for (int workerKey : _expectedKeySet) {
+    for (int workerKey : workerMap.keySet()) {
       _resultMap.put(workerKey, new ArrayList<>());
     }
   }
@@ -62,6 +55,10 @@ class PipelineBreakerOperator extends MultiStageOperator {
   }
 
   @Nullable
+  public TransferableBlock getErrorBlock() {
+    return _errorBlock;
+  }
+
   @Override
   public String toExplainString() {
     return EXPLAIN_NAME;
@@ -69,63 +66,44 @@ class PipelineBreakerOperator extends MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    // Poll from every mailbox operator:
-    // - Return the first content block
-    // - If no content block found but there are mailboxes not finished, try 
again
-    // - If all content blocks are already returned, return end-of-stream block
-    while (!_workerEntries.isEmpty()) {
-      if (_finalBlock != null) {
-        return _finalBlock;
-      }
-      if (System.currentTimeMillis() > _context.getDeadlineMs()) {
-        _finalBlock = 
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-        constructErrorResponse(_finalBlock);
-        return _finalBlock;
-      }
-
-      Map.Entry<Integer, Operator<TransferableBlock>> worker = 
_workerEntries.getLast();
-      TransferableBlock block = worker.getValue().nextBlock();
-
-      if (block == null) {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug("==[PB]== Null block on " + _context.getId() + " worker 
" + worker.getKey());
+    if (_errorBlock != null) {
+      return _errorBlock;
+    }
+    // NOTE: Put an empty list for each worker in case there is no data block 
returned from that worker
+    if (_workerMap.size() == 1) {
+      Map.Entry<Integer, Operator<TransferableBlock>> entry = 
_workerMap.entrySet().iterator().next();
+      List<TransferableBlock> dataBlocks = new ArrayList<>();
+      _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks);
+      Operator<TransferableBlock> operator = entry.getValue();
+      TransferableBlock block = operator.nextBlock();
+      while (!block.isSuccessfulEndOfStreamBlock()) {
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          return block;
         }
-        continue;
+        dataBlocks.add(block);
+        block = operator.nextBlock();
       }
-
-      // Release the mailbox worker when the block is end-of-stream
-      if (block.isSuccessfulEndOfStreamBlock()) {
-        _workerEntries.removeLast();
-        continue;
+    } else {
+      _resultMap = new HashMap<>();
+      for (int workerKey : _workerMap.keySet()) {
+        _resultMap.put(workerKey, new ArrayList<>());
       }
-
-      if (block.isErrorBlock()) {
-        _finalBlock = block;
-      }
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("==[PB]== Returned block from : " + _context.getId() + " 
block: " + block);
-      }
-      _resultMap.get(worker.getKey()).add(block);
-      return block;
-    }
-    if (System.currentTimeMillis() > _context.getDeadlineMs()) {
-      _finalBlock = 
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-      return _finalBlock;
-    } else if (_finalBlock == null) {
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("==[PB]== Finished : " + _context.getId());
+      // Keep polling from every operator in round-robin fashion
+      Queue<Map.Entry<Integer, Operator<TransferableBlock>>> entries = new 
ArrayDeque<>(_workerMap.entrySet());
+      while (!entries.isEmpty()) {
+        Map.Entry<Integer, Operator<TransferableBlock>> entry = entries.poll();
+        TransferableBlock block = entry.getValue().nextBlock();
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          return block;
+        }
+        if (block.isDataBlock()) {
+          _resultMap.get(entry.getKey()).add(block);
+          entries.offer(entry);
+        }
       }
-      _finalBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock();
-    }
-    return _finalBlock;
-  }
-
-  /**
-   * Setting all result map to error if any of the pipeline breaker returns an 
ERROR.
-   */
-  private void constructErrorResponse(TransferableBlock errorBlock) {
-    for (int key : _expectedKeySet) {
-      _resultMap.put(key, Collections.singletonList(errorBlock));
     }
+    return TransferableBlockUtils.getEndOfStreamTransferableBlock();
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
index fe0f2cba61..2e2b003e34 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -32,12 +32,14 @@ import org.apache.pinot.query.runtime.operator.OpChainStats;
 public class PipelineBreakerResult {
   private final Map<PlanNode, Integer> _nodeIdMap;
   private final Map<Integer, List<TransferableBlock>> _resultMap;
+  private final TransferableBlock _errorBlock;
   private final OpChainStats _opChainStats;
 
   public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, 
List<TransferableBlock>> resultMap,
-      OpChainStats opChainStats) {
+      @Nullable TransferableBlock errorBlock, @Nullable OpChainStats 
opChainStats) {
     _nodeIdMap = nodeIdMap;
     _resultMap = resultMap;
+    _errorBlock = errorBlock;
     _opChainStats = opChainStats;
   }
 
@@ -49,6 +51,11 @@ public class PipelineBreakerResult {
     return _resultMap;
   }
 
+  @Nullable
+  public TransferableBlock getErrorBlock() {
+    return _errorBlock;
+  }
+
   @Nullable
   public OpChainStats getOpChainStats() {
     return _opChainStats;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 0d94a4d099..50ed9f7cf5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.service.dispatch;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import io.grpc.Deadline;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -211,8 +212,11 @@ public class QueryDispatcher {
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(scheduler, 
mailboxService, reducerStagePlan,
             System.currentTimeMillis() + timeoutMs, requestId, traceEnabled);
-    if (pipelineBreakerResult == null) {
-      throw new RuntimeException("Broker reducer error during query 
execution!");
+    Preconditions.checkState(pipelineBreakerResult != null, "Pipeline breaker 
result should not be null");
+    if (pipelineBreakerResult.getErrorBlock() != null) {
+      throw new RuntimeException(
+          "Received error query execution result block: " + 
pipelineBreakerResult.getErrorBlock().getDataBlock()
+              .getExceptions());
     }
     collectStats(dispatchableSubPlan, pipelineBreakerResult.getOpChainStats(), 
statsAggregatorMap);
     List<TransferableBlock> resultDataBlocks = 
pipelineBreakerResult.getResultMap().get(0);
@@ -245,10 +249,6 @@ public class QueryDispatcher {
     List<Object[]> resultRows = new ArrayList<>();
     DataSchema resultSchema = toResultSchema(sourceSchema, fields);
     for (TransferableBlock transferableBlock : queryResult) {
-      if (transferableBlock.isErrorBlock()) {
-        throw new RuntimeException(
-            "Received error query execution result block: " + 
transferableBlock.getDataBlock().getExceptions());
-      }
       DataBlock dataBlock = transferableBlock.getDataBlock();
       int numColumns = resultSchema.getColumnNames().length;
       int numRows = dataBlock.getNumberOfRows();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 7ee9ed8732..3c73434c15 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -19,14 +19,15 @@
 package org.apache.pinot.query.runtime;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -44,6 +45,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -188,12 +191,18 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
    */
   @Test(dataProvider = "testDataWithSqlExecutionExceptions")
   public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
-    long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
-    DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
-    Map<String, String> requestMetadataMap =
-        ImmutableMap.of(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
String.valueOf(requestId),
-            CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
-            String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+    long requestId = REQUEST_ID_GEN.getAndIncrement();
+    SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sql);
+    QueryEnvironment.QueryPlannerResult queryPlannerResult =
+        _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
+    DispatchableSubPlan dispatchableSubPlan = 
queryPlannerResult.getQueryPlan();
+    Map<String, String> requestMetadataMap = new HashMap<>();
+    
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
String.valueOf(requestId));
+    Long timeoutMs = 
QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
+    
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+        String.valueOf(timeoutMs != null ? timeoutMs : 
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+    
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
 "true");
+    requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
     int reducerStageId = -1;
     for (int stageId = 0; stageId < 
dispatchableSubPlan.getQueryStageList().size(); stageId++) {
       if 
(dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment()
@@ -210,17 +219,16 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
           
Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)),
           _mailboxService,
           _reducerScheduler, null, false);
-    } catch (RuntimeException rte) {
-      Assert.assertTrue(rte.getMessage().contains("Received error query 
execution result block"));
-      // TODO: The actual message is (usually) something like:
-      //  Received error query execution result block: 
{200=QueryExecutionError:
-      //   java.lang.IllegalArgumentException: Illegal Json Path: $['path'] 
does not match document
-      //     at 
org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.throwPathNotFoundException(...)
-      //     at 
org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.processValue(...)
-      //     at 
org.apache.pinot.core.common.evaluators.DefaultJsonPathEvaluator.evaluateBlock(...)
-      //     at 
org.apache.pinot.core.common.DataFetcher$ColumnValueReader.readIntValues(DataFetcher.java:489)}
-      Assert.assertTrue(rte.getMessage().contains(exceptionMsg), "Exception 
should contain: " + exceptionMsg
-          + "! but found: " + rte.getMessage());
+      Assert.fail("Should have thrown exception!");
+    } catch (RuntimeException e) {
+      // NOTE: The actual message is (usually) something like:
+      //   Received error query execution result block: 
{200=QueryExecutionError:
+      //   Query execution error on: Server_localhost_12345
+      //     java.lang.IllegalArgumentException: Illegal Json Path: $['path'] 
does not match document
+      String exceptionMessage = e.getMessage();
+      Assert.assertTrue(exceptionMessage.startsWith("Received error query 
execution result block: "));
+      Assert.assertTrue(exceptionMessage.contains(exceptionMsg),
+          "Exception should contain: " + exceptionMsg + ", but found: " + 
exceptionMessage);
     }
   }
 
@@ -288,7 +296,7 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         // Timeout exception should occur with this option:
         new Object[]{
             "SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN 
c ON a.col1 = c.col1",
-            "timeout"
+            "Timeout"
         },
 
         // Function with incorrect argument signature should throw runtime 
exception when casting string to numeric
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index d8589c42a0..7dcf8bb759 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -37,14 +37,15 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryServerEnclosure;
@@ -87,7 +88,7 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
   protected static final String SEGMENT_BREAKER_KEY = 
"__SEGMENT_BREAKER_KEY__";
   protected static final String SEGMENT_BREAKER_STR = "------";
   protected static final GenericRow SEGMENT_BREAKER_ROW = new GenericRow();
-  protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
+  protected static final AtomicLong REQUEST_ID_GEN = new AtomicLong();
   protected QueryEnvironment _queryEnvironment;
   protected String _reducerHostname;
   protected int _reducerGrpcPort;
@@ -108,15 +109,16 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
    * ser/de dispatches.
    */
   protected List<Object[]> queryRunner(String sql, Map<Integer, 
ExecutionStatsAggregator> executionStatsAggregatorMap) {
-    long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+    long requestId = REQUEST_ID_GEN.getAndIncrement();
     SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sql);
     QueryEnvironment.QueryPlannerResult queryPlannerResult =
         _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
     DispatchableSubPlan dispatchableSubPlan = 
queryPlannerResult.getQueryPlan();
     Map<String, String> requestMetadataMap = new HashMap<>();
     
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
String.valueOf(requestId));
+    Long timeoutMs = 
QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
     
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
-        String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+        String.valueOf(timeoutMs != null ? timeoutMs : 
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
     
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
 "true");
     requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 7f33dc45a4..b5ba82e48d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -136,6 +136,7 @@ public class PipelineBreakerExecutorTest {
     // then
     // should have single PB result, receive 2 data blocks, EOS block 
shouldn't be included
     Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertNull(pipelineBreakerResult.getErrorBlock());
     Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
     
Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(),
 2);
 
@@ -176,6 +177,7 @@ public class PipelineBreakerExecutorTest {
     // then
     // should have two PB result, receive 2 data blocks, one each, EOS block 
shouldn't be included
     Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertNull(pipelineBreakerResult.getErrorBlock());
     Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
     Iterator<List<TransferableBlock>> it = 
pipelineBreakerResult.getResultMap().values().iterator();
     Assert.assertEquals(it.next().size(), 1);
@@ -201,8 +203,9 @@ public class PipelineBreakerExecutorTest {
             System.currentTimeMillis() + 10_000L, 0, false);
 
     // then
-    // should contain only failure error blocks
+    // should return empty block list
     Assert.assertNotNull(pipelineBreakerResult);
+    Assert.assertNull(pipelineBreakerResult.getErrorBlock());
     Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
     List<TransferableBlock> resultBlocks = 
pipelineBreakerResult.getResultMap().values().iterator().next();
     Assert.assertEquals(resultBlocks.size(), 0);
@@ -233,11 +236,9 @@ public class PipelineBreakerExecutorTest {
     // then
     // should contain only failure error blocks
     Assert.assertNotNull(pipelineBreakerResult);
-    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
-    List<TransferableBlock> resultBlocks = 
pipelineBreakerResult.getResultMap().values().iterator().next();
-    Assert.assertEquals(resultBlocks.size(), 1);
-    Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
-    Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+    TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+    Assert.assertNotNull(errorBlock);
+    Assert.assertTrue(errorBlock.isErrorBlock());
   }
 
   @Test
@@ -311,17 +312,8 @@ public class PipelineBreakerExecutorTest {
     // then
     // should fail even if one of the 2 PB doesn't contain error block from 
sender.
     Assert.assertNotNull(pipelineBreakerResult);
-    Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
-
-    boolean errorFound = false;
-    for (List<TransferableBlock> resultBlocks : 
pipelineBreakerResult.getResultMap().values()) {
-      if (!resultBlocks.isEmpty()) {
-        TransferableBlock lastBlock = resultBlocks.get(resultBlocks.size() - 
1);
-        if (lastBlock.isErrorBlock()) {
-          errorFound = true;
-        }
-      }
-    }
-    Assert.assertTrue(errorFound, "An error block should be the last block on 
at least one of the result map entries");
+    TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
+    Assert.assertNotNull(errorBlock);
+    Assert.assertTrue(errorBlock.isErrorBlock());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to