This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5416652ff5 [multistage] [debuggability] OpChain and operator stats (#10094)
5416652ff5 is described below

commit 5416652ff59b150d455c1f4764d2e0bc561d8cf3
Author: Yao Liu <y...@startree.ai>
AuthorDate: Thu Jan 19 08:54:43 2023 -0800

    [multistage] [debuggability] OpChain and operator stats (#10094)
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  62 +++++-----
 .../runtime/executor/OpChainSchedulerService.java  |  10 +-
 .../query/runtime/operator/AggregateOperator.java  |  86 +++++++------
 .../query/runtime/operator/FilterOperator.java     |  22 +++-
 .../query/runtime/operator/HashJoinOperator.java   |  30 ++++-
 .../LeafStageTransferableBlockOperator.java        |  50 +++++---
 .../runtime/operator/LiteralValueOperator.java     |  27 +++-
 .../runtime/operator/MailboxReceiveOperator.java   |  99 ++++++++-------
 .../runtime/operator/MailboxSendOperator.java      |  21 +++-
 .../pinot/query/runtime/operator/OpChain.java      |   1 +
 .../pinot/query/runtime/operator/OpChainStats.java |  20 +--
 .../query/runtime/operator/OperatorStats.java      |  78 ++++++++++++
 .../pinot/query/runtime/operator/SortOperator.java |  26 +++-
 .../query/runtime/operator/TransformOperator.java  |  23 +++-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  15 ++-
 .../pinot/query/service/QueryDispatcher.java       |  12 +-
 .../runtime/operator/AggregateOperatorTest.java    |  41 +++----
 .../query/runtime/operator/FilterOperatorTest.java |  28 ++---
 .../runtime/operator/HashJoinOperatorTest.java     |  35 +++---
 .../LeafStageTransferableBlockOperatorTest.java    | 136 +++++++++++----------
 .../runtime/operator/LiteralValueOperatorTest.java |  25 +++-
 .../runtime/operator/MailboxSendOperatorTest.java  |  10 +-
 .../query/runtime/operator/SortOperatorTest.java   |  30 ++---
 .../runtime/operator/TransformOperatorTest.java    |  20 +--
 24 files changed, 585 insertions(+), 322 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 41571fcd76..ad0d138bb4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -88,24 +88,20 @@ public class QueryRunner {
    * Initializes the query executor.
    * <p>Should be called only once and before calling any other method.
    */
-  public void init(PinotConfiguration config, InstanceDataManager 
instanceDataManager,
-      HelixManager helixManager, ServerMetrics serverMetrics) {
+  public void init(PinotConfiguration config, InstanceDataManager 
instanceDataManager, HelixManager helixManager,
+      ServerMetrics serverMetrics) {
     String instanceName = 
config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
     _hostname = 
instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? 
instanceName.substring(
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
     _helixManager = helixManager;
     try {
-      long releaseMs = config.getProperty(
-          QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
+      long releaseMs = 
config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
           QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
 
-      _scheduler = new OpChainSchedulerService(
-          new RoundRobinScheduler(releaseMs),
-          Executors.newFixedThreadPool(
-              ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
-              new NamedThreadFactory("query_worker_on_" + _port + "_port")),
-          releaseMs);
+      _scheduler = new OpChainSchedulerService(new 
RoundRobinScheduler(releaseMs),
+          
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+              new NamedThreadFactory("query_worker_on_" + _port + "_port")), 
releaseMs);
       _mailboxService = MultiplexingMailboxService.newInstance(_hostname, 
_port, config, _scheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), 
instanceDataManager, serverMetrics);
@@ -130,12 +126,14 @@ public class QueryRunner {
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadataMap) {
+    long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     if (isLeafStage(distributedStagePlan)) {
       // TODO: make server query request return via mailbox, this is a hack to 
gather the non-streaming data table
       // and package it here for return. But we should really use a 
MailboxSendOperator directly put into the
       // server executor.
-      List<ServerPlanRequestContext> serverQueryRequests = 
constructServerQueryRequests(distributedStagePlan,
-          requestMetadataMap, _helixPropertyStore, _mailboxService);
+      long leafStageStartMillis = System.currentTimeMillis();
+      List<ServerPlanRequestContext> serverQueryRequests =
+          constructServerQueryRequests(distributedStagePlan, 
requestMetadataMap, _helixPropertyStore, _mailboxService);
 
       // send the data table via mailbox in one-off fashion (e.g. no 
block-level split, one data table/partition key)
       List<InstanceResponseBlock> serverQueryResults = new 
ArrayList<>(serverQueryRequests.size());
@@ -144,25 +142,27 @@ public class QueryRunner {
             new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
System.currentTimeMillis());
         serverQueryResults.add(processServerQuery(request, 
_scheduler.getWorkerPool()));
       }
-
+      LOGGER.debug(
+          "RequestId:" + requestId + " StageId:" + 
distributedStagePlan.getStageId() + " Leaf stage v1 processing time:"
+              + (System.currentTimeMillis() - leafStageStartMillis) + " ms");
       MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = 
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
-      MailboxSendOperator mailboxSendOperator =
-          new MailboxSendOperator(_mailboxService,
-              new LeafStageTransferableBlockOperator(serverQueryResults, 
sendNode.getDataSchema()),
-              receivingStageMetadata.getServerInstances(), 
sendNode.getExchangeType(),
-              sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequests.get(0).getRequestId(),
-              sendNode.getStageId());
+      MailboxSendOperator mailboxSendOperator = new 
MailboxSendOperator(_mailboxService,
+          new LeafStageTransferableBlockOperator(serverQueryResults, 
sendNode.getDataSchema(), requestId,
+              sendNode.getStageId()), 
receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(),
+          sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequests.get(0).getRequestId(),
+          sendNode.getStageId());
       int blockCounter = 0;
       while 
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
         LOGGER.debug("Acquired transferable block: {}", blockCounter++);
       }
+      mailboxSendOperator.toExplainString();
     } else {
-      long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
       long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
       StageNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, new 
PlanRequestContext(_mailboxService, requestId,
-          stageRoot.getStageId(), timeoutMs, _hostname, _port, 
distributedStagePlan.getMetadataMap()));
+      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+          new PlanRequestContext(_mailboxService, requestId, 
stageRoot.getStageId(), timeoutMs, _hostname, _port,
+              distributedStagePlan.getMetadataMap()));
       _scheduler.register(rootOperator);
     }
   }
@@ -174,8 +174,8 @@ public class QueryRunner {
     Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
         "Server request for V2 engine should only have 1 scan table per 
request.");
     String rawTableName = stageMetadata.getScannedTables().get(0);
-    Map<String, List<String>> tableToSegmentListMap = 
stageMetadata.getServerInstanceToSegmentsMap()
-        .get(distributedStagePlan.getServerInstance());
+    Map<String, List<String>> tableToSegmentListMap =
+        
stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance());
     List<ServerPlanRequestContext> requests = new ArrayList<>();
     for (Map.Entry<String, List<String>> tableEntry : 
tableToSegmentListMap.entrySet()) {
       String tableType = tableEntry.getKey();
@@ -187,15 +187,17 @@ public class QueryRunner {
             
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.OFFLINE, tableEntry.getValue()));
+        requests.add(
+            ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap, tableConfig,
+                schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.OFFLINE, tableEntry.getValue()));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(helixPropertyStore,
             
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.REALTIME, tableEntry.getValue()));
+        requests.add(
+            ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap, tableConfig,
+                schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.REALTIME, tableEntry.getValue()));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + 
tableType);
       }
@@ -209,8 +211,8 @@ public class QueryRunner {
       return _serverExecutor.execute(serverQueryRequest, executorService);
     } catch (Exception e) {
       InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-      
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
-          e.getMessage() + QueryException.getTruncatedStackTrace(e));
+      errorResponse.getExceptions()
+          .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + 
QueryException.getTruncatedStackTrace(e));
       return errorResponse;
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 21a944d4f7..6c463eeeed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -114,15 +114,18 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
                   register(operatorChain, false);
                 } else {
                   if (result.isErrorBlock()) {
+                    operatorChain.getRoot().toExplainString();
                     LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
                         result.getDataBlock().getExceptions());
                   } else {
+                    operatorChain.getRoot().toExplainString();
                     LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
                   }
                   operatorChain.close();
                 }
               } catch (Exception e) {
                 operatorChain.close();
+                operatorChain.getRoot().toExplainString();
                 LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
               }
             }
@@ -154,11 +157,6 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
     LOGGER.debug("({}): Scheduler is now handling operator chain listening to 
mailboxes {}. "
             + "There are a total of {} chains awaiting execution.", 
operatorChain, operatorChain.getReceivingMailbox(),
         _scheduler.size());
-
-    // we want to track the time that it takes from registering
-    // an operator chain to when it completes, so make sure to
-    // start the timer here
-    operatorChain.getStats().startExecutionTimer();
   }
 
   public final void register(OpChain operatorChain, boolean isNew) {
@@ -167,8 +165,8 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
       LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", 
operatorChain, isNew, _scheduler.size());
 
       _scheduler.register(operatorChain, isNew);
-      operatorChain.getStats().queued();
     } finally {
+      operatorChain.getStats().queued();
       _monitor.leave();
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 3182dba91f..199d8c8b96 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -37,6 +37,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -55,8 +57,10 @@ import org.apache.pinot.spi.data.FieldSpec;
  */
 public class AggregateOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AggregateOperator.class);
 
   private final MultiStageOperator _inputOperator;
+
   // TODO: Deal with the case where _aggCalls is empty but we have groupSet 
setup, which means this is a Distinct call.
   private final List<RexExpression.FunctionCall> _aggCalls;
   private final List<RexExpression> _groupSet;
@@ -69,27 +73,29 @@ public class AggregateOperator extends MultiStageOperator {
   private boolean _readyToConstruct;
   private boolean _hasReturnedAggregateBlock;
 
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg 
operator.
   // aggCalls has to be a list of FunctionCall and cannot be null
   // groupSet has to be a list of InputRef and cannot be null
   // TODO: Add these two checks when we confirm we can handle error in 
upstream ctor call.
-  public AggregateOperator(MultiStageOperator inputOperator, DataSchema 
dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema 
inputSchema) {
-    this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, 
AggregateOperator.Accumulator.MERGERS);
+  public AggregateOperator(MultiStageOperator inputOperator, DataSchema 
dataSchema, List<RexExpression> aggCalls,
+      List<RexExpression> groupSet, DataSchema inputSchema, long requestId, 
int stageId) {
+    this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, 
AggregateOperator.Accumulator.MERGERS, requestId,
+        stageId);
   }
 
   @VisibleForTesting
-  AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
-      List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema 
inputSchema, Map<String,
-      Function<DataSchema.ColumnDataType, Merger>> mergers) {
+  AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, 
List<RexExpression> aggCalls,
+      List<RexExpression> groupSet, DataSchema inputSchema,
+      Map<String, Function<DataSchema.ColumnDataType, Merger>> mergers, long 
requestId, int stageId) {
     _inputOperator = inputOperator;
     _groupSet = groupSet;
     _upstreamErrorBlock = null;
 
     // we expect all agg calls to be aggregate function calls
-    _aggCalls = aggCalls.stream()
-        .map(RexExpression.FunctionCall.class::cast)
-        .collect(Collectors.toList());
+    _aggCalls = 
aggCalls.stream().map(RexExpression.FunctionCall.class::cast).collect(Collectors.toList());
 
     _accumulators = new Accumulator[_aggCalls.size()];
     for (int i = 0; i < _aggCalls.size(); i++) {
@@ -105,6 +111,7 @@ public class AggregateOperator extends MultiStageOperator {
     _resultSchema = dataSchema;
     _readyToConstruct = false;
     _hasReturnedAggregateBlock = false;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -115,11 +122,15 @@ public class AggregateOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    // TODO: move to close call;
+    _inputOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
       if (!_readyToConstruct && !consumeInputBlocks()) {
         return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -132,10 +143,13 @@ public class AggregateOperator extends MultiStageOperator {
       if (!_hasReturnedAggregateBlock) {
         return produceAggregatedBlock();
       } else {
+        // TODO: Move to close call.
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
       }
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -154,6 +168,7 @@ public class AggregateOperator extends MultiStageOperator {
     if (rows.size() == 0) {
       return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     } else {
+      _operatorStats.recordOutput(1, rows.size());
       return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
     }
   }
@@ -162,7 +177,9 @@ public class AggregateOperator extends MultiStageOperator {
    * @return whether or not the operator is ready to move on (EOS or ERROR)
    */
   private boolean consumeInputBlocks() {
+    _operatorStats.endTimer();
     TransferableBlock block = _inputOperator.nextBlock();
+    _operatorStats.startTimer();
     while (!block.isNoOpBlock()) {
       // setting upstream error block
       if (block.isErrorBlock()) {
@@ -181,7 +198,10 @@ public class AggregateOperator extends MultiStageOperator {
           _accumulators[i].accumulate(key, row);
         }
       }
+      _operatorStats.recordInput(1, container.size());
+      _operatorStats.endTimer();
       block = _inputOperator.nextBlock();
+      _operatorStats.startTimer();
     }
     return false;
   }
@@ -269,32 +289,25 @@ public class AggregateOperator extends MultiStageOperator {
   }
 
   private static class Accumulator {
-
-    private static final Map<String, Function<DataSchema.ColumnDataType, 
Merger>> MERGERS = ImmutableMap
-        .<String, Function<DataSchema.ColumnDataType, Merger>>builder()
-        .put("SUM", cdt -> AggregateOperator::mergeSum)
-        .put("$SUM", cdt -> AggregateOperator::mergeSum)
-        .put("$SUM0", cdt -> AggregateOperator::mergeSum)
-        .put("MIN", cdt -> AggregateOperator::mergeMin)
-        .put("$MIN", cdt -> AggregateOperator::mergeMin)
-        .put("$MIN0", cdt -> AggregateOperator::mergeMin)
-        .put("MAX", cdt -> AggregateOperator::mergeMax)
-        .put("$MAX", cdt -> AggregateOperator::mergeMax)
-        .put("$MAX0", cdt -> AggregateOperator::mergeMax)
-        .put("COUNT", cdt -> AggregateOperator::mergeCount)
-        .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
-        .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd)
-        .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
-        .put("$BOOL_OR0", cdt -> AggregateOperator::mergeBoolOr)
-        .put("FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
-            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
-        .put("$FOURTHMOMENT", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
-            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
-        .put("$FOURTHMOMENT0", cdt -> cdt == DataSchema.ColumnDataType.OBJECT
-            ? new MergeFourthMomentObject() : new MergeFourthMomentNumeric())
-        .build();
+    private static final Map<String, Function<DataSchema.ColumnDataType, 
Merger>> MERGERS =
+        ImmutableMap.<String, Function<DataSchema.ColumnDataType, 
Merger>>builder()
+            .put("SUM", cdt -> AggregateOperator::mergeSum).put("$SUM", cdt -> 
AggregateOperator::mergeSum)
+            .put("$SUM0", cdt -> AggregateOperator::mergeSum).put("MIN", cdt 
-> AggregateOperator::mergeMin)
+            .put("$MIN", cdt -> AggregateOperator::mergeMin).put("$MIN0", cdt 
-> AggregateOperator::mergeMin)
+            .put("MAX", cdt -> AggregateOperator::mergeMax).put("$MAX", cdt -> 
AggregateOperator::mergeMax)
+            .put("$MAX0", cdt -> AggregateOperator::mergeMax).put("COUNT", cdt 
-> AggregateOperator::mergeCount)
+            .put("BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+            .put("$BOOL_AND", cdt -> AggregateOperator::mergeBoolAnd)
+            .put("$BOOL_AND0", cdt -> AggregateOperator::mergeBoolAnd)
+            .put("BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+            .put("$BOOL_OR", cdt -> AggregateOperator::mergeBoolOr)
+            .put("$BOOL_OR0", cdt -> 
AggregateOperator::mergeBoolOr).put("FOURTHMOMENT",
+                cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new 
MergeFourthMomentObject()
+                    : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT",
+                cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new 
MergeFourthMomentObject()
+                    : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT0",
+                cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new 
MergeFourthMomentObject()
+                    : new MergeFourthMomentNumeric()).build();
 
     final int _inputRef;
     final Object _literal;
@@ -336,8 +349,7 @@ public class AggregateOperator extends MultiStageOperator {
     private RexExpression 
toAggregationFunctionOperand(RexExpression.FunctionCall rexExpression) {
       List<RexExpression> functionOperands = 
rexExpression.getFunctionOperands();
       Preconditions.checkState(functionOperands.size() < 2, "aggregate 
functions cannot have more than one operand");
-      return functionOperands.size() > 0
-          ? functionOperands.get(0)
+      return functionOperands.size() > 0 ? functionOperands.get(0)
           : new RexExpression.Literal(FieldSpec.DataType.INT, 1);
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 3ae9eac98f..6f57ece6df 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -29,6 +29,8 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /*
@@ -47,15 +49,21 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 public class FilterOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "FILTER";
   private final MultiStageOperator _upstreamOperator;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AggregateOperator.class);
   private final TransformOperand _filterOperand;
   private final DataSchema _dataSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public FilterOperator(MultiStageOperator upstreamOperator, DataSchema 
dataSchema, RexExpression filter) {
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
+  public FilterOperator(MultiStageOperator upstreamOperator, DataSchema 
dataSchema, RexExpression filter,
+      long requestId, int stageId) {
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
     _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
     _upstreamErrorBlock = null;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -66,15 +74,23 @@ public class FilterOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _upstreamOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
-      return transform(_upstreamOperator.nextBlock());
+      _operatorStats.endTimer();
+      TransferableBlock block = _upstreamOperator.nextBlock();
+      _operatorStats.startTimer();
+      return transform(block);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -97,6 +113,8 @@ public class FilterOperator extends MultiStageOperator {
         resultRows.add(row);
       }
     }
+    _operatorStats.recordInput(1, container.size());
+    _operatorStats.recordOutput(1, resultRows.size());
     return new TransferableBlock(resultRows, _dataSchema, DataBlock.Type.ROW);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index be47c9f389..b4e88965cb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -39,6 +39,9 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This basic {@code BroadcastJoinOperator} implement a basic broadcast join 
algorithm.
@@ -55,6 +58,8 @@ import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 // TODO: Move inequi out of hashjoin. 
(https://github.com/apache/pinot/issues/9728)
 public class HashJoinOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AggregateOperator.class);
+
   private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES =
       ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, 
JoinRelType.FULL);
 
@@ -82,8 +87,10 @@ public class HashJoinOperator extends MultiStageOperator {
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
+  private OperatorStats _operatorStats;
+
   public HashJoinOperator(MultiStageOperator leftTableOperator, 
MultiStageOperator rightTableOperator,
-      DataSchema leftSchema, JoinNode node) {
+      DataSchema leftSchema, JoinNode node, long requestId, int stageId) {
     
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
         "Join type: " + node.getJoinRelType() + " is not supported!");
     _joinType = node.getJoinRelType();
@@ -111,6 +118,7 @@ public class HashJoinOperator extends MultiStageOperator {
       _matchedRightRows = null;
     }
     _upstreamErrorBlock = null;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   // TODO: Separate left and right table operator.
@@ -122,11 +130,15 @@ public class HashJoinOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _leftTableOperator.toExplainString();
+    _rightTableOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
       if (_isTerminated) {
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -140,15 +152,22 @@ public class HashJoinOperator extends MultiStageOperator {
       } else if (!_isHashTableBuilt) {
         return TransferableBlockUtils.getNoOpTransferableBlock();
       }
+      _operatorStats.endTimer();
+      TransferableBlock leftBlock = _leftTableOperator.nextBlock();
+      _operatorStats.startTimer();
       // JOIN each left block with the right block.
-      return buildJoinedDataBlock(_leftTableOperator.nextBlock());
+      return buildJoinedDataBlock(leftBlock);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
   private void buildBroadcastHashTable() {
+    _operatorStats.endTimer();
     TransferableBlock rightBlock = _rightTableOperator.nextBlock();
+    _operatorStats.startTimer();
     while (!rightBlock.isNoOpBlock()) {
       if (rightBlock.isErrorBlock()) {
         _upstreamErrorBlock = rightBlock;
@@ -165,8 +184,10 @@ public class HashJoinOperator extends MultiStageOperator {
             _broadcastRightTable.computeIfAbsent(new 
Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
         hashCollection.add(row);
       }
-
+      _operatorStats.recordInput(1, container.size());
+      _operatorStats.endTimer();
       rightBlock = _rightTableOperator.nextBlock();
+      _operatorStats.startTimer();
     }
   }
 
@@ -196,6 +217,7 @@ public class HashJoinOperator extends MultiStageOperator {
         }
       }
       _isTerminated = true;
+      _operatorStats.recordOutput(1, returnRows.size());
       return new TransferableBlock(returnRows, _resultSchema, 
DataBlock.Type.ROW);
     }
     List<Object[]> rows = new ArrayList<>();
@@ -230,6 +252,8 @@ public class HashJoinOperator extends MultiStageOperator {
         rows.add(joinRow(leftRow, null));
       }
     }
+    _operatorStats.recordInput(1, container.size());
+    _operatorStats.recordOutput(1, rows.size());
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index e794a84194..baf7373a07 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -40,6 +40,8 @@ import 
org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -57,17 +59,23 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlock;
  */
 public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class);
 
   private final InstanceResponseBlock _errorBlock;
   private final List<InstanceResponseBlock> _baseResultBlock;
   private final DataSchema _desiredDataSchema;
   private int _currentIndex;
 
-  public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> 
baseResultBlock, DataSchema dataSchema) {
+  // TODO: Move to OperatorContext class.
+  private OperatorStats _operatorStats;
+
+  public LeafStageTransferableBlockOperator(List<InstanceResponseBlock> 
baseResultBlock, DataSchema dataSchema,
+      long requestId, int stageId) {
     _baseResultBlock = baseResultBlock;
     _desiredDataSchema = dataSchema;
     _errorBlock = baseResultBlock.stream().filter(e -> 
!e.getExceptions().isEmpty()).findFirst().orElse(null);
     _currentIndex = 0;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -78,29 +86,39 @@ public class LeafStageTransferableBlockOperator extends 
MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (_currentIndex < 0) {
-      throw new RuntimeException("Leaf transfer terminated. next block should 
no longer be called.");
-    }
-    if (_errorBlock != null) {
-      _currentIndex = -1;
-      return new 
TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
-    } else {
-      if (_currentIndex < _baseResultBlock.size()) {
-        InstanceResponseBlock responseBlock = 
_baseResultBlock.get(_currentIndex++);
-        if (responseBlock.getResultsBlock() != null && 
responseBlock.getResultsBlock().getNumRows() > 0) {
-          return composeTransferableBlock(responseBlock, _desiredDataSchema);
+    try {
+      _operatorStats.startTimer();
+      if (_currentIndex < 0) {
+        throw new RuntimeException("Leaf transfer terminated. next block 
should no longer be called.");
+      }
+      if (_errorBlock != null) {
+        _currentIndex = -1;
+        return new 
TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
+      } else {
+        if (_currentIndex < _baseResultBlock.size()) {
+          InstanceResponseBlock responseBlock = 
_baseResultBlock.get(_currentIndex++);
+          if (responseBlock.getResultsBlock() != null && 
responseBlock.getResultsBlock().getNumRows() > 0) {
+            _operatorStats.recordInput(1, 
responseBlock.getResultsBlock().getNumRows());
+            _operatorStats.recordOutput(1, 
responseBlock.getResultsBlock().getNumRows());
+            return composeTransferableBlock(responseBlock, _desiredDataSchema);
+          } else {
+            _operatorStats.recordInput(1, 
responseBlock.getResultsBlock().getNumRows());
+            _operatorStats.recordOutput(1, 
responseBlock.getResultsBlock().getNumRows());
+            return new TransferableBlock(Collections.emptyList(), 
_desiredDataSchema, DataBlock.Type.ROW);
+          }
         } else {
-          return new TransferableBlock(Collections.emptyList(), 
_desiredDataSchema, DataBlock.Type.ROW);
+          _currentIndex = -1;
+          return new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
         }
-      } else {
-        _currentIndex = -1;
-        return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
       }
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index cace9fa974..8fc160f205 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -27,17 +27,24 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class LiteralValueOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LiteralValueOperator.class);
 
   private final DataSchema _dataSchema;
   private final TransferableBlock _rexLiteralBlock;
   private boolean _isLiteralBlockReturned;
 
-  public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> 
rexLiteralRows) {
+  private OperatorStats _operatorStats;
+
+  public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> 
rexLiteralRows,
+      long requestId, int stageId) {
     _dataSchema = dataSchema;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
     _rexLiteralBlock = constructBlock(rexLiteralRows);
     _isLiteralBlockReturned = false;
   }
@@ -50,16 +57,22 @@ public class LiteralValueOperator extends 
MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (!_isLiteralBlockReturned) {
-      _isLiteralBlockReturned = true;
-      return _rexLiteralBlock;
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    try {
+      _operatorStats.startTimer();
+      if (!_isLiteralBlockReturned) {
+        _isLiteralBlockReturned = true;
+        return _rexLiteralBlock;
+      } else {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -72,6 +85,8 @@ public class LiteralValueOperator extends MultiStageOperator {
       }
       blockContent.add(row);
     }
+    _operatorStats.recordInput(1, blockContent.size());
+    _operatorStats.recordOutput(1, blockContent.size());
     return new TransferableBlock(blockContent, _dataSchema, 
DataBlock.Type.ROW);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ee97a99e79..aa569ba56f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -65,6 +65,7 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
   private final long _deadlineTimestampNano;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
+  private OperatorStats _operatorStats;
 
   private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, 
long jobId, long stageId,
       String receiveHostName, int receivePort) {
@@ -109,6 +110,7 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
     }
     _upstreamErrorBlock = null;
     _serverIdx = 0;
+    _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
   }
 
   public List<MailboxIdentifier> getSendingMailbox() {
@@ -123,61 +125,68 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
-    if (_upstreamErrorBlock != null) {
-      return _upstreamErrorBlock;
-    } else if (System.nanoTime() >= _deadlineTimestampNano) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingMailbox);
-      return 
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    }
+    try {
+      _operatorStats.startTimer();
+      if (_upstreamErrorBlock != null) {
+        return _upstreamErrorBlock;
+      } else if (System.nanoTime() >= _deadlineTimestampNano) {
+        return 
TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+      }
 
-    int startingIdx = _serverIdx;
-    int openMailboxCount = 0;
-    int eosMailboxCount = 0;
-
-    // For all non-singleton distribution, we poll from every instance to 
check mailbox content.
-    // TODO: Fix wasted CPU cycles on waiting for servers that are not 
supposed to give content.
-    for (int i = 0; i < _sendingMailbox.size(); i++) {
-      // this implements a round-robin mailbox iterator, so we don't starve 
any mailboxes
-      _serverIdx = (startingIdx + i) % _sendingMailbox.size();
-      MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
-      try {
-        ReceivingMailbox<TransferableBlock> mailbox = 
_mailboxService.getReceivingMailbox(mailboxId);
-        if (!mailbox.isClosed()) {
-          openMailboxCount++;
-          TransferableBlock block = mailbox.receive();
-
-          // Get null block when pulling times out from mailbox.
-          if (block != null) {
-            if (block.isErrorBlock()) {
-              _upstreamErrorBlock =
-                  
TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
-              return _upstreamErrorBlock;
-            }
-            if (!block.isEndOfStreamBlock()) {
-              return block;
-            } else {
-              eosMailboxCount++;
+      int startingIdx = _serverIdx;
+      int openMailboxCount = 0;
+      int eosMailboxCount = 0;
+
+      // For all non-singleton distribution, we poll from every instance to 
check mailbox content.
+      // TODO: Fix wasted CPU cycles on waiting for servers that are not 
supposed to give content.
+      for (int i = 0; i < _sendingMailbox.size(); i++) {
+        // this implements a round-robin mailbox iterator, so we don't starve 
any mailboxes
+        _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+        MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
+        try {
+          ReceivingMailbox<TransferableBlock> mailbox = 
_mailboxService.getReceivingMailbox(mailboxId);
+          if (!mailbox.isClosed()) {
+            openMailboxCount++;
+            TransferableBlock block = mailbox.receive();
+            // Get null block when pulling times out from mailbox.
+            if (block != null) {
+              if (block.isErrorBlock()) {
+                _upstreamErrorBlock =
+                    
TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
+                return _upstreamErrorBlock;
+              }
+              if (!block.isEndOfStreamBlock()) {
+                _operatorStats.recordInput(1, block.getNumRows());
+                _operatorStats.recordOutput(1, block.getNumRows());
+                return block;
+              } else {
+                eosMailboxCount++;
+              }
             }
           }
+        } catch (Exception e) {
+          return TransferableBlockUtils.getErrorTransferableBlock(
+              new RuntimeException(String.format("Error polling mailbox=%s", 
mailboxId), e));
         }
-      } catch (Exception e) {
-        return TransferableBlockUtils.getErrorTransferableBlock(
-            new RuntimeException(String.format("Error polling mailbox=%s", 
mailboxId), e));
       }
-    }
 
-    // there are two conditions in which we should return EOS: (1) there were
-    // no mailboxes to open (this shouldn't happen because the second condition
-    // should be hit first, but is defensive) (2) every mailbox that was opened
-    // returned an EOS block. in every other scenario, there are mailboxes that
-    // are not yet exhausted and we should wait for more data to be available
-    return openMailboxCount > 0 && openMailboxCount > eosMailboxCount
-        ? TransferableBlockUtils.getNoOpTransferableBlock()
-        : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      // there are two conditions in which we should return EOS: (1) there were
+      // no mailboxes to open (this shouldn't happen because the second 
condition
+      // should be hit first, but is defensive) (2) every mailbox that was 
opened
+      // returned an EOS block. in every other scenario, there are mailboxes 
that
+      // are not yet exhausted and we should wait for more data to be available
+      TransferableBlock block =
+          openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? 
TransferableBlockUtils.getNoOpTransferableBlock()
+              : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      return block;
+    } finally {
+      _operatorStats.endTimer();
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index a6299ea60a..79d64bfefe 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -54,6 +54,7 @@ public class MailboxSendOperator extends MultiStageOperator {
 
   private final MultiStageOperator _dataTableBlockBaseOperator;
   private final BlockExchange _exchange;
+  private OperatorStats _operatorStats;
 
   @VisibleForTesting
   interface BlockExchangeFactory {
@@ -71,14 +72,14 @@ public class MailboxSendOperator extends MultiStageOperator 
{
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector, String hostName, int port,
       long jobId, int stageId) {
     this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, 
exchangeType, keySelector,
-        server -> toMailboxId(server, jobId, stageId, hostName, port), 
BlockExchange::getExchange);
+        server -> toMailboxId(server, jobId, stageId, hostName, port), 
BlockExchange::getExchange, jobId, stageId);
   }
 
   @VisibleForTesting
   MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> 
receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector,
-      MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory 
blockExchangeFactory) {
+      MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory 
blockExchangeFactory, long jobId, int stageId) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
 
     List<MailboxIdentifier> receivingMailboxes;
@@ -106,6 +107,7 @@ public class MailboxSendOperator extends MultiStageOperator 
{
 
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
         String.format("Exchange type '%s' is not supported yet", 
exchangeType));
+    _operatorStats = new OperatorStats(jobId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -116,22 +118,30 @@ public class MailboxSendOperator extends 
MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _dataTableBlockBaseOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     TransferableBlock transferableBlock;
     try {
+      _operatorStats.endTimer();
       transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+      _operatorStats.startTimer();
       while (!transferableBlock.isNoOpBlock()) {
         _exchange.send(transferableBlock);
-
+        _operatorStats.recordInput(1, transferableBlock.getNumRows());
+        // The # of output block is not accurate because we may do a split in 
exchange send.
+        _operatorStats.recordOutput(1, transferableBlock.getNumRows());
         if (transferableBlock.isEndOfStreamBlock()) {
           return transferableBlock;
         }
-
+        _operatorStats.endTimer();
         transferableBlock = _dataTableBlockBaseOperator.nextBlock();
+        _operatorStats.startTimer();
       }
     } catch (final Exception e) {
       // ideally, MailboxSendOperator doesn't ever throw an exception because
@@ -143,8 +153,9 @@ public class MailboxSendOperator extends MultiStageOperator 
{
       } catch (Exception e2) {
         LOGGER.error("Exception while sending block to mailbox.", e2);
       }
+    } finally {
+      _operatorStats.endTimer();
     }
-
     return transferableBlock;
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index 424d7d003f..ae6bae362f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -52,6 +52,7 @@ public class OpChain implements AutoCloseable {
     return _receivingMailbox;
   }
 
+  // TODO: Move OperatorStats here.
   public OpChainStats getStats() {
     return _stats;
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index 58327c40da..07705f8bac 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -34,13 +34,14 @@ public class OpChainStats {
 
   // use memoized supplier so that the timing doesn't start until the
   // first time we get the timer
-  private final Supplier<ThreadResourceUsageProvider> _exTimer
-      = Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
+  private final Supplier<ThreadResourceUsageProvider> _exTimer =
+      Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
 
   // this is used to make sure that toString() doesn't have side
   // effects (accidentally starting the timer)
   private volatile boolean _exTimerStarted = false;
 
+  private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
   private final Stopwatch _queuedStopwatch = Stopwatch.createUnstarted();
   private final AtomicLong _queuedCount = new AtomicLong();
 
@@ -62,20 +63,23 @@ public class OpChainStats {
     if (!_queuedStopwatch.isRunning()) {
       _queuedStopwatch.start();
     }
+    if (_executeStopwatch.isRunning()) {
+      _executeStopwatch.stop();
+    }
   }
 
   public void startExecutionTimer() {
     _exTimerStarted = true;
     _exTimer.get();
+    if (!_executeStopwatch.isRunning()) {
+      _executeStopwatch.start();
+    }
   }
 
   @Override
   public String toString() {
-    return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued 
Time: %sms",
-        _id,
-        _queuedCount.get(),
-        _exTimerStarted ? 
TimeUnit.NANOSECONDS.toMillis(_exTimer.get().getThreadTimeNs()) : 0,
-        _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS)
-    );
+    return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued 
Time: %sms", _id, _queuedCount.get(),
+        _exTimerStarted ? _executeStopwatch.elapsed(TimeUnit.MILLISECONDS) : 0,
+        _queuedStopwatch.elapsed(TimeUnit.MILLISECONDS));
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
new file mode 100644
index 0000000000..c40b96b3c8
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+
+
+public class OperatorStats {
+  private final Stopwatch _executeStopwatch = Stopwatch.createUnstarted();
+
+  // TODO: add a operatorId for better tracking purpose.
+  private final int _stageId;
+  private final long _requestId;
+
+  private final String _operatorType;
+
+  private int _numInputBlock = 0;
+  private int _numInputRows = 0;
+
+  private int _numOutputBlock = 0;
+
+  private int _numOutputRows = 0;
+
+  public OperatorStats(long requestId, int stageId, String operatorType) {
+    _stageId = stageId;
+    _requestId = requestId;
+    _operatorType = operatorType;
+  }
+
+  public void startTimer() {
+    if (!_executeStopwatch.isRunning()) {
+      _executeStopwatch.start();
+    }
+  }
+
+  public void endTimer() {
+    if (_executeStopwatch.isRunning()) {
+      _executeStopwatch.stop();
+    }
+  }
+
+  public void recordInput(int numBlock, int numRows) {
+    _numInputBlock += numBlock;
+    _numInputRows += numRows;
+  }
+
+  public void recordOutput(int numBlock, int numRows) {
+    _numOutputBlock += numBlock;
+    _numOutputRows += numRows;
+  }
+
+  // TODO: Return the string as a JSON string.
+  @Override
+  public String toString() {
+    return String.format(
+        "OperatorStats[type: %s, requestId: %s, stageId %s] ExecutionWallTime: 
%sms, InputRows: %s, InputBlock: "
+            + "%s, OutputRows: %s, OutputBlock: %s", _operatorType, 
_requestId, _stageId,
+        _executeStopwatch.elapsed(TimeUnit.MILLISECONDS), _numInputRows, 
_numInputBlock, _numOutputRows,
+        _numOutputBlock);
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 4ba3dbde94..13f4306e01 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -32,11 +32,15 @@ import 
org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class SortOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "SORT";
   private final MultiStageOperator _upstreamOperator;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortOperator.class);
+
   private final int _fetch;
   private final int _offset;
   private final DataSchema _dataSchema;
@@ -46,17 +50,19 @@ public class SortOperator extends MultiStageOperator {
   private boolean _readyToConstruct;
   private boolean _isSortedBlockConstructed;
   private TransferableBlock _upstreamErrorBlock;
+  private OperatorStats _operatorStats;
 
   public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> 
collationKeys,
-      List<RelFieldCollation.Direction> collationDirections, int fetch, int 
offset, DataSchema dataSchema) {
+      List<RelFieldCollation.Direction> collationDirections, int fetch, int 
offset, DataSchema dataSchema,
+      long requestId, int stageId) {
     this(upstreamOperator, collationKeys, collationDirections, fetch, offset, 
dataSchema,
-        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
+        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, 
stageId);
   }
 
   @VisibleForTesting
   SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> 
collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int 
offset, DataSchema dataSchema,
-      int maxHolderCapacity) {
+      int maxHolderCapacity, long requestId, int stageId) {
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
     _offset = offset;
@@ -68,6 +74,7 @@ public class SortOperator extends MultiStageOperator {
         : maxHolderCapacity;
     _rows = new PriorityQueue<>(_numRowsToKeep,
         new SortComparator(collationKeys, collationDirections, dataSchema, 
false));
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -78,21 +85,27 @@ public class SortOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _upstreamOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
       consumeInputBlocks();
       return produceSortedBlock();
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
   private TransferableBlock produceSortedBlock() {
     if (_upstreamErrorBlock != null) {
+      LOGGER.error("OperatorStats:" + _operatorStats);
       return _upstreamErrorBlock;
     } else if (!_readyToConstruct) {
       return TransferableBlockUtils.getNoOpTransferableBlock();
@@ -104,6 +117,7 @@ public class SortOperator extends MultiStageOperator {
         Object[] row = _rows.poll();
         rows.addFirst(row);
       }
+      _operatorStats.recordOutput(1, rows.size());
       _isSortedBlockConstructed = true;
       if (rows.size() == 0) {
         return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -117,7 +131,9 @@ public class SortOperator extends MultiStageOperator {
 
   private void consumeInputBlocks() {
     if (!_isSortedBlockConstructed) {
+      _operatorStats.endTimer();
       TransferableBlock block = _upstreamOperator.nextBlock();
+      _operatorStats.startTimer();
       while (!block.isNoOpBlock()) {
         // setting upstream error block
         if (block.isErrorBlock()) {
@@ -132,8 +148,10 @@ public class SortOperator extends MultiStageOperator {
         for (Object[] row : container) {
           SelectionOperatorUtils.addToPriorityQueue(row, _rows, 
_numRowsToKeep);
         }
-
+        _operatorStats.endTimer();
         block = _upstreamOperator.nextBlock();
+        _operatorStats.startTimer();
+        _operatorStats.recordInput(1, container.size());
       }
     }
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 83db10d4a2..a75ad6b017 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -30,6 +30,8 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -45,14 +47,16 @@ import 
org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 public class TransformOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "TRANSFORM";
   private final MultiStageOperator _upstreamOperator;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TransformOperator.class);
   private final List<TransformOperand> _transformOperandsList;
   private final int _resultColumnSize;
   // TODO: Check type matching between resultSchema and the actual result.
   private final DataSchema _resultSchema;
   private TransferableBlock _upstreamErrorBlock;
+  private OperatorStats _operatorStats;
 
   public TransformOperator(MultiStageOperator upstreamOperator, DataSchema 
resultSchema,
-      List<RexExpression> transforms, DataSchema upstreamDataSchema) {
+      List<RexExpression> transforms, DataSchema upstreamDataSchema, long 
requestId, int stageId) {
     Preconditions.checkState(!transforms.isEmpty(), "transform operand should 
not be empty.");
     Preconditions.checkState(resultSchema.size() == transforms.size(),
         "result schema size:" + resultSchema.size() + " doesn't match 
transform operand size:" + transforms.size());
@@ -63,6 +67,7 @@ public class TransformOperator extends MultiStageOperator {
       
_transformOperandsList.add(TransformOperand.toTransformOperand(rexExpression, 
upstreamDataSchema));
     }
     _resultSchema = resultSchema;
+    _operatorStats = new OperatorStats(requestId, stageId, EXPLAIN_NAME);
   }
 
   @Override
@@ -73,15 +78,23 @@ public class TransformOperator extends MultiStageOperator {
   @Nullable
   @Override
   public String toExplainString() {
+    _upstreamOperator.toExplainString();
+    LOGGER.debug(_operatorStats.toString());
     return EXPLAIN_NAME;
   }
 
   @Override
   protected TransferableBlock getNextBlock() {
+    _operatorStats.startTimer();
     try {
-      return transform(_upstreamOperator.nextBlock());
+      _operatorStats.endTimer();
+      TransferableBlock block = _upstreamOperator.nextBlock();
+      _operatorStats.startTimer();
+      return transform(block);
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
+    } finally {
+      _operatorStats.endTimer();
     }
   }
 
@@ -103,11 +116,13 @@ public class TransformOperator extends MultiStageOperator 
{
     for (Object[] row : container) {
       Object[] resultRow = new Object[_resultColumnSize];
       for (int i = 0; i < _resultColumnSize; i++) {
-        resultRow[i] = 
FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row),
-            _resultSchema.getColumnDataType(i));
+        resultRow[i] =
+            
FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row), 
_resultSchema.getColumnDataType(i));
       }
       resultRows.add(resultRow);
     }
+    _operatorStats.recordInput(1, container.size());
+    _operatorStats.recordOutput(1, resultRows.size());
     return new TransferableBlock(resultRows, _resultSchema, 
DataBlock.Type.ROW);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index aaa140463f..d675efea62 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -84,13 +84,14 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitAggregate(AggregateNode node, 
PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new AggregateOperator(nextOperator, node.getDataSchema(), 
node.getAggCalls(),
-        node.getGroupSet(), node.getInputs().get(0).getDataSchema());
+        node.getGroupSet(), node.getInputs().get(0).getDataSchema(), 
context._requestId, context._stageId);
   }
 
   @Override
   public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext 
context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
-    return new FilterOperator(nextOperator, node.getDataSchema(), 
node.getCondition());
+    return new FilterOperator(nextOperator, node.getDataSchema(), 
node.getCondition(), context.getRequestId(),
+        context.getStageId());
   }
 
   @Override
@@ -101,21 +102,22 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
     MultiStageOperator leftOperator = left.visit(this, context);
     MultiStageOperator rightOperator = right.visit(this, context);
 
-    return new HashJoinOperator(leftOperator, rightOperator, 
left.getDataSchema(), node);
+    return new HashJoinOperator(leftOperator, rightOperator, 
left.getDataSchema(), node, context.getRequestId(),
+        context.getStageId());
   }
 
   @Override
   public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext 
context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new TransformOperator(nextOperator, node.getDataSchema(), 
node.getProjects(),
-        node.getInputs().get(0).getDataSchema());
+        node.getInputs().get(0).getDataSchema(), context.getRequestId(), 
context.getStageId());
   }
 
   @Override
   public MultiStageOperator visitSort(SortNode node, PlanRequestContext 
context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new SortOperator(nextOperator, node.getCollationKeys(), 
node.getCollationDirections(),
-        node.getFetch(), node.getOffset(), node.getDataSchema());
+        node.getFetch(), node.getOffset(), node.getDataSchema(), 
context.getRequestId(), context.getStageId());
   }
 
   @Override
@@ -125,6 +127,7 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitValue(ValueNode node, PlanRequestContext 
context) {
-    return new LiteralValueOperator(node.getDataSchema(), 
node.getLiteralRows());
+    return new LiteralValueOperator(node.getDataSchema(), 
node.getLiteralRows(), context.getRequestId(),
+        context.getStageId());
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 25b7aa5e8b..4cbd4aa624 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -46,12 +46,16 @@ import 
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.roaringbitmap.RoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * {@code QueryDispatcher} dispatch a query to different workers.
  */
 public class QueryDispatcher {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryDispatcher.class);
+
   private final Map<String, DispatchClient> _dispatchClientMap = new 
ConcurrentHashMap<>();
 
   public QueryDispatcher() {
@@ -69,8 +73,14 @@ public class QueryDispatcher {
         reduceNode.getSenderStageId(), reduceNode.getDataSchema(), 
mailboxService.getHostname(),
         mailboxService.getMailboxPort(), timeoutMs);
     List<DataBlock> resultDataBlocks = 
reduceMailboxReceive(mailboxReceiveOperator, timeoutMs);
-    return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
+    mailboxReceiveOperator.toExplainString();
+    long toResultTableStartTime = System.currentTimeMillis();
+    ResultTable resultTable = toResultTable(resultDataBlocks, 
queryPlan.getQueryResultFields(),
         queryPlan.getQueryStageMap().get(0).getDataSchema());
+    LOGGER.debug(
+        "RequestId:" + requestId + " StageId: 0 Broker toResultTable 
processing time:" + (System.currentTimeMillis()
+            - toResultTableStartTime) + " ms");
+    return resultTable;
   }
 
   public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index aa7394a6a8..6f2212e6b6 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -71,7 +71,7 @@ public class AggregateOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build
@@ -87,12 +87,11 @@ public class AggregateOperatorTest {
     List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(1)));
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
 
-    Mockito.when(_input.nextBlock())
-        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
 
     // When:
     TransferableBlock block = operator.nextBlock();
@@ -109,13 +108,12 @@ public class AggregateOperatorTest {
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
-    Mockito.when(_input.nextBlock())
-        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 1}))
+    
Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, 
new Object[]{1, 1}))
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
 
     // When:
     TransferableBlock block1 = operator.nextBlock(); // build when reading 
NoOp block
@@ -134,12 +132,11 @@ public class AggregateOperatorTest {
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
-    Mockito.when(_input.nextBlock())
-        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1}))
+    
Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, 
new Object[]{2, 1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -160,12 +157,11 @@ public class AggregateOperatorTest {
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
 
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
-    Mockito.when(_input.nextBlock())
-        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3}))
+    
Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, 
new Object[]{2, 3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
 
     // When:
     TransferableBlock block1 = operator.nextBlock();
@@ -196,9 +192,8 @@ public class AggregateOperatorTest {
     Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d);
     Mockito.when(merger.initialize(Mockito.any())).thenReturn(1d);
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, ImmutableMap.of(
-        "SUM", cdt -> merger
-    ));
+    AggregateOperator operator =
+        new AggregateOperator(_input, outSchema, calls, group, inSchema, 
ImmutableMap.of("SUM", cdt -> merger), 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock(); // (output result)
@@ -220,7 +215,7 @@ public class AggregateOperatorTest {
     DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
     AggregateOperator sum0GroupBy1 =
         new AggregateOperator(upstreamOperator, 
OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1),
-            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), 
inSchema);
+            Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), 
inSchema, 1, 2);
     TransferableBlock result = sum0GroupBy1.getNextBlock();
     while (result.isNoOpBlock()) {
       result = sum0GroupBy1.getNextBlock();
@@ -232,20 +227,18 @@ public class AggregateOperatorTest {
     Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
   }
 
-  @Test(
-      expectedExceptions = IllegalStateException.class,
-      expectedExceptionsMessageRegExp = ".*Unexpected value: AVERAGE.*")
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*Unexpected value: "
+      + "AVERAGE.*")
   public void shouldThrowOnUnknownAggFunction() {
     // Given:
     List<RexExpression> calls = ImmutableList.of(
-        new RexExpression.FunctionCall(SqlKind.AVG, FieldSpec.DataType.INT, 
"AVERAGE", ImmutableList.of())
-    );
+        new RexExpression.FunctionCall(SqlKind.AVG, FieldSpec.DataType.INT, 
"AVERAGE", ImmutableList.of()));
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
     DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new 
ColumnDataType[]{DOUBLE});
     DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new 
ColumnDataType[]{DOUBLE});
 
     // When:
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
   }
 
   @Test
@@ -262,7 +255,7 @@ public class AggregateOperatorTest {
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
-    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema);
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group, inSchema, 1, 2);
 
     // When:
     TransferableBlock block = operator.nextBlock();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index c53ea2037c..cf5cb80151 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -61,7 +61,7 @@ public class FilterOperatorTest {
     DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new 
DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.BOOLEAN
     });
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral, 1, 2);
     TransferableBlock errorBlock = op.getNextBlock();
     Assert.assertTrue(errorBlock.isErrorBlock());
     DataBlock error = errorBlock.getDataBlock();
@@ -76,7 +76,7 @@ public class FilterOperatorTest {
         DataSchema.ColumnDataType.INT
     });
     
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertTrue(dataBlock.isEndOfStreamBlock());
   }
@@ -89,7 +89,7 @@ public class FilterOperatorTest {
         DataSchema.ColumnDataType.INT
     });
     
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertTrue(dataBlock.isNoOpBlock());
   }
@@ -104,7 +104,7 @@ public class FilterOperatorTest {
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new 
Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -122,7 +122,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new 
Object[]{2}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -137,7 +137,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new 
Object[]{2}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral, 1, 2);
     TransferableBlock errorBlock = op.getNextBlock();
     Assert.assertTrue(errorBlock.isErrorBlock());
     DataBlock data = errorBlock.getDataBlock();
@@ -152,7 +152,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new 
Object[]{2}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
ref0);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
ref0, 1, 2);
     TransferableBlock errorBlock = op.getNextBlock();
     Assert.assertTrue(errorBlock.isErrorBlock());
     DataBlock data = errorBlock.getDataBlock();
@@ -167,7 +167,7 @@ public class FilterOperatorTest {
     });
     Mockito.when(_upstreamOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, 
new Object[]{2, false}));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
ref1);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
ref1, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -187,7 +187,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall andCall = new 
RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND",
         ImmutableList.of(new RexExpression.InputRef(0), new 
RexExpression.InputRef(1)));
 
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
andCall);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
andCall, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -207,7 +207,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall orCall = new 
RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR",
         ImmutableList.of(new RexExpression.InputRef(0), new 
RexExpression.InputRef(1)));
 
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
orCall);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
orCall, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -229,7 +229,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall notCall = new 
RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT",
         ImmutableList.of(new RexExpression.InputRef(0)));
 
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
notCall);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
notCall, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -248,7 +248,7 @@ public class FilterOperatorTest {
     RexExpression.FunctionCall greaterThan =
         new RexExpression.FunctionCall(SqlKind.GREATER_THAN, 
FieldSpec.DataType.BOOLEAN, "greaterThan",
             ImmutableList.of(new RexExpression.InputRef(0), new 
RexExpression.InputRef(1)));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
greaterThan);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
greaterThan, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -268,7 +268,7 @@ public class FilterOperatorTest {
         new RexExpression.FunctionCall(SqlKind.OTHER, 
FieldSpec.DataType.BOOLEAN, "startsWith",
             ImmutableList.of(new RexExpression.InputRef(0),
                 new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
startsWith);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
startsWith, 1, 2);
     TransferableBlock dataBlock = op.getNextBlock();
     Assert.assertFalse(dataBlock.isErrorBlock());
     List<Object[]> result = dataBlock.getContainer();
@@ -289,6 +289,6 @@ public class FilterOperatorTest {
         new RexExpression.FunctionCall(SqlKind.OTHER, 
FieldSpec.DataType.BOOLEAN, "startsWithError",
             ImmutableList.of(new RexExpression.InputRef(0),
                 new RexExpression.Literal(FieldSpec.DataType.STRING, "star")));
-    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
startsWith);
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
startsWith, 1, 2);
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index b9ea6b2078..4075237249 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -90,7 +90,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = joinOnString.nextBlock();
     while (result.isNoOpBlock()) {
@@ -127,7 +127,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
     TransferableBlock result = joinOnInt.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnInt.nextBlock();
@@ -161,7 +161,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(new ArrayList<>(), new ArrayList<>()),
         joinClauses);
-    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
     TransferableBlock result = joinOnInt.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnInt.nextBlock();
@@ -202,7 +202,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.LEFT, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -236,7 +236,7 @@ public class HashJoinOperatorTest {
     List<RexExpression> joinClauses = new ArrayList<>();
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -267,7 +267,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.LEFT, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -301,7 +301,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -339,7 +339,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(new ArrayList<>(), new ArrayList<>()),
         joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -377,7 +377,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(new ArrayList<>(), new ArrayList<>()),
         joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -411,7 +411,7 @@ public class HashJoinOperatorTest {
     });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.RIGHT, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
     TransferableBlock result = joinOnNum.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnNum.nextBlock();
@@ -438,8 +438,7 @@ public class HashJoinOperatorTest {
     Assert.assertTrue(result.isSuccessfulEndOfStreamBlock());
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*SEMI is not "
-      + "supported.*")
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*SEMI is not supported.*")
   public void shouldThrowOnSemiJoin() {
     DataSchema leftSchema = new DataSchema(new String[]{"int_col", 
"string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
@@ -461,7 +460,7 @@ public class HashJoinOperatorTest {
     });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.SEMI, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
   }
 
   @Test
@@ -485,7 +484,7 @@ public class HashJoinOperatorTest {
     });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.FULL, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -537,7 +536,7 @@ public class HashJoinOperatorTest {
     });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.ANTI, 
getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
   }
 
   @Test
@@ -562,7 +561,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -595,7 +594,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -631,7 +630,7 @@ public class HashJoinOperatorTest {
         });
     JoinNode node =
         new JoinNode(1, resultSchema, JoinRelType.INNER, 
getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, 
_rightOperator, leftSchema, node, 1, 2);
 
     TransferableBlock result = join.nextBlock(); // first no-op consumes first 
right data block.
     Assert.assertTrue(result.isNoOpBlock());
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 45a04eb2ff..185a6d5d53 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -52,7 +52,8 @@ public class LeafStageTransferableBlockOperatorTest {
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
     List<InstanceResponseBlock> resultsBlockList = 
Collections.singletonList(new InstanceResponseBlock(
         new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 
1}, new Object[]{"", 2})), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -66,18 +67,19 @@ public class LeafStageTransferableBlockOperatorTest {
   @Test
   public void shouldHandleDesiredDataSchemaConversionCorrectly() {
     // Given:
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT boolCol, tsCol, boolCol AS newNamedBoolCol FROM tbl");
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT boolCol, tsCol, 
boolCol AS newNamedBoolCol FROM tbl");
     DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "tsCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.TIMESTAMP});
-    DataSchema desiredSchema = new DataSchema(new String[]{"boolCol", "tsCol", 
"newNamedBoolCol"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.TIMESTAMP,
-            DataSchema.ColumnDataType.BOOLEAN});
+    DataSchema desiredSchema =
+        new DataSchema(new String[]{"boolCol", "tsCol", "newNamedBoolCol"}, 
new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.TIMESTAMP, DataSchema.ColumnDataType.BOOLEAN
+        });
     List<InstanceResponseBlock> resultsBlockList = 
Collections.singletonList(new InstanceResponseBlock(
-        new SelectionResultsBlock(resultSchema, Arrays.asList(new Object[]{1, 
1660000000000L},
-            new Object[]{0, 1600000000000L})), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList,
-        desiredSchema);
+        new SelectionResultsBlock(resultSchema,
+            Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 
1600000000000L})), queryContext));
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, 
desiredSchema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -96,9 +98,10 @@ public class LeafStageTransferableBlockOperatorTest {
     DataSchema schema = new DataSchema(new String[]{"boolCol", "tsCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.TIMESTAMP});
     List<InstanceResponseBlock> resultsBlockList = 
Collections.singletonList(new InstanceResponseBlock(
-        new SelectionResultsBlock(schema, Arrays.asList(new Object[]{1, 
1660000000000L},
-            new Object[]{0, 1600000000000L})), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+        new SelectionResultsBlock(schema,
+            Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 
1600000000000L})), queryContext));
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -115,13 +118,15 @@ public class LeafStageTransferableBlockOperatorTest {
     QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
     DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
-    List<InstanceResponseBlock> resultsBlockList = Arrays.asList(
-        new InstanceResponseBlock(new SelectionResultsBlock(schema,
-            Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), 
queryContext),
-        new InstanceResponseBlock(new SelectionResultsBlock(schema,
-            Arrays.asList(new Object[]{"bar", 3}, new Object[]{"foo", 4})), 
queryContext),
+    List<InstanceResponseBlock> resultsBlockList = Arrays.asList(new 
InstanceResponseBlock(
+            new SelectionResultsBlock(schema, Arrays.asList(new 
Object[]{"foo", 1}, new Object[]{"", 2})),
+            queryContext),
+        new InstanceResponseBlock(
+            new SelectionResultsBlock(schema, Arrays.asList(new 
Object[]{"bar", 3}, new Object[]{"foo", 4})),
+            queryContext),
         new InstanceResponseBlock(new SelectionResultsBlock(schema, 
Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock1 = operator.nextBlock();
@@ -145,12 +150,13 @@ public class LeafStageTransferableBlockOperatorTest {
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
     InstanceResponseBlock errorBlock = new InstanceResponseBlock();
     
errorBlock.addException(QueryException.QUERY_EXECUTION_ERROR.getErrorCode(), 
"foobar");
-    List<InstanceResponseBlock> resultsBlockList = Arrays.asList(
-        new InstanceResponseBlock(new SelectionResultsBlock(schema,
-            Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), 
queryContext),
+    List<InstanceResponseBlock> resultsBlockList = Arrays.asList(new 
InstanceResponseBlock(
+            new SelectionResultsBlock(schema, Arrays.asList(new 
Object[]{"foo", 1}, new Object[]{"", 2})),
+            queryContext),
         errorBlock,
         new InstanceResponseBlock(new SelectionResultsBlock(schema, 
Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -162,16 +168,16 @@ public class LeafStageTransferableBlockOperatorTest {
   @Test
   public void shouldReorderWhenQueryContextAskForNotInOrderGroupByAsDistinct() 
{
     // Given:
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT intCol, strCol FROM tbl GROUP BY strCol, intCol");
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT intCol, strCol FROM 
tbl GROUP BY strCol, intCol");
     // result schema doesn't match with DISTINCT columns using GROUP BY.
     DataSchema schema = new DataSchema(new String[]{"intCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
-    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
-        new InstanceResponseBlock(new 
DistinctResultsBlock(mock(DistinctAggregationFunction.class),
-            new DistinctTable(schema, Arrays.asList(
-                new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, 
"bar"})))), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    List<InstanceResponseBlock> resultsBlockList = 
Collections.singletonList(new InstanceResponseBlock(
+        new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new 
DistinctTable(schema,
+            Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new 
Object[]{2, "bar"})))), queryContext));
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -184,16 +190,15 @@ public class LeafStageTransferableBlockOperatorTest {
   @Test
   public void shouldParsedBlocksSuccessfullyWithDistinctQuery() {
     // Given:
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT DISTINCT strCol, intCol FROM tbl");
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT DISTINCT strCol, intCol FROM 
tbl");
     // result schema doesn't match with DISTINCT columns using GROUP BY.
     DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
-    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
-        new InstanceResponseBlock(new 
DistinctResultsBlock(mock(DistinctAggregationFunction.class),
-            new DistinctTable(schema, Arrays.asList(
-                new Record(new Object[]{"foo", 1}), new Record(new 
Object[]{"bar", 2})))), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    List<InstanceResponseBlock> resultsBlockList = 
Collections.singletonList(new InstanceResponseBlock(
+        new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new 
DistinctTable(schema,
+            Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new 
Object[]{"bar", 2})))), queryContext));
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -209,12 +214,15 @@ public class LeafStageTransferableBlockOperatorTest {
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
         "SELECT intCol, count(*), sum(doubleCol), strCol FROM tbl GROUP BY 
strCol, intCol");
     // result schema doesn't match with columns ordering using GROUP BY, this 
should not occur.
-    DataSchema schema = new DataSchema(new String[]{"intCol", "count(*)", 
"sum(doubleCol)", "strCol"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT,
-            DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING});
+    DataSchema schema =
+        new DataSchema(new String[]{"intCol", "count(*)", "sum(doubleCol)", 
"strCol"}, new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.LONG,
+            DataSchema.ColumnDataType.STRING
+        });
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(schema, 
Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -230,11 +238,14 @@ public class LeafStageTransferableBlockOperatorTest {
         + "sum(doubleCol) FROM tbl GROUP BY strCol, intCol HAVING 
sum(doubleCol) < 10 AND count(*) > 0");
     // result schema contains duplicate reference from agg and having. it will 
repeat itself.
     DataSchema schema = new DataSchema(new String[]{"strCol", "intCol", 
"count(*)", "sum(doubleCol)", "sum(doubleCol)"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT,
-            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG, 
DataSchema.ColumnDataType.LONG});
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG
+        });
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(schema, 
Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -246,15 +257,14 @@ public class LeafStageTransferableBlockOperatorTest {
   @Test
   public void shouldNotErrorOutWhenDealingWithAggregationResults() {
     // Given:
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT count(*), sum(doubleCol) FROM tbl");
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT count(*), sum(doubleCol) 
FROM tbl");
     // result schema doesn't match with DISTINCT columns using GROUP BY.
     DataSchema schema = new DataSchema(new String[]{"count_star", 
"sum(doubleCol)"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.LONG});
-    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
-        new InstanceResponseBlock(new 
AggregationResultsBlock(queryContext.getAggregationFunctions(),
-            Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(resultsBlockList, schema);
+    List<InstanceResponseBlock> resultsBlockList = 
Collections.singletonList(new InstanceResponseBlock(
+        new AggregationResultsBlock(queryContext.getAggregationFunctions(), 
Collections.emptyList()), queryContext));
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2);
 
     // When:
     TransferableBlock resultBlock = operator.nextBlock();
@@ -275,8 +285,8 @@ public class LeafStageTransferableBlockOperatorTest {
     // When:
     List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
         new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, 
Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(responseBlockList,
-        desiredSchema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(responseBlockList, 
desiredSchema, 1, 2);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
@@ -287,19 +297,19 @@ public class LeafStageTransferableBlockOperatorTest {
   @Test
   public void 
shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsDistinct() {
     // Given:
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT strCol, intCol FROM tbl GROUP BY strCol, intCol");
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM 
tbl GROUP BY strCol, intCol");
     DataSchema resultSchema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.STRING});
     DataSchema desiredSchema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
 
     // When:
-    List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
-        new InstanceResponseBlock(new 
DistinctResultsBlock(mock(DistinctAggregationFunction.class),
+    List<InstanceResponseBlock> responseBlockList = 
Collections.singletonList(new InstanceResponseBlock(
+        new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
             new DistinctTable(resultSchema, Collections.emptyList())), 
queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(responseBlockList,
-        desiredSchema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(responseBlockList, 
desiredSchema, 1, 2);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
@@ -310,18 +320,18 @@ public class LeafStageTransferableBlockOperatorTest {
   @Test
   public void 
shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsGroupBy() {
     // Given:
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
-        "SELECT strCol, SUM(intCol) FROM tbl GROUP BY strCol");
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContext("SELECT strCol, SUM(intCol) 
FROM tbl GROUP BY strCol");
     DataSchema resultSchema = new DataSchema(new String[]{"strCol", 
"SUM(intCol)"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.STRING});
     DataSchema desiredSchema = new DataSchema(new String[]{"strCol", 
"SUM(intCol)"},
-      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, 
DataSchema.ColumnDataType.INT});
 
     // When:
     List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
         new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, 
Collections.emptyList()), queryContext));
-    LeafStageTransferableBlockOperator operator = new 
LeafStageTransferableBlockOperator(responseBlockList,
-        desiredSchema);
+    LeafStageTransferableBlockOperator operator =
+        new LeafStageTransferableBlockOperator(responseBlockList, 
desiredSchema, 1, 2);
     TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 663caf2272..856965cfb4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -24,13 +24,34 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class LiteralValueOperatorTest {
 
+  private AutoCloseable _mocks;
+
+  @Mock
+  private PlanRequestContext _context;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
   @Test
   public void shouldReturnLiteralBlock() {
     // Given:
@@ -44,7 +65,7 @@ public class LiteralValueOperatorTest {
             new RexExpression.Literal(DataType.STRING, ""),
             new RexExpression.Literal(DataType.INT, 2))
     );
-    LiteralValueOperator operator = new LiteralValueOperator(schema, literals);
+    LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 
1, 2);
 
     // When:
     TransferableBlock transferableBlock = operator.nextBlock();
@@ -60,7 +81,7 @@ public class LiteralValueOperatorTest {
     // Given:
     DataSchema schema = new DataSchema(new String[]{}, new ColumnDataType[]{});
     List<List<RexExpression>> literals = ImmutableList.of(ImmutableList.of());
-    LiteralValueOperator operator = new LiteralValueOperator(schema, literals);
+    LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 
1, 2);
 
     // When:
     TransferableBlock transferableBlock = operator.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 3dc951dca3..9f6577dc66 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -74,7 +74,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory);
+        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory, 1, 2);
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -91,7 +91,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory);
+        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory, 1, 2);
     TransferableBlock errorBlock = 
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
     Mockito.when(_input.nextBlock())
         .thenReturn(errorBlock);
@@ -109,7 +109,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory);
+        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory, 1, 2);
     Mockito.when(_input.nextBlock())
         .thenThrow(new RuntimeException("foo!"));
     ArgumentCaptor<TransferableBlock> captor = 
ArgumentCaptor.forClass(TransferableBlock.class);
@@ -128,7 +128,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory);
+        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory, 1, 2);
     TransferableBlock eosBlock = 
TransferableBlockUtils.getEndOfStreamTransferableBlock();
     Mockito.when(_input.nextBlock())
         .thenReturn(eosBlock);
@@ -146,7 +146,7 @@ public class MailboxSendOperatorTest {
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(
         _mailboxService, _input, ImmutableList.of(_server), 
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
-        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory);
+        server -> new StringMailboxIdentifier("123:from:1:to:2"), 
_exchangeFactory, 1, 2);
     TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
     Mockito.when(_input.nextBlock())
         .thenReturn(dataBlock)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index 2cc6b68a67..b11a00245a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -64,7 +64,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new 
Exception("foo!")));
@@ -82,7 +82,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -99,7 +99,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -116,7 +116,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
@@ -139,7 +139,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new 
DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
@@ -162,7 +162,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{STRING});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
@@ -185,7 +185,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
@@ -208,7 +208,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 1, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 1, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new 
Object[]{3}))
@@ -231,7 +231,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 1, 1, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 1, 1, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new 
Object[]{3}))
@@ -253,7 +253,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 2, 0, 
schema, 1);
+    SortOperator op = new SortOperator(_input, collation, directions, 2, 0, 
schema, 1, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new 
Object[]{3}))
@@ -275,7 +275,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, -1, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, -1, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new 
Object[]{3}))
@@ -296,7 +296,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}))
@@ -320,7 +320,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, 
Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new 
DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new 
Object[]{1, 3}))
@@ -344,7 +344,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, 
Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new 
DataSchema.ColumnDataType[]{INT, INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new 
Object[]{1, 3}))
@@ -368,7 +368,7 @@ public class SortOperatorTest {
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema);
+    SortOperator op = new SortOperator(_input, collation, directions, 10, 0, 
schema, 1, 2);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{2}))
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index b89313c25c..52ffff08fb 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -71,7 +71,7 @@ public class TransformOperatorTest {
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     RexExpression.InputRef ref1 = new RexExpression.InputRef(1);
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(ref0, ref1), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2);
     TransferableBlock result = op.nextBlock();
 
     Assert.assertTrue(!result.isErrorBlock());
@@ -95,7 +95,8 @@ public class TransformOperatorTest {
     RexExpression.Literal boolLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
     RexExpression.Literal strLiteral = new 
RexExpression.Literal(FieldSpec.DataType.STRING, "str");
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
     TransferableBlock result = op.nextBlock();
     // Literal operands should just output original literals.
     Assert.assertTrue(!result.isErrorBlock());
@@ -125,7 +126,7 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(plus01, minus01), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(!result.isErrorBlock());
     List<Object[]> resultRows = result.getContainer();
@@ -153,7 +154,7 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(plus01, minus01), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2);
 
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
@@ -173,7 +174,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
     TransferableBlock result = op.nextBlock();
     Assert.assertTrue(result.isErrorBlock());
     DataBlock data = result.getDataBlock();
@@ -196,7 +198,8 @@ public class TransformOperatorTest {
     DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, 
DataSchema.ColumnDataType.STRING});
     TransformOperator op =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1,
+            2);
     TransferableBlock result = op.nextBlock();
     // First block has two rows
     Assert.assertFalse(result.isErrorBlock());
@@ -227,7 +230,8 @@ public class TransformOperatorTest {
     DataSchema upStreamSchema = new DataSchema(new String[]{"string1", 
"string2"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING
     });
-    TransformOperator transform = new TransformOperator(_upstreamOp, 
resultSchema, new ArrayList<>(), upStreamSchema);
+    TransformOperator transform =
+        new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), 
upStreamSchema, 1, 2);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*doesn't match "
@@ -240,6 +244,6 @@ public class TransformOperatorTest {
     });
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
     TransformOperator transform =
-        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(ref0), upStreamSchema);
+        new TransformOperator(_upstreamOp, resultSchema, 
ImmutableList.of(ref0), upStreamSchema, 1, 2);
   }
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to