This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 399f033ec3 [Multi-stage] Remove PhysicalPlanContext and clean up executor logic (#11439)
399f033ec3 is described below

commit 399f033ec3917df2bc478b5904406a95e0bc7258
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 25 17:24:47 2023 -0700

    [Multi-stage] Remove PhysicalPlanContext and clean up executor logic (#11439)
---
 .../MultiStageBrokerRequestHandler.java            |   8 +-
 .../apache/pinot/query/runtime/QueryRunner.java    | 114 ++++++++-------------
 .../pinot/query/runtime/operator/OpChain.java      |  35 +++----
 .../runtime/plan/OpChainExecutionContext.java      |  26 +++--
 .../query/runtime/plan/PhysicalPlanContext.java    |  95 -----------------
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  90 +++++++---------
 .../plan/pipeline/PipelineBreakerExecutor.java     |  47 ++++-----
 .../plan/server/ServerPlanRequestContext.java      |  13 +--
 .../plan/server/ServerPlanRequestUtils.java        |  58 +++++------
 .../plan/server/ServerPlanRequestVisitor.java      |   2 +-
 .../query/service/dispatch/QueryDispatcher.java    |  16 +--
 .../pinot/query/service/server/QueryServer.java    |  13 ++-
 .../pinot/query/runtime/QueryRunnerTest.java       |   4 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |   4 +-
 .../executor/OpChainSchedulerServiceTest.java      |  11 +-
 .../runtime/operator/LiteralValueOperatorTest.java |   4 -
 .../operator/MailboxReceiveOperatorTest.java       |   4 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |   4 +-
 .../pinot/query/runtime/operator/OpChainTest.java  |  35 ++++---
 .../query/runtime/operator/OperatorTestUtil.java   |  17 ++-
 .../operator/SortedMailboxReceiveOperatorTest.java |   5 +-
 .../plan/pipeline/PipelineBreakerExecutorTest.java |  29 +++---
 .../service/dispatch/QueryDispatcherTest.java      |   4 +-
 23 files changed, 247 insertions(+), 391 deletions(-)
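
For readers skimming the diff below: the heart of the change is that OpChainExecutionContext is now built once per stage, carries the request metadata (including the trace flag), and is handed straight to PhysicalPlanVisitor, instead of being wrapped in a PhysicalPlanContext. A minimal sketch of the new call shape, assuming the Pinot classes referenced in the diff are on the classpath; the class and method names here are illustrative and not part of the commit:

    import java.util.Map;
    import org.apache.pinot.query.mailbox.MailboxService;
    import org.apache.pinot.query.runtime.operator.OpChain;
    import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
    import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
    import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
    import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;

    public class CompileStageSketch {
      // Illustrative only: mirrors the shape of QueryRunner.processQuery() after this commit.
      static OpChain compileStage(MailboxService mailboxService, long requestId, long deadlineMs,
          Map<String, String> requestMetadata, DistributedStagePlan plan, PipelineBreakerResult pipelineBreakerResult) {
        // A single context now carries the request metadata; the trace flag is parsed inside the constructor.
        OpChainExecutionContext executionContext =
            new OpChainExecutionContext(mailboxService, requestId, plan.getStageId(), plan.getServer(), deadlineMs,
                requestMetadata, plan.getStageMetadata(), pipelineBreakerResult);
        // The visitor consumes the context directly and returns the OpChain to register with the scheduler.
        return PhysicalPlanVisitor.walkPlanNode(plan.getStageRoot(), executionContext);
      }
    }

The same context object is what compileLeafStage() and the pipeline breaker receive in the hunks below.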

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 3a65691073..2080bd4a64 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -166,8 +166,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
     }
 
-    boolean traceEnabled = Boolean.parseBoolean(
-        sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
+    Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
+    boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
 
     ResultTable queryResults;
     Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
@@ -177,8 +177,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     long executionStartTimeNs = System.nanoTime();
     try {
-      queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs,
-          sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
+      queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
+          stageIdStatsMap);
     } catch (Throwable t) {
       String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
       LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index f48f5561fb..021af52f80 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,15 +20,12 @@ package org.apache.pinot.query.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -39,7 +36,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
@@ -50,14 +46,12 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
@@ -72,17 +66,13 @@ public class QueryRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
   private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX = "pinot.server.query.executor";
 
-  // This is a temporary before merging the 2 type of executor.
-  private ServerQueryExecutorV1Impl _serverExecutor;
   private HelixManager _helixManager;
-  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
-  private MailboxService _mailboxService;
-  private String _hostname;
-  private int _port;
+  private ServerMetrics _serverMetrics;
 
   private ExecutorService _opChainExecutor;
-
   private OpChainSchedulerService _scheduler;
+  private MailboxService _mailboxService;
+  private ServerQueryExecutorV1Impl _leafQueryExecutor;
 
   // Group-by settings
   @Nullable
@@ -102,12 +92,14 @@ public class QueryRunner {
    */
   public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager,
       ServerMetrics serverMetrics) {
+    _helixManager = helixManager;
+    _serverMetrics = serverMetrics;
+
     String instanceName = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
-    _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
+    String hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
-    _port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+    int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
-    _helixManager = helixManager;
 
     // TODO: Consider using separate config for intermediate stage and leaf stage
     String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
@@ -121,29 +113,28 @@ public class QueryRunner {
     String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
     _joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
 
+    //TODO: make this configurable
+    _opChainExecutor =
+        ExecutorServiceUtils.create(config, "pinot.query.runner.opchain", "op_chain_worker_on_" + port + "_port");
+    _scheduler = new OpChainSchedulerService(getOpChainExecutorService());
+    _mailboxService = new MailboxService(hostname, port, config);
     try {
-      //TODO: make this configurable
-      _opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain",
-          "op_chain_worker_on_" + _port + "_port");
-      _scheduler = new OpChainSchedulerService(getOpChainExecutorService());
-      _mailboxService = new MailboxService(_hostname, _port, config);
-      _serverExecutor = new ServerQueryExecutorV1Impl();
-      _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
+      _leafQueryExecutor = new ServerQueryExecutorV1Impl();
+      _leafQueryExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
+
+    LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
   }
 
-  public void start()
-      throws TimeoutException {
-    _helixPropertyStore = _helixManager.getHelixPropertyStore();
+  public void start() {
     _mailboxService.start();
-    _serverExecutor.start();
+    _leafQueryExecutor.start();
   }
 
-  public void shutDown()
-      throws TimeoutException {
-    _serverExecutor.shutDown();
+  public void shutDown() {
+    _leafQueryExecutor.shutDown();
     _mailboxService.shutdown();
     ExecutorServiceUtils.close(_opChainExecutor);
   }
@@ -156,17 +147,15 @@ public class QueryRunner {
    */
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
     long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    long timeoutMs = Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
-    boolean isTraceEnabled =
-        Boolean.parseBoolean(requestMetadata.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
+    long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
 
     setStageCustomProperties(distributedStagePlan.getStageMetadata().getCustomProperties(), requestMetadata);
 
     // run pre-stage execution for all pipeline breakers
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, deadlineMs,
-            requestId, isTraceEnabled);
+        PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+            requestMetadata, requestId, deadlineMs);
 
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
@@ -193,13 +182,15 @@ public class QueryRunner {
     }
 
     // run OpChain
+    OpChainExecutionContext executionContext =
+        new OpChainExecutionContext(_mailboxService, requestId, distributedStagePlan.getStageId(),
+            distributedStagePlan.getServer(), deadlineMs, requestMetadata, distributedStagePlan.getStageMetadata(),
+            pipelineBreakerResult);
     OpChain opChain;
     if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
-      opChain = compileLeafStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult, deadlineMs,
-          isTraceEnabled);
+      opChain = compileLeafStage(executionContext, distributedStagePlan);
     } else {
-      opChain = compileIntermediateStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult,
-          deadlineMs, isTraceEnabled);
+      opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
     }
     }
     _scheduler.register(opChain);
   }
@@ -248,51 +239,36 @@ public class QueryRunner {
     return _opChainExecutor;
   }
 
-  private OpChain compileIntermediateStage(long requestId, DistributedStagePlan distributedStagePlan,
-      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
-      boolean isTraceEnabled) {
-    PlanNode stageRoot = distributedStagePlan.getStageRoot();
-    OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId,
-        stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(), deadlineMs,
-        distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
-    return PhysicalPlanVisitor.walkPlanNode(stageRoot,
-        new PhysicalPlanContext(opChainContext, pipelineBreakerResult));
-  }
-
-  private OpChain compileLeafStage(long requestId, DistributedStagePlan distributedStagePlan,
-      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
-      boolean isTraceEnabled) {
-    OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId,
-        distributedStagePlan.getStageId(), distributedStagePlan.getServer(), deadlineMs,
-        distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
-    PhysicalPlanContext planContext = new PhysicalPlanContext(opChainContext, pipelineBreakerResult);
-    List<ServerPlanRequestContext> serverPlanRequestContexts = ServerPlanRequestUtils.constructServerQueryRequests(
-        planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
+  private OpChain compileLeafStage(OpChainExecutionContext executionContext,
+      DistributedStagePlan distributedStagePlan) {
+    List<ServerPlanRequestContext> serverPlanRequestContexts =
+        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, distributedStagePlan,
+            _helixManager.getHelixPropertyStore());
     List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
+    long queryArrivalTimeMs = System.currentTimeMillis();
     for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
-      serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
-          new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
+      serverQueryRequests.add(
+          new ServerQueryRequest(requestContext.getInstanceRequest(), _serverMetrics, queryArrivalTimeMs));
     }
     MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
-    OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext);
     MultiStageOperator leafStageOperator =
-        new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
-            serverQueryRequests, sendNode.getDataSchema());
+        new LeafStageTransferableBlockOperator(executionContext, this::processServerQueryRequest, serverQueryRequests,
+            sendNode.getDataSchema());
     MailboxSendOperator mailboxSendOperator =
-        new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getDistributionType(),
+        new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(),
             sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
             sendNode.isSortOnSender(), sendNode.getReceiverStageId());
-    return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
+    return new OpChain(executionContext, mailboxSendOperator);
   }
 
   private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
     InstanceResponseBlock result;
     try {
-      result = _serverExecutor.execute(request, getOpChainExecutorService());
+      result = _leafQueryExecutor.execute(request, getOpChainExecutorService());
     } catch (Exception e) {
       InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
-          e.getMessage() + QueryException.getTruncatedStackTrace(e));
+      errorResponse.getExceptions()
+          .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e));
       result = errorResponse;
     }
     return result;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index d5414439ed..360bef6324 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import java.util.List;
 import java.util.function.Consumer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -34,31 +33,21 @@ import org.slf4j.LoggerFactory;
 public class OpChain implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChain.class);
 
-  private final MultiStageOperator _root;
-  private final List<String> _receivingMailboxIds;
   private final OpChainId _id;
   private final OpChainStats _stats;
-  private final Consumer<OpChainId> _opChainFinishCallback;
+  private final MultiStageOperator _root;
+  private final Consumer<OpChainId> _finishCallback;
 
-  public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds) {
-    this(context, root, receivingMailboxIds, (id) -> { });
+  public OpChain(OpChainExecutionContext context, MultiStageOperator root) {
+    this(context, root, (id) -> {
+    });
   }
 
-  public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds,
-      Consumer<OpChainId> opChainFinishCallback) {
-    _root = root;
-    _receivingMailboxIds = receivingMailboxIds;
+  public OpChain(OpChainExecutionContext context, MultiStageOperator root, Consumer<OpChainId> finishCallback) {
     _id = context.getId();
     _stats = context.getStats();
-    _opChainFinishCallback = opChainFinishCallback;
-  }
-
-  public Operator<TransferableBlock> getRoot() {
-    return _root;
-  }
-
-  public List<String> getReceivingMailboxIds() {
-    return _receivingMailboxIds;
+    _root = root;
+    _finishCallback = finishCallback;
   }
 
   public OpChainId getId() {
@@ -70,6 +59,10 @@ public class OpChain implements AutoCloseable {
     return _stats;
   }
 
+  public Operator<TransferableBlock> getRoot() {
+    return _root;
+  }
+
   @Override
   public String toString() {
     return "OpChain{" + _id + "}";
@@ -86,7 +79,7 @@ public class OpChain implements AutoCloseable {
     try {
       _root.close();
     } finally {
-      _opChainFinishCallback.accept(getId());
+      _finishCallback.accept(getId());
       LOGGER.trace("OpChain callback called");
     }
   }
@@ -102,7 +95,7 @@ public class OpChain implements AutoCloseable {
     try {
       _root.cancel(e);
     } finally {
-      _opChainFinishCallback.accept(getId());
+      _finishCallback.accept(getId());
       LOGGER.trace("OpChain callback called");
     }
   }
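
A quick usage sketch for the slimmed-down OpChain constructors above (the helper method names are mine, not from the commit): the two-argument form installs a no-op finish callback, while callers that need completion signalling, such as PipelineBreakerExecutor further down, pass their own callback.

    import java.util.concurrent.CountDownLatch;
    import org.apache.pinot.query.runtime.operator.MultiStageOperator;
    import org.apache.pinot.query.runtime.operator.OpChain;
    import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;

    public class OpChainUsageSketch {
      // Regular op-chains: the receivingMailboxIds argument is gone, and a no-op finish callback is used.
      static OpChain regularChain(OpChainExecutionContext context, MultiStageOperator root) {
        return new OpChain(context, root);
      }

      // Callers that need completion signalling pass a callback, mirroring the pipeline breaker usage below.
      static OpChain pipelineBreakerChain(OpChainExecutionContext context, MultiStageOperator root,
          CountDownLatch latch) {
        return new OpChain(context, root, (id) -> latch.countDown());
      }
    }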
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 9963c91db1..cb4326e8b9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
+import java.util.Map;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
+import org.apache.pinot.spi.utils.CommonConstants;
 
 
 /**
@@ -36,32 +38,30 @@ public class OpChainExecutionContext {
   private final int _stageId;
   private final VirtualServerAddress _server;
   private final long _deadlineMs;
+  private final Map<String, String> _requestMetadata;
   private final StageMetadata _stageMetadata;
   private final OpChainId _id;
   private final OpChainStats _stats;
+  private final PipelineBreakerResult _pipelineBreakerResult;
   private final boolean _traceEnabled;
 
   public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
-      VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata,
-      PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
+      VirtualServerAddress server, long deadlineMs, Map<String, String> requestMetadata, StageMetadata stageMetadata,
+      PipelineBreakerResult pipelineBreakerResult) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
     _server = server;
     _deadlineMs = deadlineMs;
+    _requestMetadata = requestMetadata;
     _stageMetadata = stageMetadata;
     _id = new OpChainId(requestId, server.workerId(), stageId);
     _stats = new OpChainStats(_id.toString());
+    _pipelineBreakerResult = pipelineBreakerResult;
     if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
       _stats.getOperatorStatsMap().putAll(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
     }
-    _traceEnabled = traceEnabled;
-  }
-
-  public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
-    this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
-        physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(),
-        physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled());
+    _traceEnabled = Boolean.parseBoolean(requestMetadata.get(CommonConstants.Broker.Request.TRACE));
   }
 
   public MailboxService getMailboxService() {
@@ -84,6 +84,10 @@ public class OpChainExecutionContext {
     return _deadlineMs;
   }
 
+  public Map<String, String> getRequestMetadata() {
+    return _requestMetadata;
+  }
+
   public StageMetadata getStageMetadata() {
     return _stageMetadata;
   }
@@ -96,6 +100,10 @@ public class OpChainExecutionContext {
     return _stats;
   }
 
+  public PipelineBreakerResult getPipelineBreakerResult() {
+    return _pipelineBreakerResult;
+  }
+
   public boolean isTraceEnabled() {
     return _traceEnabled;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
deleted file mode 100644
index 00b7ac40e7..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.plan;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
-
-
-public class PhysicalPlanContext {
-  protected final MailboxService _mailboxService;
-  protected final long _requestId;
-  protected final int _stageId;
-  private final long _deadlineMs;
-  protected final VirtualServerAddress _server;
-  protected final StageMetadata _stageMetadata;
-  protected final PipelineBreakerResult _pipelineBreakerResult;
-  protected final List<String> _receivingMailboxIds = new ArrayList<>();
-  private final OpChainExecutionContext _opChainExecutionContext;
-  private final boolean _traceEnabled;
-
-  public PhysicalPlanContext(OpChainExecutionContext opChainContext, PipelineBreakerResult pipelineBreakerResult) {
-    _mailboxService = opChainContext.getMailboxService();
-    _requestId = opChainContext.getRequestId();
-    _stageId = opChainContext.getStageId();
-    _deadlineMs = opChainContext.getDeadlineMs();
-    _server = opChainContext.getServer();
-    _stageMetadata = opChainContext.getStageMetadata();
-    _pipelineBreakerResult = pipelineBreakerResult;
-    _traceEnabled = opChainContext.isTraceEnabled();
-    _opChainExecutionContext = opChainContext;
-  }
-
-  public long getRequestId() {
-    return _requestId;
-  }
-
-  public int getStageId() {
-    return _stageId;
-  }
-
-  public long getDeadlineMs() {
-    return _deadlineMs;
-  }
-
-  public VirtualServerAddress getServer() {
-    return _server;
-  }
-
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
-  }
-
-  public PipelineBreakerResult getPipelineBreakerResult() {
-    return _pipelineBreakerResult;
-  }
-
-  public MailboxService getMailboxService() {
-    return _mailboxService;
-  }
-
-  public void addReceivingMailboxIds(List<String> receivingMailboxIds) {
-    _receivingMailboxIds.addAll(receivingMailboxIds);
-  }
-
-  public List<String> getReceivingMailboxIds() {
-    return _receivingMailboxIds;
-  }
-
-  public OpChainExecutionContext getOpChainExecutionContext() {
-    return _opChainExecutionContext;
-  }
-
-  public boolean isTraceEnabled() {
-    return _traceEnabled;
-  }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 2340545437..144cf86e27 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -57,64 +57,56 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
  * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
  * v1 operators at this point in time.
  *
- * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, PhysicalPlanContext)}
+ * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, OpChainExecutionContext)}
  */
-public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PhysicalPlanContext> {
+public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> {
 
   private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
 
-  public static OpChain walkPlanNode(PlanNode node, PhysicalPlanContext context) {
+  public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext context) {
     MultiStageOperator root = node.visit(INSTANCE, context);
-    return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds());
+    return new OpChain(context, root);
   }
 
   @Override
-  public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) {
     if (node.isSortOnReceiver()) {
-      SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
-          new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(),
-              node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(),
-              node.getCollationNullDirections(), node.isSortOnSender(), node.getSenderStageId());
-      context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds());
-      return sortedMailboxReceiveOperator;
+      return new SortedMailboxReceiveOperator(context, node.getDistributionType(), node.getDataSchema(),
+          node.getCollationKeys(), node.getCollationDirections(), node.getCollationNullDirections(),
+          node.isSortOnSender(), node.getSenderStageId());
     } else {
-      MailboxReceiveOperator mailboxReceiveOperator =
-          new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(),
-              node.getSenderStageId());
-      context.addReceivingMailboxIds(mailboxReceiveOperator.getMailboxIds());
-      return mailboxReceiveOperator;
+      return new MailboxReceiveOperator(context, node.getDistributionType(), node.getSenderStageId());
     }
   }
 
   @Override
-  public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitMailboxSend(MailboxSendNode node, OpChainExecutionContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getDistributionType(),
-        node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
-        node.getReceiverStageId());
+    return new MailboxSendOperator(context, nextOperator, node.getDistributionType(), node.getPartitionKeySelector(),
+        node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.getReceiverStageId());
   }
 
   @Override
-  public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     DataSchema inputSchema = node.getInputs().get(0).getDataSchema();
     DataSchema resultSchema = node.getDataSchema();
 
-    return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, resultSchema, inputSchema,
-        node.getAggCalls(), node.getGroupSet(), node.getAggType(), node.getFilterArgIndices(), node.getNodeHint());
+    return new AggregateOperator(context, nextOperator, resultSchema, inputSchema, node.getAggCalls(),
+        node.getGroupSet(), node.getAggType(), node.getFilterArgIndices(), node.getNodeHint());
   }
 
   @Override
-  public MultiStageOperator visitWindow(WindowNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new WindowAggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getGroupSet(),
-        node.getOrderSet(), node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(),
-        node.getLowerBound(), node.getUpperBound(), node.getWindowFrameType(), node.getConstants(),
-        node.getDataSchema(), node.getInputs().get(0).getDataSchema());
+    return new WindowAggregateOperator(context, nextOperator, node.getGroupSet(), node.getOrderSet(),
+        node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), node.getLowerBound(),
+        node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), node.getDataSchema(),
+        node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public MultiStageOperator visitSetOp(SetOpNode setOpNode, PhysicalPlanContext context) {
+  public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) {
     List<MultiStageOperator> inputs = new ArrayList<>();
     for (PlanNode input : setOpNode.getInputs()) {
       MultiStageOperator visited = input.visit(this, context);
@@ -122,66 +114,60 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
     }
     switch (setOpNode.getSetOpType()) {
       case UNION:
-        return new UnionOperator(context.getOpChainExecutionContext(), inputs,
-            setOpNode.getInputs().get(0).getDataSchema());
+        return new UnionOperator(context, inputs, setOpNode.getInputs().get(0).getDataSchema());
       case INTERSECT:
-        return new IntersectOperator(context.getOpChainExecutionContext(), inputs,
-            setOpNode.getInputs().get(0).getDataSchema());
+        return new IntersectOperator(context, inputs, setOpNode.getInputs().get(0).getDataSchema());
       case MINUS:
-        return new MinusOperator(context.getOpChainExecutionContext(), inputs,
-            setOpNode.getInputs().get(0).getDataSchema());
+        return new MinusOperator(context, inputs, setOpNode.getInputs().get(0).getDataSchema());
       default:
         throw new IllegalStateException();
     }
   }
 
   @Override
-  public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PhysicalPlanContext context) {
+  public MultiStageOperator visitExchange(ExchangeNode exchangeNode, OpChainExecutionContext context) {
     throw new UnsupportedOperationException("ExchangeNode should not be visited");
   }
 
   @Override
-  public MultiStageOperator visitFilter(FilterNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new FilterOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
-        node.getCondition());
+    return new FilterOperator(context, nextOperator, node.getDataSchema(), node.getCondition());
   }
 
   @Override
-  public MultiStageOperator visitJoin(JoinNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext context) {
     PlanNode left = node.getInputs().get(0);
     PlanNode right = node.getInputs().get(1);
 
     MultiStageOperator leftOperator = left.visit(this, context);
     MultiStageOperator rightOperator = right.visit(this, context);
 
-    return new HashJoinOperator(context.getOpChainExecutionContext(), leftOperator, rightOperator, left.getDataSchema(),
-        node);
+    return new HashJoinOperator(context, leftOperator, rightOperator, left.getDataSchema(), node);
   }
 
   @Override
-  public MultiStageOperator visitProject(ProjectNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new TransformOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
-        node.getProjects(), node.getInputs().get(0).getDataSchema());
+    return new TransformOperator(context, nextOperator, node.getDataSchema(), node.getProjects(),
+        node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public MultiStageOperator visitSort(SortNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
-    return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
-        node.getCollationDirections(), node.getCollationNullDirections(), node.getFetch(), node.getOffset(),
-        node.getDataSchema(), isInputSorted);
+    return new SortOperator(context, nextOperator, node.getCollationKeys(), node.getCollationDirections(),
+        node.getCollationNullDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
   }
 
   @Override
-  public MultiStageOperator visitTableScan(TableScanNode node, PhysicalPlanContext context) {
+  public MultiStageOperator visitTableScan(TableScanNode node, OpChainExecutionContext context) {
     throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
   }
 
   @Override
-  public MultiStageOperator visitValue(ValueNode node, PhysicalPlanContext context) {
-    return new LiteralValueOperator(context.getOpChainExecutionContext(), node.getDataSchema(), node.getLiteralRows());
+  public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext context) {
+    return new LiteralValueOperator(context, node.getDataSchema(), node.getLiteralRows());
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 69663f5d33..8972a89e18 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -35,7 +35,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,31 +56,29 @@ public class PipelineBreakerExecutor {
    * @param scheduler scheduler service to run the pipeline breaker main thread.
    * @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} against.
    * @param distributedStagePlan the distributed stage plan to run pipeline breaker on.
-   * @param deadlineMs execution deadline
+   * @param requestMetadata request metadata, including query options
    * @param requestId request ID
-   * @param isTraceEnabled whether to enable trace.
+   * @param deadlineMs execution deadline
    * @return pipeline breaker result;
    *   - If exception occurs, exception block will be wrapped in {@link TransferableBlock} and assigned to each PB node.
    *   - Normal stats will be attached to each PB node and downstream execution should return with stats attached.
    */
   @Nullable
   public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
-      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs, long requestId,
-      boolean isTraceEnabled) {
+      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata,
+      long requestId, long deadlineMs) {
     PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
     PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
     if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) {
       try {
-        PlanNode stageRoot = distributedStagePlan.getStageRoot();
         // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage
         //     OpChain receive-mail callbacks.
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
-        OpChainExecutionContext opChainContext =
-            new OpChainExecutionContext(mailboxService, requestId, stageRoot.getPlanFragmentId(),
-                distributedStagePlan.getServer(), deadlineMs, distributedStagePlan.getStageMetadata(), null,
-                isTraceEnabled);
-        PhysicalPlanContext physicalPlanContext = new PhysicalPlanContext(opChainContext, null);
-        return PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
+        OpChainExecutionContext opChainExecutionContext =
+            new OpChainExecutionContext(mailboxService, requestId, distributedStagePlan.getStageId(),
+                distributedStagePlan.getServer(), deadlineMs, requestMetadata, distributedStagePlan.getStageMetadata(),
+                null);
+        return execute(scheduler, pipelineBreakerContext, opChainExecutionContext);
       } catch (Exception e) {
         LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId,
             distributedStagePlan.getStageId(), e);
@@ -93,36 +90,36 @@ public class PipelineBreakerExecutor {
     }
   }
 
-  private static PipelineBreakerResult execute(OpChainSchedulerService scheduler, PipelineBreakerContext context,
-      PhysicalPlanContext physicalPlanContext)
+  private static PipelineBreakerResult execute(OpChainSchedulerService scheduler,
+      PipelineBreakerContext pipelineBreakerContext, OpChainExecutionContext opChainExecutionContext)
       throws Exception {
     Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>();
-    for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) {
+    for (Map.Entry<Integer, PlanNode> e : pipelineBreakerContext.getPipelineBreakerMap().entrySet()) {
       int key = e.getKey();
       PlanNode planNode = e.getValue();
       if (!(planNode instanceof MailboxReceiveNode)) {
         throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now");
       }
-      OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext);
-      pipelineWorkerMap.put(key, tempOpChain.getRoot());
+      OpChain opChain = PhysicalPlanVisitor.walkPlanNode(planNode, opChainExecutionContext);
+      pipelineWorkerMap.put(key, opChain.getRoot());
     }
-    return runMailboxReceivePipelineBreaker(scheduler, context, pipelineWorkerMap, physicalPlanContext);
+    return runMailboxReceivePipelineBreaker(scheduler, pipelineBreakerContext, pipelineWorkerMap,
+        opChainExecutionContext);
   }
 
   private static PipelineBreakerResult runMailboxReceivePipelineBreaker(OpChainSchedulerService scheduler,
-      PipelineBreakerContext context, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
-      PhysicalPlanContext physicalPlanContext)
+      PipelineBreakerContext pipelineBreakerContext, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
+      OpChainExecutionContext opChainExecutionContext)
       throws Exception {
     PipelineBreakerOperator pipelineBreakerOperator =
-        new PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap);
+        new PipelineBreakerOperator(opChainExecutionContext, pipelineWorkerMap);
     CountDownLatch latch = new CountDownLatch(1);
     OpChain pipelineBreakerOpChain =
-        new OpChain(physicalPlanContext.getOpChainExecutionContext(), pipelineBreakerOperator,
-            physicalPlanContext.getReceivingMailboxIds(), (id) -> latch.countDown());
+        new OpChain(opChainExecutionContext, pipelineBreakerOperator, (id) -> latch.countDown());
     scheduler.register(pipelineBreakerOpChain);
-    long timeoutMs = physicalPlanContext.getDeadlineMs() - System.currentTimeMillis();
+    long timeoutMs = opChainExecutionContext.getDeadlineMs() - System.currentTimeMillis();
     if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
-      return new PipelineBreakerResult(context.getNodeIdMap(), pipelineBreakerOperator.getResultMap(),
+      return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), pipelineBreakerOperator.getResultMap(),
           pipelineBreakerOperator.getErrorBlock(), pipelineBreakerOpChain.getStats());
     } else {
       throw new TimeoutException(
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index fb1c3af0c1..bc78678fa3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -20,7 +20,7 @@ package org.apache.pinot.query.runtime.plan.server;
 
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.config.table.TableType;
 
 
@@ -29,20 +29,21 @@ import org.apache.pinot.spi.config.table.TableType;
  * {@link PinotQuery} to execute on server.
  */
 public class ServerPlanRequestContext {
-  private final PhysicalPlanContext _planContext;
+  private final OpChainExecutionContext _executionContext;
   private final TableType _tableType;
 
   private PinotQuery _pinotQuery;
   private InstanceRequest _instanceRequest;
 
-  public ServerPlanRequestContext(PhysicalPlanContext planContext, PinotQuery pinotQuery, TableType tableType) {
-    _planContext = planContext;
+  public ServerPlanRequestContext(OpChainExecutionContext executionContext, PinotQuery pinotQuery,
+      TableType tableType) {
+    _executionContext = executionContext;
     _pinotQuery = pinotQuery;
     _tableType = tableType;
   }
 
-  public PhysicalPlanContext getPlanContext() {
-    return _planContext;
+  public OpChainExecutionContext getExecutionContext() {
+    return _executionContext;
   }
 
   public TableType getTableType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 7f9e756acc..466b29af14 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -43,7 +43,7 @@ import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -61,6 +61,9 @@ import org.slf4j.LoggerFactory;
 
 
 public class ServerPlanRequestUtils {
+  private ServerPlanRequestUtils() {
+  }
+
   private static final int DEFAULT_LEAF_NODE_LIMIT = Integer.MAX_VALUE;
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class);
   private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
@@ -70,22 +73,16 @@ public class ServerPlanRequestUtils {
       new ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
   private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
 
-  private ServerPlanRequestUtils() {
-    // do not instantiate.
-  }
-
   /**
    * Entry point to construct a {@link ServerPlanRequestContext} for executing leaf-stage runner.
    *
-   * @param planContext physical plan context of the stage.
+   * @param executionContext execution context of the stage.
    * @param distributedStagePlan distributed stage plan of the stage.
-   * @param requestMetadataMap metadata map
    * @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
    * @return a list of server plan request context to be run
    */
-  public static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
-      DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
-      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+  public static List<ServerPlanRequestContext> constructServerQueryRequests(OpChainExecutionContext executionContext,
+      DistributedStagePlan distributedStagePlan, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
     StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
     WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
     String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -101,15 +98,15 @@ public class ServerPlanRequestUtils {
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
-            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
+        requests.add(ServerPlanRequestUtils.build(executionContext, distributedStagePlan, tableConfig, schema,
+            StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
-            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
+        requests.add(ServerPlanRequestUtils.build(executionContext, distributedStagePlan, tableConfig, schema,
+            StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
@@ -117,18 +114,15 @@ public class ServerPlanRequestUtils {
     return requests;
   }
 
-  private static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
-      Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
+  private static ServerPlanRequestContext build(OpChainExecutionContext executionContext,
+      DistributedStagePlan stagePlan, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
       TableType tableType, List<String> segmentList) {
     // Before-visit: construct the ServerPlanRequestContext baseline
     // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
-    long requestId =
-        (Long.parseLong(requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID)) << 16) + (
-            (long) stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1 : 0);
-    long timeoutMs = Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    boolean traceEnabled = Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE));
+    long requestId = (executionContext.getRequestId() << 16) + ((long) stagePlan.getStageId() << 8) + (
+        tableType == TableType.REALTIME ? 1 : 0);
     PinotQuery pinotQuery = new PinotQuery();
-    Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
+    Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getRequestMetadata());
     if (leafNodeLimit != null) {
       pinotQuery.setLimit(leafNodeLimit);
     } else {
@@ -136,7 +130,7 @@ public class ServerPlanRequestUtils {
     }
     LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
     pinotQuery.setExplain(false);
-    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(planContext, pinotQuery, tableType);
+    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(executionContext, pinotQuery, tableType);
 
     // visit the plan and create query physical plan.
     ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
@@ -152,7 +146,7 @@ public class ServerPlanRequestUtils {
     QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
 
     // 2. set pinot query options according to requestMetadataMap
-    updateQueryOptions(pinotQuery, requestMetadataMap, timeoutMs, traceEnabled);
+    updateQueryOptions(pinotQuery, executionContext);
 
     // 3. wrapped around in broker request
     BrokerRequest brokerRequest = new BrokerRequest();
@@ -168,7 +162,7 @@ public class ServerPlanRequestUtils {
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setBrokerId("unknown");
-    instanceRequest.setEnableTrace(Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE)));
+    instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
     instanceRequest.setSearchSegments(segmentList);
     instanceRequest.setQuery(brokerRequest);
 
@@ -179,16 +173,10 @@ public class ServerPlanRequestUtils {
   /**
    * Helper method to update query options.
    */
-  private static void updateQueryOptions(PinotQuery pinotQuery, Map<String, String> requestMetadataMap, long timeoutMs,
-      boolean traceEnabled) {
-    Map<String, String> queryOptions = new HashMap<>();
-    // put default timeout and trace options
-    queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs));
-    if (traceEnabled) {
-      queryOptions.put(CommonConstants.Broker.Request.TRACE, "true");
-    }
-    // overwrite with requestMetadataMap to carry query options from request:
-    queryOptions.putAll(requestMetadataMap);
+  private static void updateQueryOptions(PinotQuery pinotQuery, OpChainExecutionContext executionContext) {
+    Map<String, String> queryOptions = new HashMap<>(executionContext.getRequestMetadata());
+    queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
+        Long.toString(executionContext.getDeadlineMs() - System.currentTimeMillis()));
     pinotQuery.setQueryOptions(queryOptions);
   }
 
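
One behavioural detail from the updateQueryOptions() hunk above: the leaf-stage timeoutMs query option is now derived from the time remaining until the stage deadline rather than copied from the broker's original timeout. A minimal sketch of that computation; the "timeoutMs" literal stands in for CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS (an assumption on my part) and the class and method names are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class LeafTimeoutSketch {
      // Illustrative only: mirrors the new updateQueryOptions() logic.
      static Map<String, String> leafQueryOptions(Map<String, String> requestMetadata, long deadlineMs) {
        Map<String, String> queryOptions = new HashMap<>(requestMetadata);
        // Remaining budget until the stage deadline, rather than the original end-to-end timeout.
        queryOptions.put("timeoutMs", Long.toString(deadlineMs - System.currentTimeMillis()));
        return queryOptions;
      }
    }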
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 5e3873fbcd..be7db3ea63 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -110,7 +110,7 @@ public class ServerPlanRequestVisitor implements 
PlanNodeVisitor<Void, ServerPla
       staticSide = node.getInputs().get(1);
     }
     staticSide.visit(this, context);
-    PipelineBreakerResult pipelineBreakerResult = context.getPlanContext().getPipelineBreakerResult();
+    PipelineBreakerResult pipelineBreakerResult = context.getExecutionContext().getPipelineBreakerResult();
     int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
     List<TransferableBlock> transferableBlocks = pipelineBreakerResult.getResultMap().getOrDefault(
         resultMapId, Collections.emptyList());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 2561779c2b..bf1993af3d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -87,15 +87,14 @@ public class QueryDispatcher {
   }
 
   public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
-      Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator,
-      boolean traceEnabled)
+      Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
       throws Exception {
     long requestId = context.getRequestId();
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions);
       long reduceStartTimeNs = System.nanoTime();
       ResultTable resultTable =
-          runReducer(requestId, dispatchableSubPlan, timeoutMs, executionStatsAggregator, traceEnabled,
+          runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, executionStatsAggregator,
               _mailboxService);
       context.setReduceTimeNanos(System.nanoTime() - reduceStartTimeNs);
       return resultTable;
@@ -184,7 +183,8 @@ public class QueryDispatcher {
 
   @VisibleForTesting
   public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
-      Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled, MailboxService mailboxService) {
+      Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap,
+      MailboxService mailboxService) {
     // NOTE: Reduce stage is always stage 0
     DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(0);
     PlanFragment planFragment = dispatchablePlanFragment.getPlanFragment();
@@ -199,8 +199,8 @@ public class QueryDispatcher {
         .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build();
     OpChainExecutionContext opChainExecutionContext =
         new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(),
-            workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + timeoutMs, stageMetadata,
-            null, traceEnabled);
+            workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + timeoutMs, queryOptions,
+            stageMetadata, null);
     MailboxReceiveOperator receiveOperator =
         new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(),
             receiveNode.getSenderStageId());
@@ -210,9 +210,9 @@ public class QueryDispatcher {
     return resultTable;
   }
 
-  private static void collectStats(DispatchableSubPlan dispatchableSubPlan, @Nullable OpChainStats opChainStats,
+  private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats,
       @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
-    if (executionStatsAggregatorMap != null && opChainStats != null) {
+    if (executionStatsAggregatorMap != null) {
       LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime());
       for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
         OperatorStats operatorStats = entry.getValue();
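With the traceEnabled flag dropped from submitAndReduce()/runReducer(), tracing now travels in the query-options map and is read back through OpChainExecutionContext.isTraceEnabled(), as the setEnableTrace() change earlier and the singletonMap(TRACE, "true") call sites in the tests below suggest. A small sketch of that lookup, assuming absent-key-means-disabled semantics; the "trace" key is an illustrative placeholder, not the real constant.

    import java.util.Collections;
    import java.util.Map;

    public class TraceFlagSketch {
      // Boolean.parseBoolean(null) is false, so an absent key simply leaves tracing off.
      static boolean isTraceEnabled(Map<String, String> requestMetadata) {
        return Boolean.parseBoolean(requestMetadata.get("trace"));
      }

      public static void main(String[] args) {
        System.out.println(isTraceEnabled(Collections.singletonMap("trace", "true"))); // true
        System.out.println(isTraceEnabled(Collections.emptyMap()));                    // false
      }
    }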
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 893e0afc5e..3b1d18655d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -98,10 +98,10 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
     // Deserialize the request
     List<DistributedStagePlan> distributedStagePlans;
-    Map<String, String> requestMetadataMap;
-    requestMetadataMap = request.getMetadataMap();
-    long requestId = Long.parseLong(requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    long timeoutMs = Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+    Map<String, String> requestMetadata;
+    requestMetadata = request.getMetadataMap();
+    long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
+    long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
     // 1. Deserialized request
     try {
@@ -114,7 +114,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     // 2. Submit distributed stage plans
     SubmissionService submissionService = new SubmissionService(_querySubmissionExecutorService);
     distributedStagePlans.forEach(distributedStagePlan -> submissionService.submit(() -> {
-      _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+      _queryRunner.processQuery(distributedStagePlan, requestMetadata);
     }));
     // 3. await response successful or any failure which cancels all other tasks.
     try {
@@ -123,8 +123,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
       LOGGER.error("error occurred during stage submission for {}:\n{}", requestId, t);
       responseObserver.onNext(Worker.QueryResponse.newBuilder()
           .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
-              QueryException.getTruncatedStackTrace(t))
-          .build());
+              QueryException.getTruncatedStackTrace(t)).build());
       responseObserver.onCompleted();
       return;
     }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 7b27883210..afe5aa34ab 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -208,7 +209,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
       processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap);
     }
     try {
-      QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, null, false, _mailboxService);
+      QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), null,
+          _mailboxService);
       Assert.fail("Should have thrown exception!");
     } catch (RuntimeException e) {
       // NOTE: The actual message is (usually) something like:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index f784f4b586..5112cf4caf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -137,8 +137,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       }
     }
     ResultTable resultTable =
-        QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, executionStatsAggregatorMap, true,
-            _mailboxService);
+        QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(),
+            executionStatsAggregatorMap, _mailboxService);
     return resultTable.getRows();
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 38ab949875..e79f46e671 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
-import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -68,8 +68,9 @@ public class OpChainSchedulerServiceTest {
 
   private OpChain getChain(MultiStageOperator operator) {
     VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 
1);
-    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 
1, address, 0, null, null, true);
-    return new OpChain(context, operator, ImmutableList.of());
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(null, 123L, 1, address, Long.MAX_VALUE, 
Collections.emptyMap(), null, null);
+    return new OpChain(context, operator);
   }
 
   @Test
@@ -131,8 +132,8 @@ public class OpChainSchedulerServiceTest {
     OpChainSchedulerService schedulerService = new OpChainSchedulerService(_executor);
 
     CountDownLatch latch = new CountDownLatch(1);
-    Mockito.when(_operatorA.nextBlock()).thenReturn(
-        TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("foo")));
+    Mockito.when(_operatorA.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("foo")));
     Mockito.doAnswer(inv -> {
       latch.countDown();
       return null;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 83dea7c82d..4913527d29 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -25,7 +25,6 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -40,9 +39,6 @@ public class LiteralValueOperatorTest {
 
   private AutoCloseable _mocks;
 
-  @Mock
-  private PhysicalPlanContext _context;
-
   @Mock
   private VirtualServerAddress _serverAddress;
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 76be227fcd..e9c8661ff9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -105,12 +105,12 @@ public class MailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
   }
 
-  @Test(enabled = true)
+  @Test
   public void shouldTimeout()
       throws InterruptedException {
     when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 85893cb6b5..c01447d7cd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -189,8 +189,8 @@ public class MailboxSendOperatorTest {
     StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
         Collections.singletonList(new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, stageMetadata, null,
-            false);
+        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
+            Collections.emptyMap(), stageMetadata, null);
     return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 3bd295fe4c..cab25f9127 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
@@ -132,7 +133,7 @@ public class OpChainTest {
       Thread.sleep(100);
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     });
-    OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator, new ArrayList<>());
+    OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator);
     opChain.getStats().executing();
     opChain.getRoot().nextBlock();
     opChain.getStats().queued();
@@ -142,7 +143,7 @@ public class OpChainTest {
       Thread.sleep(20);
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     });
-    opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator, new ArrayList<>());
+    opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator);
     opChain.getStats().executing();
     opChain.getRoot().nextBlock();
     opChain.getStats().queued();
@@ -155,7 +156,7 @@ public class OpChainTest {
     OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
     DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
 
-    OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+    OpChain opChain = new OpChain(context, dummyMultiStageOperator);
     opChain.getStats().executing();
     opChain.getRoot().nextBlock();
     opChain.getStats().queued();
@@ -168,8 +169,8 @@ public class OpChainTest {
         opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId()).getExecutionStats();
 
     long time = Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName()));
-    assertTrue(time >= 1000 && time <= 2000, "Expected " + DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS
-        + " to be in [1000, 2000] but found " + time);
+    assertTrue(time >= 1000 && time <= 2000,
+        "Expected " + DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS + " to be in [1000, 2000] but found " + time);
   }
 
   @Test
@@ -177,7 +178,7 @@ public class OpChainTest {
     OpChainExecutionContext context = OperatorTestUtil.getDefaultContextWithTracingDisabled();
     DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
 
-    OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+    OpChain opChain = new OpChain(context, dummyMultiStageOperator);
     opChain.getStats().executing();
     opChain.getRoot().nextBlock();
     opChain.getStats().queued();
@@ -192,13 +193,14 @@ public class OpChainTest {
 
     int receivedStageId = 2;
     int senderStageId = 1;
-    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
-        System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true);
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE,
+            Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
 
-    OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>());
+    OpChain opChain = new OpChain(context, operators.peek());
     opChain.getStats().executing();
     while (!opChain.getRoot().nextBlock().isEndOfStreamBlock()) {
       // Drain the opchain
@@ -206,8 +208,8 @@ public class OpChainTest {
     opChain.getStats().queued();
 
     OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true);
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE,
+            Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null);
 
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -231,20 +233,21 @@ public class OpChainTest {
 
     int receivedStageId = 2;
     int senderStageId = 1;
-    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
-        System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false);
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE,
+            Collections.emptyMap(), _receivingStageMetadata, null);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
 
-    OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>());
+    OpChain opChain = new OpChain(context, operators.peek());
     opChain.getStats().executing();
     opChain.getRoot().nextBlock();
     opChain.getStats().queued();
 
     OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false);
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE,
+            Collections.emptyMap(), _receivingStageMetadata, null);
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 55878658f7..5f139e5545 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -31,6 +32,7 @@ import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
 
 
 public class OperatorTestUtil {
@@ -75,22 +77,19 @@ public class OperatorTestUtil {
 
   public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService,
       VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) {
-    return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, stageMetadata, null, false);
+    return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, Collections.emptyMap(),
+        stageMetadata, null);
   }
 
   public static OpChainExecutionContext getDefaultContext() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, true);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE,
+        Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), null, null);
   }
 
   public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, false);
-  }
-
-  public static OpChainExecutionContext getContext(long requestId, int stageId,
-      VirtualServerAddress virtualServerAddress) {
-    return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, null,
-        true);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Collections.emptyMap(), null,
+        null);
   }
 }
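Taken together, the test-helper changes above converge on a single eight-argument OpChainExecutionContext constructor. The following compile-time sketch (assuming the pinot-query-runtime test classpath) shows one way a helper could build such a context; the parameter roles (mailbox service, request id, stage id, server address, deadline, request metadata, stage metadata, pipeline-breaker result) are inferred from the call sites in this diff rather than from documented API.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.pinot.query.routing.VirtualServerAddress;
    import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
    import org.apache.pinot.spi.utils.CommonConstants;

    public class ContextSketch {
      // Tracing is switched on or off purely through the request-metadata map.
      static OpChainExecutionContext tracingContext(boolean trace) {
        VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 0);
        Map<String, String> metadata = trace
            ? Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true")
            : Collections.<String, String>emptyMap();
        return new OpChainExecutionContext(null, 123L, 1, address, Long.MAX_VALUE, metadata, null, null);
      }
    }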
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 38940e0892..0c6e60561d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -115,7 +115,7 @@ public class SortedMailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -125,8 +125,7 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowOnEmptyCollationKey() {
     when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1);
     OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
-            _stageMetadata1);
+        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
         Collections.emptyList(), Collections.emptyList(), false, 1);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index b5ba82e48d..e7fa0e7db2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -131,7 +132,7 @@ public class PipelineBreakerExecutorTest {
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            System.currentTimeMillis() + 10_000L, 0, false);
+            Collections.emptyMap(), 0, Long.MAX_VALUE);
 
     // then
     // should have single PB result, receive 2 data blocks, EOS block shouldn't be included
@@ -172,7 +173,7 @@ public class PipelineBreakerExecutorTest {
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            System.currentTimeMillis() + 10_000L, 0, false);
+            Collections.emptyMap(), 0, Long.MAX_VALUE);
 
     // then
     // should have two PB result, receive 2 data blocks, one each, EOS block shouldn't be included
@@ -200,7 +201,7 @@ public class PipelineBreakerExecutorTest {
     // when
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            System.currentTimeMillis() + 10_000L, 0, false);
+            Collections.emptyMap(), 0, Long.MAX_VALUE);
 
     // then
     // should return empty block list
@@ -215,23 +216,23 @@ public class PipelineBreakerExecutorTest {
 
   @Test
   public void shouldReturnErrorBlocksFailureWhenPBTimeout() {
-    MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+    MailboxReceiveNode mailboxReceiveNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
     DistributedStagePlan distributedStagePlan =
-        new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1);
+        new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
 
     // when
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
-    Object[] row1 = new Object[]{1, 1};
-    Object[] row2 = new Object[]{2, 3};
-    when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
-        OperatorTestUtil.block(DATA_SCHEMA, row2),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    CountDownLatch latch = new CountDownLatch(1);
+    when(_mailbox1.poll()).thenAnswer(invocation -> {
+      latch.await();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            System.currentTimeMillis() - 10_000L, 0, false);
+            Collections.emptyMap(), 0, System.currentTimeMillis() + 100);
 
     // then
     // should contain only failure error blocks
@@ -239,6 +240,8 @@ public class PipelineBreakerExecutorTest {
     TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
     Assert.assertNotNull(errorBlock);
     Assert.assertTrue(errorBlock.isErrorBlock());
+
+    latch.countDown();
   }
 
   @Test
@@ -268,7 +271,7 @@ public class PipelineBreakerExecutorTest {
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            System.currentTimeMillis() + 10_000L, 0, false);
+            Collections.emptyMap(), 0, Long.MAX_VALUE);
 
     // then
     // should pass when one PB returns result, the other returns empty.
@@ -307,7 +310,7 @@ public class PipelineBreakerExecutorTest {
 
     PipelineBreakerResult pipelineBreakerResult =
         PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
-            System.currentTimeMillis() + 10_000L, 0, false);
+            Collections.emptyMap(), 0, Long.MAX_VALUE);
 
     // then
     // should fail even if one of the 2 PB doesn't contain error block from sender.
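Two details in the PipelineBreakerExecutorTest changes above are worth calling out. First, judging by the call sites, executePipelineBreakers(...) now takes the request metadata map, the request id, and an absolute deadline in place of the old deadline, id, and trace-flag tail. Second, the timeout case no longer relies on an already-expired deadline: the mocked mailbox is parked on a CountDownLatch and the executor is handed a deadline roughly 100 ms out. A self-contained plain-Java sketch of that latch-driven timeout pattern, with illustrative names:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class DeadlineTestSketch {
      interface Mailbox {
        String poll() throws InterruptedException;
      }

      public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        // A "mailbox" that never delivers until the latch is released.
        Mailbox blocked = () -> {
          latch.await();
          return "EOS";
        };

        long deadlineMs = System.currentTimeMillis() + 100;
        Thread poller = new Thread(() -> {
          try {
            blocked.poll();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        poller.start();

        // The consumer waits only until the deadline, then gives up instead of hanging.
        poller.join(Math.max(1, deadlineMs - System.currentTimeMillis()));
        System.out.println("hit deadline while source still blocked: " + poller.isAlive());

        // Release the blocked source so the test can shut down cleanly.
        latch.countDown();
        poller.join(TimeUnit.SECONDS.toMillis(1));
      }
    }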
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 417f5b5e53..08505e6e33 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -125,7 +125,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     context.setRequestId(requestId);
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     try {
-      _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null, false);
+      _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null);
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Error dispatching query"));
@@ -149,7 +149,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     try {
       // will throw b/c mailboxService is mocked
-      _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null, false);
+      _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null);
       Assert.fail("Method call above should have failed");
     } catch (NullPointerException e) {
       // Expected


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

