This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b113225a [multistage][refactor] Introduce MultiStageOperator with close() API  (#10123)
58b113225a is described below

commit 58b113225a3e518dd261553274c0e75abb69a29f
Author: Yao Liu <y...@startree.ai>
AuthorDate: Wed Jan 18 09:22:47 2023 -0800

    [multistage][refactor] Introduce MultiStageOperator with close() API  (#10123)
    
    * v2 operator
    
    * test
    
    * test
    
    * opchain scheduler
    
    * v2 operator
    
    * fix scheduling
    
    * add comment
    
    * address feedback
    
    * address comments
    
    * add todo
    
    * change class name
    
    * change getChildOp API
---
 .../org/apache/pinot/core/common/Operator.java     |  2 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  4 +-
 .../query/runtime/executor/OpChainScheduler.java   |  6 +++
 .../runtime/executor/OpChainSchedulerService.java  | 17 +++++--
 .../runtime/executor/RoundRobinScheduler.java      | 24 +++++++++-
 .../query/runtime/operator/AggregateOperator.java  | 16 +++----
 .../query/runtime/operator/FilterOperator.java     | 14 +++---
 .../query/runtime/operator/HashJoinOperator.java   | 19 ++++----
 .../LeafStageTransferableBlockOperator.java        |  9 ++--
 .../runtime/operator/LiteralValueOperator.java     | 10 ++--
 .../runtime/operator/MailboxReceiveOperator.java   |  9 ++--
 .../runtime/operator/MailboxSendOperator.java      | 16 +++----
 .../query/runtime/operator/MultiStageOperator.java | 49 ++++++++++++++++++++
 .../pinot/query/runtime/operator/OpChain.java      | 12 +++--
 .../pinot/query/runtime/operator/SortOperator.java | 16 +++----
 .../query/runtime/operator/TransformOperator.java  | 14 +++---
 .../query/runtime/plan/PhysicalPlanVisitor.java    | 39 ++++++++--------
 .../executor/OpChainSchedulerServiceTest.java      | 54 ++++++++++++++++------
 .../runtime/executor/RoundRobinSchedulerTest.java  |  5 +-
 .../runtime/operator/AggregateOperatorTest.java    |  6 +--
 .../query/runtime/operator/FilterOperatorTest.java |  3 +-
 .../runtime/operator/HashJoinOperatorTest.java     |  5 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |  3 +-
 .../query/runtime/operator/OperatorTestUtil.java   |  3 +-
 .../query/runtime/operator/SortOperatorTest.java   |  3 +-
 .../runtime/operator/TransformOperatorTest.java    |  3 +-
 .../testutils/MockDataBlockOperatorFactory.java    |  6 +--
 27 files changed, 226 insertions(+), 141 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java
index 5926805c6a..f86caf9798 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Operator.java
@@ -42,7 +42,7 @@ public interface Operator<T extends Block> {
   T nextBlock();
 
   /** @return List of {@link Operator}s that this operator depends upon. */
-  List<Operator> getChildOperators();
+  List<? extends Operator> getChildOperators();
 
   /** @return Explain Plan description if available; otherwise, null. */
   @Nullable
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 92467ba316..40dff35893 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -408,7 +408,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     Map<Integer, HashSet<Integer>> uniquePlanNodeHashCodes = new HashMap<>();
 
     // Obtain the list of all possible segment plans after the combine root 
node
-    List<Operator> children = root.getChildOperators();
+    List<? extends Operator> children = root.getChildOperators();
     for (Operator child : children) {
       int[] operatorId = {3};
       ExplainPlanRows explainPlanRows = new ExplainPlanRows();
@@ -483,7 +483,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
   public static InstanceResponseBlock executeExplainQuery(Plan queryPlan, 
QueryContext queryContext) {
     ExplainResultsBlock explainResults = new ExplainResultsBlock();
-    List<Operator> childOperators = 
queryPlan.getPlanNode().run().getChildOperators();
+    List<? extends Operator> childOperators = 
queryPlan.getPlanNode().run().getChildOperators();
     assert childOperators.size() == 1;
     Operator root = childOperators.get(0);
     Map<Integer, List<ExplainPlanRows>> operatorDepthToRowDataMap;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index 8b35772542..635e5aacfb 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -44,6 +44,12 @@ public interface OpChainScheduler {
    */
   void onDataAvailable(MailboxIdentifier mailbox);
 
+  /**
+   * This method is called when scheduler is terminating. It should clean up all of the resources if there are any.
+   * register() and onDataAvailable() shouldn't be called anymore after shutDown is called.
+   */
+  void shutDown();
+
   /**
    * @return whether or not there is any work for the scheduler to do
    */
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index e40b2dbf8f..21a944d4f7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.executor;
 
 import com.google.common.util.concurrent.AbstractExecutionThreadService;
 import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.core.util.trace.TraceRunnable;
@@ -43,6 +44,8 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpChainSchedulerService.class);
 
+  private static final int TERMINATION_TIMEOUT_SEC = 60;
+
   private final OpChainScheduler _scheduler;
   private final ExecutorService _workerPool;
   private final long _pollIntervalMs;
@@ -56,6 +59,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
     }
   };
 
+  // Note that workerPool is shut down in this class.
   public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool) {
     this(scheduler, workerPool, -1);
   }
@@ -72,6 +76,10 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
     // this will just notify all waiters that the scheduler is shutting down
     _monitor.enter();
     _monitor.leave();
+    if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool, 
TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+      LOGGER.error("Failed to shut down and terminate OpChainScheduler.");
+    }
+    _scheduler.shutDown();
   }
 
   @Override
@@ -111,8 +119,10 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
                   } else {
                     LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
                   }
+                  operatorChain.close();
                 }
               } catch (Exception e) {
+                operatorChain.close();
                 LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
               }
             }
@@ -142,9 +152,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
   public final void register(OpChain operatorChain) {
     register(operatorChain, true);
     LOGGER.debug("({}): Scheduler is now handling operator chain listening to 
mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.",
-        operatorChain,
-        operatorChain.getReceivingMailbox(),
+            + "There are a total of {} chains awaiting execution.", 
operatorChain, operatorChain.getReceivingMailbox(),
         _scheduler.size());
 
     // we want to track the time that it takes from registering
@@ -156,8 +164,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
   public final void register(OpChain operatorChain, boolean isNew) {
     _monitor.enter();
     try {
-      LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}",
-          operatorChain, isNew, _scheduler.size());
+      LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", 
operatorChain, isNew, _scheduler.size());
 
       _scheduler.register(operatorChain, isNew);
       operatorChain.getStats().queued();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index 3a20bf5d6a..c7b77f85e7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -27,6 +27,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
  * of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
  * callback.
  */
+@NotThreadSafe
 public class RoundRobinScheduler implements OpChainScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RoundRobinScheduler.class);
   private static final long DEFAULT_RELEASE_TIMEOUT = 
TimeUnit.MINUTES.toMillis(1);
@@ -53,6 +55,8 @@ public class RoundRobinScheduler implements OpChainScheduler {
   private final Queue<AvailableEntry> _available = new LinkedList<>();
   private final Queue<OpChain> _ready = new LinkedList<>();
 
+  private boolean _isShutDown = false;
+
   // using a Set here is acceptable because calling hasNext() and
   // onDataAvailable() cannot occur concurrently - that means that
   // anytime we schedule a new operator based on the presence of
@@ -79,6 +83,9 @@ public class RoundRobinScheduler implements OpChainScheduler {
 
   @Override
   public void register(OpChain operatorChain, boolean isNew) {
+    if (_isShutDown) {
+      return;
+    }
     // the first time an operator chain is scheduled, it should
     // immediately be considered ready in case it does not need
     // read from any mailbox (e.g. with a LiteralValueOperator)
@@ -134,6 +141,20 @@ public class RoundRobinScheduler implements 
OpChainScheduler {
     return _ready.size() + _available.size();
   }
 
+  @Override
+  public void shutDown() {
+    if (_isShutDown) {
+      return;
+    }
+    while (!_ready.isEmpty()) {
+      _ready.poll().close();
+    }
+    while (!_available.isEmpty()) {
+      _available.poll()._opChain.close();
+    }
+    _isShutDown = true;
+  }
+
   private void computeReady() {
     Iterator<AvailableEntry> availableChains = _available.iterator();
 
@@ -164,8 +185,7 @@ public class RoundRobinScheduler implements 
OpChainScheduler {
   }
 
   private void trace(String operation) {
-    LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}",
-        operation, _ready, _available, _seenMail);
+    LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}", operation, _ready, 
_available, _seenMail);
   }
 
   private static class AvailableEntry {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index fe61ce244e..a82048949f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -30,9 +31,7 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -53,10 +52,10 @@ import org.apache.pinot.spi.data.FieldSpec;
  * Note: This class performs aggregation over the double value of input.
  * If the input is single value, the output type will be input type. 
Otherwise, the output type will be double.
  */
-public class AggregateOperator extends BaseOperator<TransferableBlock> {
+public class AggregateOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
 
-  private final Operator<TransferableBlock> _inputOperator;
+  private final MultiStageOperator _inputOperator;
   // TODO: Deal with the case where _aggCalls is empty but we have groupSet setup, which means this is a Distinct call.
   private final List<RexExpression.FunctionCall> _aggCalls;
   private final List<RexExpression> _groupSet;
@@ -73,13 +72,13 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
   // aggCalls has to be a list of FunctionCall and cannot be null
   // groupSet has to be a list of InputRef and cannot be null
   // TODO: Add these two checks when we confirm we can handle error in 
upstream ctor call.
-  public AggregateOperator(Operator<TransferableBlock> inputOperator, 
DataSchema dataSchema,
+  public AggregateOperator(MultiStageOperator inputOperator, DataSchema 
dataSchema,
       List<RexExpression> aggCalls, List<RexExpression> groupSet) {
     this(inputOperator, dataSchema, aggCalls, groupSet, 
AggregateOperator.Accumulator.MERGERS);
   }
 
   @VisibleForTesting
-  AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema 
dataSchema,
+  AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema,
       List<RexExpression> aggCalls, List<RexExpression> groupSet, Map<String, 
Merger> mergers) {
     _inputOperator = inputOperator;
     _groupSet = groupSet;
@@ -107,9 +106,8 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_inputOperator);
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 32fe4508cf..3ae9eac98f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -45,14 +44,14 @@ import 
org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
     3) All boolean scalar functions we have that take tranformOperand.
     Note: Scalar functions are the ones we have in v1 engine and only do 
function name and arg # matching.
  */
-public class FilterOperator extends BaseOperator<TransferableBlock> {
+public class FilterOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "FILTER";
-  private final Operator<TransferableBlock> _upstreamOperator;
+  private final MultiStageOperator _upstreamOperator;
   private final TransformOperand _filterOperand;
   private final DataSchema _dataSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public FilterOperator(Operator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema, RexExpression filter) {
+  public FilterOperator(MultiStageOperator upstreamOperator, DataSchema 
dataSchema, RexExpression filter) {
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
     _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
@@ -60,9 +59,8 @@ public class FilterOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_upstreamOperator);
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 7fe3902f61..be47c9f389 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -1,5 +1,3 @@
-
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -32,9 +31,7 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
@@ -56,7 +53,7 @@ import 
org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
  * The output is in the format of [left_row, right_row]
  */
 // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
-public class HashJoinOperator extends BaseOperator<TransferableBlock> {
+public class HashJoinOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
   private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES =
       ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, 
JoinRelType.FULL);
@@ -68,8 +65,8 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   // TODO: Replace hashset with rolling bit map.
   private final HashMap<Key, HashSet<Integer>> _matchedRightRows;
 
-  private final Operator<TransferableBlock> _leftTableOperator;
-  private final Operator<TransferableBlock> _rightTableOperator;
+  private final MultiStageOperator _leftTableOperator;
+  private final MultiStageOperator _rightTableOperator;
   private final JoinRelType _joinType;
   private final DataSchema _resultSchema;
   private final int _leftRowSize;
@@ -85,7 +82,7 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
-  public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, 
Operator<TransferableBlock> rightTableOperator,
+  public HashJoinOperator(MultiStageOperator leftTableOperator, 
MultiStageOperator rightTableOperator,
       DataSchema leftSchema, JoinNode node) {
     
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
         "Join type: " + node.getJoinRelType() + " is not supported!");
@@ -116,10 +113,10 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
     _upstreamErrorBlock = null;
   }
 
+  // TODO: Separate left and right table operator.
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_leftTableOperator, _rightTableOperator);
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 0f93cfece4..365d04f863 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,8 +32,6 @@ import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -56,7 +55,7 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlock;
  *       thus requires canonicalization.</li>
  * </ul>
  */
-public class LeafStageTransferableBlockOperator extends 
BaseOperator<TransferableBlock> {
+public class LeafStageTransferableBlockOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
 
   private final InstanceResponseBlock _errorBlock;
@@ -72,8 +71,8 @@ public class LeafStageTransferableBlockOperator extends 
BaseOperator<Transferabl
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of();
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
index 9141abad9a..cace9fa974 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -18,19 +18,18 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 
 
-public class LiteralValueOperator extends BaseOperator<TransferableBlock> {
+public class LiteralValueOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER";
 
   private final DataSchema _dataSchema;
@@ -44,9 +43,8 @@ public class LiteralValueOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of();
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index c5b618a533..ee97a99e79 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,7 +28,6 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
  *  When exchangeType is Singleton, we find the mapping mailbox for the 
mailboxService. If not found, use empty list.
  *  When exchangeType is non-Singleton, we pull from each instance in 
round-robin way to get matched mailbox content.
  */
-public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
+public class MailboxReceiveOperator extends MultiStageOperator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
 
@@ -116,9 +116,8 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of();
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3c95eb0e3d..a6299ea60a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
 import java.util.List;
@@ -27,8 +28,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -45,7 +44,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This {@code MailboxSendOperator} is created to send {@link 
TransferableBlock}s to the receiving end.
  */
-public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
+public class MailboxSendOperator extends MultiStageOperator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxSendOperator.class);
 
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
@@ -53,7 +52,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
       ImmutableSet.of(RelDistribution.Type.SINGLETON, 
RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, 
RelDistribution.Type.HASH_DISTRIBUTED);
 
-  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final MultiStageOperator _dataTableBlockBaseOperator;
   private final BlockExchange _exchange;
 
   @VisibleForTesting
@@ -68,7 +67,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
-      Operator<TransferableBlock> dataTableBlockBaseOperator, 
List<ServerInstance> receivingStageInstances,
+      MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> 
receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector, String hostName, int port,
       long jobId, int stageId) {
     this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, 
exchangeType, keySelector,
@@ -77,7 +76,7 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
 
   @VisibleForTesting
   MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
-      Operator<TransferableBlock> dataTableBlockBaseOperator, 
List<ServerInstance> receivingStageInstances,
+      MultiStageOperator dataTableBlockBaseOperator, List<ServerInstance> 
receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector,
       MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory 
blockExchangeFactory) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -110,9 +109,8 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of();
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
new file mode 100644
index 0000000000..9c82d1cb2a
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.List;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class MultiStageOperator extends 
BaseOperator<TransferableBlock> implements AutoCloseable {
+  private static final org.slf4j.Logger LOGGER = 
LoggerFactory.getLogger(MultiStageOperator.class);
+
+  // TODO: use the API public List<? extends Operator> getChildOperators() to merge two APIs.
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    throw new UnsupportedOperationException();
+  }
+
+  // TODO: Ideally close() call should finish within request deadline.
+  // TODO: Consider passing deadline as part of the API.
+  @Override
+  public void close() {
+    for (MultiStageOperator op : getChildOperators()) {
+      try {
+        op.close();
+      } catch (Exception e) {
+        // Log the cause and continue: a failed close must not prevent the remaining children from being closed.
+        LOGGER.error("Failed to close operator:" + op, e);
+      }
+    }
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index 8ffeec7e22..424d7d003f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -30,15 +30,14 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlock;
  * An {@code OpChain} represents a chain of operators that are separated
  * by send/receive stages.
  */
-public class OpChain {
+public class OpChain implements AutoCloseable {
 
-  private final Operator<TransferableBlock> _root;
+  private final MultiStageOperator _root;
   private final Set<MailboxIdentifier> _receivingMailbox;
   private final OpChainStats _stats;
   private final String _id;
 
-  public OpChain(Operator<TransferableBlock> root, List<MailboxIdentifier> 
receivingMailboxes, long requestId,
-      int stageId) {
+  public OpChain(MultiStageOperator root, List<MailboxIdentifier> 
receivingMailboxes, long requestId, int stageId) {
     _root = root;
     _receivingMailbox = new HashSet<>(receivingMailboxes);
     _id = String.format("%s_%s", requestId, stageId);
@@ -61,4 +60,9 @@ public class OpChain {
   public String toString() {
     return "OpChain{" + _id + "}";
   }
+
+  @Override
+  public void close() {
+    _root.close();
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 87c14d3be0..4ba3dbde94 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
@@ -27,17 +28,15 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 
 
-public class SortOperator extends BaseOperator<TransferableBlock> {
+public class SortOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "SORT";
-  private final Operator<TransferableBlock> _upstreamOperator;
+  private final MultiStageOperator _upstreamOperator;
   private final int _fetch;
   private final int _offset;
   private final DataSchema _dataSchema;
@@ -48,14 +47,14 @@ public class SortOperator extends 
BaseOperator<TransferableBlock> {
   private boolean _isSortedBlockConstructed;
   private TransferableBlock _upstreamErrorBlock;
 
-  public SortOperator(Operator<TransferableBlock> upstreamOperator, 
List<RexExpression> collationKeys,
+  public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> 
collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int 
offset, DataSchema dataSchema) {
     this(upstreamOperator, collationKeys, collationDirections, fetch, offset, 
dataSchema,
         SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
   }
 
   @VisibleForTesting
-  SortOperator(Operator<TransferableBlock> upstreamOperator, 
List<RexExpression> collationKeys,
+  SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> 
collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int 
offset, DataSchema dataSchema,
       int maxHolderCapacity) {
     _upstreamOperator = upstreamOperator;
@@ -72,9 +71,8 @@ public class SortOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_upstreamOperator);
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 80af357cec..83db10d4a2 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -19,13 +19,12 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -43,16 +42,16 @@ import 
org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
  * Note: Function transform only runs functions from v1 engine scalar function 
factory, which only does argument count
  * and canonicalized function name matching (lower case).
  */
-public class TransformOperator extends BaseOperator<TransferableBlock> {
+public class TransformOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "TRANSFORM";
-  private final Operator<TransferableBlock> _upstreamOperator;
+  private final MultiStageOperator _upstreamOperator;
   private final List<TransformOperand> _transformOperandsList;
   private final int _resultColumnSize;
   // TODO: Check type matching between resultSchema and the actual result.
   private final DataSchema _resultSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public TransformOperator(Operator<TransferableBlock> upstreamOperator, 
DataSchema resultSchema,
+  public TransformOperator(MultiStageOperator upstreamOperator, DataSchema 
resultSchema,
       List<RexExpression> transforms, DataSchema upstreamDataSchema) {
     Preconditions.checkState(!transforms.isEmpty(), "transform operand should 
not be empty.");
     Preconditions.checkState(resultSchema.size() == transforms.size(),
@@ -67,9 +66,8 @@ public class TransformOperator extends 
BaseOperator<TransferableBlock> {
   }
 
   @Override
-  public List<Operator> getChildOperators() {
-    // WorkerExecutor doesn't use getChildOperators, returns null here.
-    return null;
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of(_upstreamOperator);
   }
 
   @Nullable
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 2ba2cc7afc..57a0949bc6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.runtime.plan;
 
 import java.util.List;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
@@ -33,13 +32,13 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.StageNodeVisitor;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.SortOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
@@ -52,17 +51,17 @@ import 
org.apache.pinot.query.runtime.operator.TransformOperator;
  *
  * <p>This class should be used statically via {@link #build(StageNode, 
PlanRequestContext)}
  */
-public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> {
+public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator, PlanRequestContext> {
 
   private static final PhysicalPlanVisitor INSTANCE = new 
PhysicalPlanVisitor();
 
   public static OpChain build(StageNode node, PlanRequestContext context) {
-    Operator<TransferableBlock> root = node.visit(INSTANCE, context);
+    MultiStageOperator root = node.visit(INSTANCE, context);
     return new OpChain(root, context.getReceivingMailboxes(), 
context.getRequestId(), context.getStageId());
   }
 
   @Override
-  public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode 
node, PlanRequestContext context) {
+  public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, 
PlanRequestContext context) {
     List<ServerInstance> sendingInstances = 
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
     MailboxReceiveOperator mailboxReceiveOperator =
         new MailboxReceiveOperator(context.getMailboxService(), 
sendingInstances,
@@ -73,8 +72,8 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<Transferab
   }
 
   @Override
-  public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, 
PlanRequestContext context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
+  public MultiStageOperator visitMailboxSend(MailboxSendNode node, 
PlanRequestContext context) {
+    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     StageMetadata receivingStageMetadata = 
context.getMetadataMap().get(node.getReceiverStageId());
     return new MailboxSendOperator(context.getMailboxService(), nextOperator,
         receivingStageMetadata.getServerInstances(), node.getExchangeType(), 
node.getPartitionKeySelector(),
@@ -82,50 +81,50 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<Transferab
   }
 
   @Override
-  public Operator<TransferableBlock> visitAggregate(AggregateNode node, 
PlanRequestContext context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
+  public MultiStageOperator visitAggregate(AggregateNode node, 
PlanRequestContext context) {
+    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new AggregateOperator(nextOperator, node.getDataSchema(), 
node.getAggCalls(),
         node.getGroupSet());
   }
 
   @Override
-  public Operator<TransferableBlock> visitFilter(FilterNode node, 
PlanRequestContext context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
+  public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext 
context) {
+    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new FilterOperator(nextOperator, node.getDataSchema(), 
node.getCondition());
   }
 
   @Override
-  public Operator<TransferableBlock> visitJoin(JoinNode node, 
PlanRequestContext context) {
+  public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext 
context) {
     StageNode left = node.getInputs().get(0);
     StageNode right = node.getInputs().get(1);
 
-    Operator<TransferableBlock> leftOperator = left.visit(this, context);
-    Operator<TransferableBlock> rightOperator = right.visit(this, context);
+    MultiStageOperator leftOperator = left.visit(this, context);
+    MultiStageOperator rightOperator = right.visit(this, context);
 
     return new HashJoinOperator(leftOperator, rightOperator, 
left.getDataSchema(), node);
   }
 
   @Override
-  public Operator<TransferableBlock> visitProject(ProjectNode node, 
PlanRequestContext context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
+  public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext 
context) {
+    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new TransformOperator(nextOperator, node.getDataSchema(), 
node.getProjects(),
         node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public Operator<TransferableBlock> visitSort(SortNode node, 
PlanRequestContext context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
+  public MultiStageOperator visitSort(SortNode node, PlanRequestContext 
context) {
+    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, 
context);
     return new SortOperator(nextOperator, node.getCollationKeys(), 
node.getCollationDirections(),
         node.getFetch(), node.getOffset(), node.getDataSchema());
   }
 
   @Override
-  public Operator<TransferableBlock> visitTableScan(TableScanNode node, 
PlanRequestContext context) {
+  public MultiStageOperator visitTableScan(TableScanNode node, 
PlanRequestContext context) {
     throw new UnsupportedOperationException("Stage node of type TableScanNode 
is not supported!");
   }
 
   @Override
-  public Operator<TransferableBlock> visitValue(ValueNode node, 
PlanRequestContext context) {
+  public MultiStageOperator visitValue(ValueNode node, PlanRequestContext 
context) {
     return new LiteralValueOperator(node.getDataSchema(), 
node.getLiteralRows());
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 5a94c3708f..c38a210e95 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -24,19 +24,20 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.clearInvocations;
+
 
 public class OpChainSchedulerServiceTest {
 
@@ -44,9 +45,9 @@ public class OpChainSchedulerServiceTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private Operator<TransferableBlock> _operatorA;
+  private MultiStageOperator _operatorA;
   @Mock
-  private Operator<TransferableBlock> _operatorB;
+  private MultiStageOperator _operatorB;
   @Mock
   private OpChainScheduler _scheduler;
 
@@ -61,16 +62,18 @@ public class OpChainSchedulerServiceTest {
     _mocks.close();
   }
 
-  @AfterMethod
-  public void afterMethod() {
-    _executor.shutdownNow();
+  @BeforeMethod
+  public void beforeMethod() {
+    clearInvocations(_scheduler);
+    clearInvocations(_operatorA);
+    clearInvocations(_operatorB);
   }
 
   private void initExecutor(int numThreads) {
     _executor = Executors.newFixedThreadPool(numThreads);
   }
 
-  private OpChain getChain(Operator<TransferableBlock> operator) {
+  private OpChain getChain(MultiStageOperator operator) {
     return new OpChain(operator, ImmutableList.of(), 123, 1);
   }
 
@@ -132,8 +135,7 @@ public class OpChainSchedulerServiceTest {
     OpChainSchedulerService scheduler = new 
OpChainSchedulerService(_scheduler, _executor);
 
     CountDownLatch latch = new CountDownLatch(1);
-    Mockito.when(_operatorA.nextBlock())
-        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+    
Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
         .thenAnswer(inv -> {
           latch.countDown();
           return TransferableBlockUtils.getEndOfStreamTransferableBlock();
@@ -155,9 +157,7 @@ public class OpChainSchedulerServiceTest {
     // Given:
     initExecutor(1);
     Mockito.when(_scheduler.hasNext()).thenReturn(true);
-    Mockito.when(_scheduler.next())
-        .thenReturn(getChain(_operatorA))
-        .thenReturn(getChain(_operatorB))
+    
Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA)).thenReturn(getChain(_operatorB))
         .thenReturn(getChain(_operatorA));
     OpChainSchedulerService scheduler = new 
OpChainSchedulerService(_scheduler, _executor);
 
@@ -209,6 +209,7 @@ public class OpChainSchedulerServiceTest {
     // Then:
     Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected hasNext to 
be called");
     scheduler.stopAsync().awaitTerminated();
+
     Mockito.verify(_scheduler, Mockito.never()).next();
   }
 
@@ -238,4 +239,29 @@ public class OpChainSchedulerServiceTest {
     Assert.assertTrue(secondHasNext.await(10, TimeUnit.SECONDS), "expected 
hasNext to be called again");
     scheduler.stopAsync().awaitTerminated();
   }
+
+  @Test
+  public void shouldCallCloseOnOperators()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true).thenReturn(false);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new 
OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      latch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be 
called in less than 10 seconds");
+    scheduler.stopAsync().awaitTerminated();
+    Mockito.verify(_operatorA, Mockito.times(1)).close();
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 2bc58bdfaf..bb3faf63ec 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -20,10 +20,9 @@ package org.apache.pinot.query.runtime.executor;
 
 import com.google.common.collect.ImmutableList;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
-import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -39,7 +38,7 @@ public class RoundRobinSchedulerTest {
   private static final MailboxIdentifier MAILBOX_2 = new 
StringMailboxIdentifier("1_2:foo:2:bar:3");
 
   @Mock
-  private Operator<TransferableBlock> _operator;
+  private MultiStageOperator _operator;
 
   private AutoCloseable _mocks;
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 48b9905523..0996b7f4d0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -25,8 +25,6 @@ import java.util.List;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -49,7 +47,7 @@ public class AggregateOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private Operator<TransferableBlock> _input;
+  private MultiStageOperator _input;
 
   @BeforeMethod
   public void setUp() {
@@ -212,7 +210,7 @@ public class AggregateOperatorTest {
 
   @Test
   public void testGroupByAggregateWithHashCollision() {
-    BaseOperator<TransferableBlock> upstreamOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
+    MultiStageOperator upstreamOperator = 
OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
     // Create an aggregation call with sum for first column and group by 
second column.
     RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0));
     AggregateOperator sum0GroupBy1 =
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index 1e78233234..c53ea2037c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -24,7 +24,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -41,7 +40,7 @@ import org.testng.annotations.Test;
 public class FilterOperatorTest {
   private AutoCloseable _mocks;
   @Mock
-  private Operator<TransferableBlock> _upstreamOperator;
+  private MultiStageOperator _upstreamOperator;
 
   @BeforeMethod
   public void setUp() {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 0a3ae48974..b9ea6b2078 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
@@ -47,10 +46,10 @@ public class HashJoinOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private Operator<TransferableBlock> _leftOperator;
+  private MultiStageOperator _leftOperator;
 
   @Mock
-  private Operator<TransferableBlock> _rightOperator;
+  private MultiStageOperator _rightOperator;
 
   @BeforeMethod
   public void setUp() {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 5a4261e4e7..3dc951dca3 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
@@ -45,7 +44,7 @@ public class MailboxSendOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private Operator<TransferableBlock> _input;
+  private MultiStageOperator _input;
   @Mock
   private MailboxService<TransferableBlock> _mailboxService;
   @Mock
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 0537c67ca9..39921e324d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
 
@@ -51,7 +50,7 @@ public class OperatorTestUtil {
   private OperatorTestUtil() {
   }
 
-  public static BaseOperator<TransferableBlock> getOperator(String 
operatorName) {
+  public static MultiStageOperator getOperator(String operatorName) {
     return MOCK_OPERATOR_FACTORY.buildMockOperator(operatorName);
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index c712fb601e..2cc6b68a67 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -25,7 +25,6 @@ import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -46,7 +45,7 @@ public class SortOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private Operator<TransferableBlock> _input;
+  private MultiStageOperator _input;
 
   @BeforeMethod
   public void setUp() {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index 9c6370b5f5..b89313c25c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -46,7 +45,7 @@ public class TransformOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
-  private Operator<TransferableBlock> _upstreamOp;
+  private MultiStageOperator _upstreamOp;
 
   @BeforeMethod
   public void setUp() {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
index 0140576981..94acd49fd8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockDataBlockOperatorFactory.java
@@ -24,9 +24,9 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -54,8 +54,8 @@ public class MockDataBlockOperatorFactory {
   }
 
   @SuppressWarnings("unchecked")
-  public BaseOperator<TransferableBlock> buildMockOperator(String operatorName) {
-    BaseOperator<TransferableBlock> operator = Mockito.mock(BaseOperator.class);
+  public MultiStageOperator buildMockOperator(String operatorName) {
+    MultiStageOperator operator = Mockito.mock(MultiStageOperator.class);
     Mockito.when(operator.nextBlock()).thenAnswer(new Answer<Object>() {
       private int _invocationCount = 0;
       public Object answer(InvocationOnMock invocation) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to