This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 27b61fe6 Extract more common logic in combine operators (#6696)
27b61fe6 is described below

commit 27b61fe6a338b1363efb64a7fed87d95cc793f8a
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Mar 22 10:34:57 2021 -0700

    Extract more common logic in combine operators (#6696)
    
    Extracts the Phaser logic into `BaseCombineOperator.getNextBlock()` to reduce duplicate code.
    Extends `StreamingSelectionOnlyCombineOperator` from `BaseCombineOperator`.
---
 .../core/operator/combine/BaseCombineOperator.java | 197 +++++++-------
 .../operator/combine/GroupByCombineOperator.java   | 102 +++----
 .../combine/GroupByOrderByCombineOperator.java     |  87 ++----
 ...xValueBasedSelectionOrderByCombineOperator.java | 292 +++++++++++----------
 .../core/operator/combine/MinMaxValueContext.java  |  36 ---
 .../combine/SelectionOrderByCombineOperator.java   |  72 +----
 .../StreamingSelectionOnlyCombineOperator.java     | 176 +++++--------
 7 files changed, 389 insertions(+), 573 deletions(-)

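For context on the Phaser pattern this commit centralizes: the main thread is the only initially registered party, each worker registers before doing any work, and a negative return from register() signals that the main thread has already deregistered and returned. A minimal standalone sketch of that handshake (illustrative class and pool names, not part of the commit):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;

    public class PhaserHandshakeSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        // The main thread is the only initially registered party.
        Phaser phaser = new Phaser(1);
        for (int i = 0; i < 4; i++) {
          executorService.submit(() -> {
            // A negative value means the phaser has terminated: the main
            // thread has already deregistered and returned, so do no work.
            if (phaser.register() < 0) {
              return;
            }
            try {
              // ... process segments here ...
            } finally {
              phaser.arriveAndDeregister();
            }
          });
        }
        // Deregister the main thread; awaitAdvance returns once every worker
        // that managed to register has also deregistered (or immediately if
        // the phaser has terminated), so no worker outlives this point.
        phaser.awaitAdvance(phaser.arriveAndDeregister());
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
      }
    }

This is what guarantees the segments referenced by the workers are not released while any worker is still running.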
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 417f449..f57efb3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.core.operator.combine;
 
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -50,42 +50,41 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseCombineOperator.class);
 
   protected final List<Operator> _operators;
+  protected final int _numOperators;
   protected final QueryContext _queryContext;
   protected final ExecutorService _executorService;
   protected final long _endTimeMs;
-  protected final int _numOperators;
-  // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-  // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-  // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-  // behavior (even JVM crash) when processing queries against it.
-  protected final Phaser _phaser = new Phaser(1);
-  // Use a _blockingQueue to store the per-segment result
-  protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue;
-  private final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0);
-  protected int _numThreads;
-  protected Future[] _futures;
-
-  public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
-      long endTimeMs) {
+  protected final int _numThreads;
+  protected final Future[] _futures;
+  // Use a _blockingQueue to store the intermediate results blocks
+  protected final BlockingQueue<IntermediateResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
+  protected final AtomicLong totalWorkerThreadCpuTimeNs = new AtomicLong(0);
+
+  protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+      long endTimeMs, int numThreads) {
     _operators = operators;
+    _numOperators = _operators.size();
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
-    _numOperators = _operators.size();
-    _numThreads = CombineOperatorUtils.getNumThreadsForQuery(_numOperators);
-    _blockingQueue = new ArrayBlockingQueue<>(_numOperators);
+    _numThreads = numThreads;
     _futures = new Future[_numThreads];
   }
 
-  public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
-      long endTimeMs, int numThreads) {
-    this(operators, queryContext, executorService, endTimeMs);
-    _numThreads = numThreads;
-    _futures = new Future[_numThreads];
+  protected BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+      long endTimeMs) {
+    this(operators, queryContext, executorService, endTimeMs,
+        CombineOperatorUtils.getNumThreadsForQuery(operators.size()));
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
+    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+    // behavior (even JVM crash) when processing queries against it.
+    Phaser phaser = new Phaser(1);
+
     for (int i = 0; i < _numThreads; i++) {
       int threadIndex = i;
       _futures[i] = _executorService.submit(new TraceRunnable() {
@@ -94,13 +93,41 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
           ThreadTimer executionThreadTimer = new ThreadTimer();
           executionThreadTimer.start();
 
-          processSegments(threadIndex);
+          // Register the thread to the phaser
+          // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+          //       means the query execution has finished, and the main thread has deregistered itself and returned
+          //       the result. Directly return as no execution result will be taken.
+          if (phaser.register() < 0) {
+            return;
+          }
+          try {
+            processSegments(threadIndex);
+          } finally {
+            phaser.arriveAndDeregister();
+          }
 
-          totalWorkerThreadCpuTimeNs.addAndGet(executionThreadTimer.stopAndGetThreadTimeNs());
+          totalWorkerThreadCpuTimeNs.getAndAdd(executionThreadTimer.stopAndGetThreadTimeNs());
         }
       });
     }
-    IntermediateResultsBlock mergedBlock = mergeResultsFromSegments();
+
+    IntermediateResultsBlock mergedBlock;
+    try {
+      mergedBlock = mergeResults();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
+      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : _futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
+    }
+
     /*
     * TODO: setThreadTime logic can be put into CombineOperatorUtils.setExecutionStatistics(),
     *   after we extend StreamingSelectionOnlyCombineOperator from BaseCombineOperator.
@@ -111,92 +138,64 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
   }
 
   /**
-   * processSegments will execute query on one or more segments in a single thread.
+   * Executes query on one or more segments in a worker thread.
    */
   protected void processSegments(int threadIndex) {
-    try {
-      // Register the thread to the phaser
-      // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-      //       means the query execution has finished, and the main thread has deregistered itself and returned
-      //       the result. Directly return as no execution result will be taken.
-      if (_phaser.register() < 0) {
-        return;
-      }
-
-      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
-        try {
-          IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-          if (isQuerySatisfied(resultsBlock)) {
-            // Query is satisfied, skip processing the remaining segments
-            _blockingQueue.offer(resultsBlock);
-            return;
-          } else {
-            _blockingQueue.offer(resultsBlock);
-          }
-        } catch (EarlyTerminationException e) {
-          // Early-terminated by interruption (canceled by the main thread)
-          return;
-        } catch (Exception e) {
-          // Caught exception, skip processing the remaining operators
-          LOGGER
-              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
-                  e);
-          _blockingQueue.offer(new IntermediateResultsBlock(e));
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+      try {
+        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+        if (isQuerySatisfied(resultsBlock)) {
+          // Query is satisfied, skip processing the remaining segments
+          _blockingQueue.offer(resultsBlock);
           return;
+        } else {
+          _blockingQueue.offer(resultsBlock);
         }
+      } catch (EarlyTerminationException e) {
+        // Early-terminated by interruption (canceled by the main thread)
+        return;
+      } catch (Exception e) {
+        // Caught exception, skip processing the remaining operators
+        LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+            e);
+        _blockingQueue.offer(new IntermediateResultsBlock(e));
+        return;
       }
-    } finally {
-      _phaser.arriveAndDeregister();
     }
   }
 
   /**
-   * mergeResultsFromSegments will merge multiple intermediate result blocks into a result block.
+   * Merges the results from the worker threads into a results block.
    */
-  protected IntermediateResultsBlock mergeResultsFromSegments() {
+  protected IntermediateResultsBlock mergeResults()
+      throws Exception {
     IntermediateResultsBlock mergedBlock = null;
-    try {
-      int numBlocksMerged = 0;
-      while (numBlocksMerged < _numOperators) {
-        IntermediateResultsBlock blockToMerge =
-            _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        if (blockToMerge == null) {
-          // Query times out, skip merging the remaining results blocks
-          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
-              _queryContext);
-          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-              new TimeoutException("Timed out while polling results block")));
-          break;
-        }
-        if (blockToMerge.getProcessingExceptions() != null) {
-          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
-          // the exception
-          mergedBlock = blockToMerge;
-          break;
-        }
-        if (mergedBlock == null) {
-          mergedBlock = blockToMerge;
-        } else {
-          mergeResultsBlocks(mergedBlock, blockToMerge);
-        }
-        numBlocksMerged++;
-        if (isQuerySatisfied(mergedBlock)) {
-          // Query is satisfied, skip merging the remaining results blocks
-          break;
-        }
+    int numBlocksMerged = 0;
+    while (numBlocksMerged < _numOperators) {
+      IntermediateResultsBlock blockToMerge =
+          _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      if (blockToMerge == null) {
+        // Query times out, skip merging the remaining results blocks
+        LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+            _queryContext);
+        return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+            new TimeoutException("Timed out while polling results block")));
       }
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
-      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : _futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
+      if (blockToMerge.getProcessingExceptions() != null) {
+        // Caught exception while processing segment, skip merging the remaining results blocks and directly return the
+        // exception
+        return blockToMerge;
+      }
+      if (mergedBlock == null) {
+        mergedBlock = blockToMerge;
+      } else {
+        mergeResultsBlocks(mergedBlock, blockToMerge);
+      }
+      numBlocksMerged++;
+      if (isQuerySatisfied(mergedBlock)) {
+        // Query is satisfied, skip merging the remaining results blocks
+        return mergedBlock;
       }
-      // Deregister the main thread and wait for all threads done
-      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
     }
     return mergedBlock;
   }
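The mergeResults() loop above keeps one absolute deadline (_endTimeMs) and recomputes the remaining wait before every poll, so the whole loop respects a single end-to-end timeout. Distilled into a hedged standalone sketch (names are illustrative, not the Pinot API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class DeadlinePollSketch {
      // Polls numExpected blocks, recomputing the remaining time before each
      // poll; a null poll result means the shared deadline was exceeded.
      static <T> T pollAll(BlockingQueue<T> queue, int numExpected, long endTimeMs)
          throws InterruptedException {
        T merged = null;
        for (int i = 0; i < numExpected; i++) {
          T block = queue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
          if (block == null) {
            return null; // deadline exceeded; the caller reports the timeout
          }
          merged = block; // stand-in for mergeResultsBlocks(merged, block)
        }
        return merged;
      }
    }

A non-positive remaining time makes poll() return immediately, so a loop that is already past the deadline fails fast instead of waiting again.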
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index dadb23a..35395ef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -49,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * TODO: Use CombineOperatorUtils.getNumThreadsForQuery() to get the parallelism of the query instead of using
  *   all threads
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class GroupByCombineOperator extends BaseCombineOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(GroupByCombineOperator.class);
   private static final String OPERATOR_NAME = "GroupByCombineOperator";
@@ -70,11 +69,6 @@ public class GroupByCombineOperator extends BaseCombineOperator {
   private final int _numAggregationFunctions;
   // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
   // _futures (try to interrupt the execution if it already started).
-  // Besides the CountDownLatch, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or
-  // interrupted) before the main thread returns. We need to ensure no execution left before the main thread returning
-  // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the
-  // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash)
-  // when executing queries against them.
   private final CountDownLatch _operatorLatch;
 
   public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
@@ -88,27 +82,20 @@ public class GroupByCombineOperator extends BaseCombineOperator {
     _aggregationFunctions = _queryContext.getAggregationFunctions();
     assert _aggregationFunctions != null;
     _numAggregationFunctions = _aggregationFunctions.length;
-    int numOperators = _operators.size();
-    _operatorLatch = new CountDownLatch(numOperators);
+    _operatorLatch = new CountDownLatch(_numOperators);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
   }
 
   /**
-   * {@inheritDoc}
-   *
-   * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks into a
-   * map
+   * Executes query on one segment in a worker thread and merges the results into the results map.
    */
   @Override
   protected void processSegments(int threadIndex) {
     try {
-      // Register the thread to the _phaser.
-      // If the _phaser is terminated (returning negative value) when trying to register the thread, that means the
-      // query execution has timed out, and the main thread has deregistered itself and returned the result.
-      // Directly return as no execution result will be taken.
-      if (_phaser.register() < 0) {
-        return;
-      }
-
       IntermediateResultsBlock intermediateResultsBlock =
           (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
 
@@ -153,7 +140,6 @@ public class GroupByCombineOperator extends BaseCombineOperator {
       _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     } finally {
       _operatorLatch.countDown();
-      _phaser.arriveAndDeregister();
     }
   }
 
@@ -177,57 +163,39 @@ public class GroupByCombineOperator extends BaseCombineOperator {
    * </ul>
    */
   @Override
-  protected IntermediateResultsBlock mergeResultsFromSegments() {
-    try {
-      long timeoutMs = _endTimeMs - System.currentTimeMillis();
-      boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
-      if (!opCompleted) {
-        // If this happens, the broker side should already timed out, just log the error and return
-        String errorMessage = String
-            .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs,
-                _queryContext);
-        LOGGER.error(errorMessage);
-        return new IntermediateResultsBlock(new TimeoutException(errorMessage));
-      }
-
-      // Trim the results map.
-      AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
-          new AggregationGroupByTrimmingService(_queryContext);
-      List<Map<String, Object>> trimmedResults =
-          aggregationGroupByTrimmingService.trimIntermediateResultsMap(_resultsMap);
-      IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_aggregationFunctions, trimmedResults, true);
+  protected IntermediateResultsBlock mergeResults()
+      throws Exception {
+    long timeoutMs = _endTimeMs - System.currentTimeMillis();
+    boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
+    if (!opCompleted) {
+      // If this happens, the broker side should have already timed out, just log the error and return
+      String errorMessage = String
+          .format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs, _queryContext);
+      LOGGER.error(errorMessage);
+      return new IntermediateResultsBlock(new TimeoutException(errorMessage));
+    }
 
-      // Set the processing exceptions.
-      if (!_mergedProcessingExceptions.isEmpty()) {
-        mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
-      }
-      // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
-      //       are comparing number of groups across segments with the groups limit for each segment.
-      if (_resultsMap.size() >= _innerSegmentNumGroupsLimit) {
-        mergedBlock.setNumGroupsLimitReached(true);
-      }
+    // Trim the results map.
+    AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
+        new AggregationGroupByTrimmingService(_queryContext);
+    List<Map<String, Object>> trimmedResults =
+        aggregationGroupByTrimmingService.trimIntermediateResultsMap(_resultsMap);
+    IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_aggregationFunctions, trimmedResults, true);
 
-      return mergedBlock;
-    } catch (Exception e) {
-      return new IntermediateResultsBlock(e);
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : _futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-      // Deregister the main thread and wait for all threads done
-      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+    // Set the processing exceptions.
+    if (!_mergedProcessingExceptions.isEmpty()) {
+      mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
+    }
+    // TODO: this value should be set in the inner-segment operators. Setting it here might cause false positive as we
+    //       are comparing number of groups across segments with the groups limit for each segment.
+    if (_resultsMap.size() >= _innerSegmentNumGroupsLimit) {
+      mergedBlock.setNumGroupsLimitReached(true);
     }
-  }
 
-  @Override
-  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    return mergedBlock;
   }
 
   @Override
-  public String getOperatorName() {
-    return OPERATOR_NAME;
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
   }
 }
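GroupByCombineOperator keeps its CountDownLatch flow: each worker counts down in a finally block (so failures still count), and the merging thread waits with the remaining query time as the timeout. A minimal sketch of that pattern, with illustrative names:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class LatchTimeoutSketch {
      public static void main(String[] args) throws Exception {
        int numOperators = 3;
        long endTimeMs = System.currentTimeMillis() + 1000;
        CountDownLatch operatorLatch = new CountDownLatch(numOperators);
        ExecutorService executorService = Executors.newFixedThreadPool(numOperators);
        for (int i = 0; i < numOperators; i++) {
          executorService.submit(() -> {
            try {
              // ... execute the query on one segment ...
            } finally {
              operatorLatch.countDown(); // counted down even on failure
            }
          });
        }
        long timeoutMs = endTimeMs - System.currentTimeMillis();
        if (!operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
          throw new TimeoutException("Timed out while combining results after " + timeoutMs + "ms");
        }
        // All workers have finished; safe to trim and merge the results map.
        executorService.shutdown();
      }
    }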
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 38f1a54..4cd518d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
@@ -68,11 +67,6 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   private final ConcurrentLinkedQueue<ProcessingException> _mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
   // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished
   // _futures (try to interrupt the execution if it already started).
-  // Besides the CountDownLatch, we also use a Phaser to ensure all the Futures are done (not scheduled, finished or
-  // interrupted) before the main thread returns. We need to ensure no execution left before the main thread returning
-  // because the main thread holds the reference to the segments, and if the segments are deleted/refreshed, the
-  // segments can be released after the main thread returns, which would lead to undefined behavior (even JVM crash)
-  // when executing queries against them.
   private final CountDownLatch _operatorLatch;
   private DataSchema _dataSchema;
   private ConcurrentIndexedTable _indexedTable;
@@ -95,23 +89,17 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
     _operatorLatch = new CountDownLatch(numOperators);
   }
 
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
   /**
-   * {@inheritDoc}
-   *
-   * <p> Execute query on one or more segments in a single thread, and store multiple intermediate result blocks
-   * into {@link org.apache.pinot.core.data.table.IndexedTable}
+   * Executes query on one segment in a worker thread and merges the results into the indexed table.
    */
   @Override
   protected void processSegments(int threadIndex) {
     try {
-      // Register the thread to the _phaser.
-      // If the _phaser is terminated (returning negative value) when trying to register the thread, that means the
-      // query execution has timed out, and the main thread has deregistered itself and returned the result.
-      // Directly return as no execution result will be taken.
-      if (_phaser.register() < 0) {
-        return;
-      }
-
       IntermediateResultsBlock intermediateResultsBlock =
           (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
 
@@ -164,7 +152,6 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     } finally {
       _operatorLatch.countDown();
-      _phaser.arriveAndDeregister();
     }
   }
 
@@ -182,52 +169,34 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
    * </ul>
    */
   @Override
-  protected IntermediateResultsBlock mergeResultsFromSegments() {
-    try {
-      long timeoutMs = _endTimeMs - System.currentTimeMillis();
-      boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
-      if (!opCompleted) {
-        // If this happens, the broker side should already timed out, just log the error and return
-        String errorMessage = String
-            .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
-                _queryContext);
-        LOGGER.error(errorMessage);
-        return new IntermediateResultsBlock(new TimeoutException(errorMessage));
-      }
+  protected IntermediateResultsBlock mergeResults()
+      throws Exception {
+    long timeoutMs = _endTimeMs - System.currentTimeMillis();
+    boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
+    if (!opCompleted) {
+      // If this happens, the broker side should have already timed out, just log the error and return
+      String errorMessage = String
+          .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
+              _queryContext);
+      LOGGER.error(errorMessage);
+      return new IntermediateResultsBlock(new TimeoutException(errorMessage));
+    }
 
-      _indexedTable.finish(false);
-      IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable);
+    _indexedTable.finish(false);
+    IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable);
 
-      // Set the processing exceptions.
-      if (!_mergedProcessingExceptions.isEmpty()) {
-        mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
-      }
-
-      mergedBlock.setNumResizes(_indexedTable.getNumResizes());
-      mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
-      // TODO - set numGroupsLimitReached
-      return mergedBlock;
-    } catch (Exception e) {
-      return new IntermediateResultsBlock(e);
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : _futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-      // Deregister the main thread and wait for all threads done
-      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+    // Set the processing exceptions.
+    if (!_mergedProcessingExceptions.isEmpty()) {
+      mergedBlock.setProcessingExceptions(new ArrayList<>(_mergedProcessingExceptions));
     }
-  }
-
-  @Override
-  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
 
+    mergedBlock.setNumResizes(_indexedTable.getNumResizes());
+    mergedBlock.setResizeTimeMs(_indexedTable.getResizeTimeMs());
+    // TODO - set numGroupsLimitReached
+    return mergedBlock;
   }
 
   @Override
-  public String getOperatorName() {
-    return OPERATOR_NAME;
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 29359f6..e7df81c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -18,20 +18,22 @@
  */
 package org.apache.pinot.core.operator.combine;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
 import org.apache.pinot.core.query.exception.EarlyTerminationException;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
@@ -67,11 +69,48 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
   private final int _numRowsToKeep;
   private final List<MinMaxValueContext> _minMaxValueContexts;
 
-  public MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
-      ExecutorService executorService, long endTimeMs, List<MinMaxValueContext> minMaxValueContexts) {
+  MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long endTimeMs) {
     super(operators, queryContext, executorService, endTimeMs);
-    _minMaxValueContexts = minMaxValueContexts;
     _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
+
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    assert orderByExpressions != null;
+    int numOrderByExpressions = orderByExpressions.size();
+    assert numOrderByExpressions > 0;
+    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
+    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
+    String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier();
+
+    _minMaxValueContexts = new ArrayList<>(_numOperators);
+    for (Operator operator : _operators) {
+      _minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
+    }
+    if (firstOrderByExpression.isAsc()) {
+      // For ascending order, sort on column min value in ascending order
+      _minMaxValueContexts.sort((o1, o2) -> {
+        // Put segments without column min value in the front because we always need to process them
+        if (o1._minValue == null) {
+          return o2._minValue == null ? 0 : -1;
+        }
+        if (o2._minValue == null) {
+          return 1;
+        }
+        return o1._minValue.compareTo(o2._minValue);
+      });
+    } else {
+      // For descending order, sort on column max value in descending order
+      _minMaxValueContexts.sort((o1, o2) -> {
+        // Put segments without column max value in the front because we always need to process them
+        if (o1._maxValue == null) {
+          return o2._maxValue == null ? 0 : -1;
+        }
+        if (o2._maxValue == null) {
+          return 1;
+        }
+        return o2._maxValue.compareTo(o1._maxValue);
+      });
+    }
   }
 
   @Override
@@ -95,106 +134,93 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
     assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
     boolean asc = firstOrderByExpression.isAsc();
 
-    try {
-      // Register the thread to the _phaser
-      // NOTE: If the _phaser is terminated (returning negative value) when trying to register the thread, that
-      //       means the query execution has finished, and the main thread has deregistered itself and returned
-      //       the result. Directly return as no execution result will be taken.
-      if (_phaser.register() < 0) {
-        return;
-      }
+    // Keep a boundary value for the thread
+    // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
+    //       value is updated after processing the segment, while global boundary value is updated after the
+    //       segment result is merged.
+    Comparable threadBoundaryValue = null;
 
-      // Keep a boundary value for the thread
-      // NOTE: The thread boundary value can be different from the global boundary value because thread boundary
-      //       value is updated after processing the segment, while global boundary value is updated after the
-      //       segment result is merged.
-      Comparable threadBoundaryValue = null;
-
-      for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
-        // Calculate the boundary value from global boundary and thread boundary
-        Comparable boundaryValue = _globalBoundaryValue.get();
-        if (boundaryValue == null) {
-          boundaryValue = threadBoundaryValue;
-        } else {
-          if (threadBoundaryValue != null) {
-            if (asc) {
-              if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
-                boundaryValue = threadBoundaryValue;
-              }
-            } else {
-              if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
-                boundaryValue = threadBoundaryValue;
-              }
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+      // Calculate the boundary value from global boundary and thread boundary
+      Comparable boundaryValue = _globalBoundaryValue.get();
+      if (boundaryValue == null) {
+        boundaryValue = threadBoundaryValue;
+      } else {
+        if (threadBoundaryValue != null) {
+          if (asc) {
+            if (threadBoundaryValue.compareTo(boundaryValue) < 0) {
+              boundaryValue = threadBoundaryValue;
+            }
+          } else {
+            if (threadBoundaryValue.compareTo(boundaryValue) > 0) {
+              boundaryValue = threadBoundaryValue;
             }
           }
         }
+      }
 
-        // Check if the segment can be skipped
-        MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
-        if (boundaryValue != null) {
-          if (asc) {
-            // For ascending order, no need to process more segments if the column min value is larger than the
-            // boundary value, or is equal to the boundary value and the there is only one order-by expression
-            if (minMaxValueContext._minValue != null) {
-              int result = minMaxValueContext._minValue.compareTo(boundaryValue);
-              if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
-                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
-                _blockingQueue.offer(LAST_RESULTS_BLOCK);
-                return;
-              }
+      // Check if the segment can be skipped
+      MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
+      if (boundaryValue != null) {
+        if (asc) {
+          // For ascending order, no need to process more segments if the column min value is larger than the
+          // boundary value, or is equal to the boundary value and there is only one order-by expression
+          if (minMaxValueContext._minValue != null) {
+            int result = minMaxValueContext._minValue.compareTo(boundaryValue);
+            if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
+              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
+              _blockingQueue.offer(LAST_RESULTS_BLOCK);
+              return;
             }
-          } else {
-            // For descending order, no need to process more segments if the column max value is smaller than the
-            // boundary value, or is equal to the boundary value and the there is only one order-by expression
-            if (minMaxValueContext._maxValue != null) {
-              int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
-              if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
-                _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
-                _blockingQueue.offer(LAST_RESULTS_BLOCK);
-                return;
-              }
+          }
+        } else {
+          // For descending order, no need to process more segments if the column max value is smaller than the
+          // boundary value, or is equal to the boundary value and there is only one order-by expression
+          if (minMaxValueContext._maxValue != null) {
+            int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
+            if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
+              _numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numThreads);
+              _blockingQueue.offer(LAST_RESULTS_BLOCK);
+              return;
             }
           }
         }
+      }
 
-        // Process the segment
-        try {
-          IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
-          PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
-          if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
-            // Segment result has enough rows, update the boundary value
-            assert selectionResult.peek() != null;
-            Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
-            if (boundaryValue == null) {
-              boundaryValue = segmentBoundaryValue;
+      // Process the segment
+      try {
+        IntermediateResultsBlock resultsBlock = minMaxValueContext._operator.nextBlock();
+        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) resultsBlock.getSelectionResult();
+        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+          // Segment result has enough rows, update the boundary value
+          assert selectionResult.peek() != null;
+          Comparable segmentBoundaryValue = (Comparable) selectionResult.peek()[0];
+          if (boundaryValue == null) {
+            boundaryValue = segmentBoundaryValue;
+          } else {
+            if (asc) {
+              if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
+                boundaryValue = segmentBoundaryValue;
+              }
             } else {
-              if (asc) {
-                if (segmentBoundaryValue.compareTo(boundaryValue) < 0) {
-                  boundaryValue = segmentBoundaryValue;
-                }
-              } else {
-                if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
-                  boundaryValue = segmentBoundaryValue;
-                }
+              if (segmentBoundaryValue.compareTo(boundaryValue) > 0) {
+                boundaryValue = segmentBoundaryValue;
               }
             }
           }
-          threadBoundaryValue = boundaryValue;
-          _blockingQueue.offer(resultsBlock);
-        } catch (EarlyTerminationException e) {
-          // Early-terminated by interruption (canceled by the main thread)
-          return;
-        } catch (Exception e) {
-          // Caught exception, skip processing the remaining operators
-          LOGGER
-              .error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
-                  e);
-          _blockingQueue.offer(new IntermediateResultsBlock(e));
-          return;
         }
+        threadBoundaryValue = boundaryValue;
+        _blockingQueue.offer(resultsBlock);
+      } catch (EarlyTerminationException e) {
+        // Early-terminated by interruption (canceled by the main thread)
+        return;
+      } catch (Exception e) {
+        // Caught exception, skip processing the remaining operators
+        LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, _queryContext,
+            e);
+        _blockingQueue.offer(new IntermediateResultsBlock(e));
+        return;
       }
-    } finally {
-      _phaser.arriveAndDeregister();
     }
   }
 
@@ -212,55 +238,42 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
    * </ul>
    */
   @Override
-  protected IntermediateResultsBlock mergeResultsFromSegments() {
+  protected IntermediateResultsBlock mergeResults()
+      throws Exception {
     IntermediateResultsBlock mergedBlock = null;
-    try {
-      int numBlocksMerged = 0;
-      while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
-        IntermediateResultsBlock blockToMerge =
-            _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        if (blockToMerge == null) {
-          // Query times out, skip merging the remaining results blocks
-          LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
-              _queryContext);
-          mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-              new TimeoutException("Timed out while polling results block")));
-          break;
-        }
-        if (blockToMerge.getProcessingExceptions() != null) {
-          // Caught exception while processing segment, skip merging the remaining results blocks and directly return
-          // the exception
-          mergedBlock = blockToMerge;
-          break;
-        }
-        if (mergedBlock == null) {
-          mergedBlock = blockToMerge;
-        } else {
-          if (blockToMerge != LAST_RESULTS_BLOCK) {
-            mergeResultsBlocks(mergedBlock, blockToMerge);
-          }
-        }
-        numBlocksMerged++;
-
-        // Update the boundary value if enough rows are collected
-        PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
-        if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
-          assert selectionResult.peek() != null;
-          _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
-        }
+    int numBlocksMerged = 0;
+    while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
+      IntermediateResultsBlock blockToMerge =
+          _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      if (blockToMerge == null) {
+        // Query times out, skip merging the remaining results blocks
+        LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+            _queryContext);
+        mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+            new TimeoutException("Timed out while polling results block")));
+        break;
       }
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
-      mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : _futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
+      if (blockToMerge.getProcessingExceptions() != null) {
+        // Caught exception while processing segment, skip merging the remaining results blocks and directly return
+        // the exception
+        mergedBlock = blockToMerge;
+        break;
+      }
+      if (mergedBlock == null) {
+        mergedBlock = blockToMerge;
+      } else {
+        if (blockToMerge != LAST_RESULTS_BLOCK) {
+          mergeResultsBlocks(mergedBlock, blockToMerge);
         }
       }
-      // Deregister the main thread and wait for all threads done
-      _phaser.awaitAdvance(_phaser.arriveAndDeregister());
+      numBlocksMerged++;
+
+      // Update the boundary value if enough rows are collected
+      PriorityQueue<Object[]> selectionResult = (PriorityQueue<Object[]>) mergedBlock.getSelectionResult();
+      if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
+        assert selectionResult.peek() != null;
+        _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+      }
     }
     return mergedBlock;
   }
@@ -286,4 +299,17 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine
     assert mergedRows != null && rowsToMerge != null;
     SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
   }
+
+  private static class MinMaxValueContext {
+    final SelectionOrderByOperator _operator;
+    final Comparable _minValue;
+    final Comparable _maxValue;
+
+    MinMaxValueContext(SelectionOrderByOperator operator, String column) {
+      _operator = operator;
+      DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+      _minValue = dataSourceMetadata.getMinValue();
+      _maxValue = dataSourceMetadata.getMaxValue();
+    }
+  }
 }
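The constructor above orders segments by their column min/max metadata so that, once a top-N boundary value is known, whole segments can be skipped without executing them. A simplified, hedged sketch of the ascending-order case (Integer stands in for the generic Comparable min value; all names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class MinMaxPruneSketch {
      static class SegmentStats {
        final String name;
        final Integer minValue; // null when the column metadata is unavailable

        SegmentStats(String name, Integer minValue) {
          this.name = name;
          this.minValue = minValue;
        }
      }

      public static void main(String[] args) {
        List<SegmentStats> segments = new ArrayList<>();
        segments.add(new SegmentStats("seg1", 30));
        segments.add(new SegmentStats("seg2", null));
        segments.add(new SegmentStats("seg3", 10));
        // Segments without a min value sort first: they can never be skipped.
        segments.sort((o1, o2) -> {
          if (o1.minValue == null) {
            return o2.minValue == null ? 0 : -1;
          }
          if (o2.minValue == null) {
            return 1;
          }
          return o1.minValue.compareTo(o2.minValue);
        });
        Integer boundaryValue = 20; // e.g. the worst row currently in the top-N
        for (SegmentStats segment : segments) {
          if (segment.minValue != null && segment.minValue.compareTo(boundaryValue) > 0) {
            // Every row of this segment (and, thanks to the sort, of all later
            // ones) falls beyond the boundary, so stop processing segments.
            break;
          }
          System.out.println("process " + segment.name);
        }
      }
    }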
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
deleted file mode 100644
index 3f76058..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueContext.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.combine;
-
-import org.apache.pinot.core.common.DataSourceMetadata;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-
-
-public class MinMaxValueContext {
-  final SelectionOrderByOperator _operator;
-  final Comparable _minValue;
-  final Comparable _maxValue;
-
-  MinMaxValueContext(SelectionOrderByOperator operator, String column) {
-    _operator = operator;
-    DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
-    _minValue = dataSourceMetadata.getMinValue();
-    _maxValue = dataSourceMetadata.getMaxValue();
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
index dfd34e9..ef8a7d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SelectionOrderByCombineOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.combine;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -27,7 +26,6 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -43,24 +41,16 @@ import org.slf4j.LoggerFactory;
  * skip processing some segments based on the column min/max value. Otherwise fall back to the default combine
  * (process all segments).
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings("rawtypes")
 public class SelectionOrderByCombineOperator extends BaseCombineOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(SelectionOrderByCombineOperator.class);
   private static final String OPERATOR_NAME = "SelectionOrderByCombineOperator";
 
-  private final List<Operator> _operators;
-  private final QueryContext _queryContext;
-  private final ExecutorService _executorService;
-  private final long _endTimeMs;
   private final int _numRowsToKeep;
 
   public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs) {
     super(operators, queryContext, executorService, endTimeMs);
-    _operators = operators;
-    _queryContext = queryContext;
-    _executorService = executorService;
-    _endTimeMs = endTimeMs;
     _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
   }
 
@@ -83,62 +73,14 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
     List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
     assert orderByExpressions != null;
     if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
-      return tryMinMaxValueBasedCombine(orderByExpressions);
-    } else {
-      // Fall back to the default combine (process all segments) when segments have different data types for the first
-      // order-by column
-      return super.getNextBlock();
-    }
-  }
-
-  private IntermediateResultsBlock tryMinMaxValueBasedCombine(List<OrderByExpressionContext> orderByExpressions) {
-    int numOrderByExpressions = orderByExpressions.size();
-    assert numOrderByExpressions > 0;
-    OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
-    assert firstOrderByExpression.getExpression().getType() == ExpressionContext.Type.IDENTIFIER;
-    String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier();
-    boolean asc = firstOrderByExpression.isAsc();
-
-    int numOperators = _operators.size();
-    List<MinMaxValueContext> minMaxValueContexts = new ArrayList<>(numOperators);
-    for (Operator operator : _operators) {
-      minMaxValueContexts.add(new MinMaxValueContext((SelectionOrderByOperator) operator, firstOrderByColumn));
-    }
-    try {
-      if (asc) {
-        // For ascending order, sort on column min value in ascending order
-        minMaxValueContexts.sort((o1, o2) -> {
-          // Put segments without column min value in the front because we always need to process them
-          if (o1._minValue == null) {
-            return o2._minValue == null ? 0 : -1;
-          }
-          if (o2._minValue == null) {
-            return 1;
-          }
-          return o1._minValue.compareTo(o2._minValue);
-        });
-      } else {
-        // For descending order, sort on column max value in descending order
-        minMaxValueContexts.sort((o1, o2) -> {
-          // Put segments without column max value in the front because we always need to process them
-          if (o1._maxValue == null) {
-            return o2._maxValue == null ? 0 : -1;
-          }
-          if (o2._maxValue == null) {
-            return 1;
-          }
-          return o2._maxValue.compareTo(o1._maxValue);
-        });
+      try {
+        return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService,
+            _endTimeMs).getNextBlock();
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while using min/max value based combine, using the default combine", e);
       }
-    } catch (Exception e) {
-      // Fall back to the default combine (process all segments) if there are any exceptions.
-      LOGGER.warn("Segments have different data types for the first order-by column: {}, using the default combine",
-          firstOrderByColumn);
-      return super.getNextBlock();
     }
-
-    return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService, _endTimeMs,
-        minMaxValueContexts).getNextBlock();
+    return super.getNextBlock();
   }
 
   @Override
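The new getNextBlock() above reduces to a try-the-specialized-combine, fall-back-on-failure shape. A hedged sketch of that control flow (the Callable pair is illustrative, not the Pinot API):

    import java.util.concurrent.Callable;

    final class FallbackCombineSketch {
      // Runs the specialized combine; on any failure (e.g. segments with mixed
      // data types for the first order-by column) falls back to the default.
      static <T> T combine(Callable<T> minMaxValueBased, Callable<T> defaultCombine) throws Exception {
        try {
          return minMaxValueBased.call();
        } catch (Exception e) {
          return defaultCombine.call();
        }
      }
    }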
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
index e84e084..869044d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -22,35 +22,29 @@ import io.grpc.stub.StreamObserver;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
+import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.exception.EarlyTerminationException;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
  * Combine operator for selection only streaming queries.
- * TODO: extend StreamingSelectionOnlyCombineOperator from BaseCombineOperator.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class StreamingSelectionOnlyCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+public class StreamingSelectionOnlyCombineOperator extends BaseCombineOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class);
   private static final String OPERATOR_NAME = "StreamingSelectionOnlyCombineOperator";
 
@@ -59,19 +53,13 @@ public class StreamingSelectionOnlyCombineOperator extends BaseOperator<Intermed
       new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
           Collections.emptyList());
 
-  private final List<Operator> _operators;
-  private final QueryContext _queryContext;
-  private final ExecutorService _executorService;
-  private final long _endTimeMs;
   private final StreamObserver<Server.ServerResponse> _streamObserver;
   private final int _limit;
+  private final AtomicLong _numRowsCollected = new AtomicLong();
 
   public StreamingSelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs, StreamObserver<Server.ServerResponse> streamObserver) {
-    _operators = operators;
-    _queryContext = queryContext;
-    _executorService = executorService;
-    _endTimeMs = endTimeMs;
+    super(operators, queryContext, executorService, endTimeMs);
     _streamObserver = streamObserver;
     _limit = queryContext.getLimit();
   }
@@ -82,109 +70,69 @@ public class StreamingSelectionOnlyCombineOperator extends BaseOperator<Intermed
   }
 
   @Override
-  protected IntermediateResultsBlock getNextBlock() {
-    int numOperators = _operators.size();
-    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
-
-    // Use a BlockingQueue to store all the results blocks
-    BlockingQueue<IntermediateResultsBlock> blockingQueue = new LinkedBlockingQueue<>();
-    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
-    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
-    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
-    // behavior (even JVM crash) when processing queries against it.
-    Phaser phaser = new Phaser(1);
-
-    Future[] futures = new Future[numThreads];
-    for (int i = 0; i < numThreads; i++) {
-      int threadIndex = i;
-      futures[i] = _executorService.submit(new TraceRunnable() {
-        @Override
-        public void runJob() {
-          try {
-            // Register the thread to the phaser
-            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
-            //       means the query execution has finished, and the main thread has deregistered itself and returned
-            //       the result. Directly return as no execution result will be taken.
-            if (phaser.register() < 0) {
-              return;
-            }
-
-            int numRowsCollected = 0;
-            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
-              Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex);
-              try {
-                IntermediateResultsBlock resultsBlock;
-                while ((resultsBlock = operator.nextBlock()) != null) {
-                  Collection<Object[]> rows = resultsBlock.getSelectionResult();
-                  assert rows != null;
-                  numRowsCollected += rows.size();
-                  blockingQueue.offer(resultsBlock);
-                  if (numRowsCollected >= _limit) {
-                    return;
-                  }
-                }
-                blockingQueue.offer(LAST_RESULTS_BLOCK);
-              } catch (EarlyTerminationException e) {
-                // Early-terminated by interruption (canceled by the main thread)
-                return;
-              } catch (Exception e) {
-                // Caught exception, skip processing the remaining operators
-                LOGGER.error("Caught exception while executing operator of 
index: {} (query: {})", operatorIndex,
-                    _queryContext, e);
-                blockingQueue.offer(new IntermediateResultsBlock(e));
-                return;
-              }
-            }
-          } finally {
-            phaser.arriveAndDeregister();
+  protected void processSegments(int threadIndex) {
+    for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numThreads) {
+      Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex);
+      try {
+        IntermediateResultsBlock resultsBlock;
+        while ((resultsBlock = operator.nextBlock()) != null) {
+          Collection<Object[]> rows = resultsBlock.getSelectionResult();
+          assert rows != null;
+          long numRowsCollected = _numRowsCollected.addAndGet(rows.size());
+          _blockingQueue.offer(resultsBlock);
+          if (numRowsCollected >= _limit) {
+            return;
           }
         }
-      });
+        _blockingQueue.offer(LAST_RESULTS_BLOCK);
+      } catch (EarlyTerminationException e) {
+        // Early-terminated by interruption (canceled by the main thread)
+        return;
+      } catch (Exception e) {
+        // Caught exception, skip processing the remaining operators
+        LOGGER.error("Caught exception while executing operator of index: {} 
(query: {})", operatorIndex, _queryContext,
+            e);
+        _blockingQueue.offer(new IntermediateResultsBlock(e));
+        return;
+      }
     }
+  }
 
-    try {
-      int numRowsCollected = 0;
-      int numOperatorsFinished = 0;
-      while (numRowsCollected < _limit && numOperatorsFinished < numOperators) {
-        IntermediateResultsBlock resultsBlock =
-            blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        if (resultsBlock == null) {
-          // Query times out, skip streaming the remaining results blocks
-          LOGGER.error("Timed out while polling results block (query: {})", 
_queryContext);
-          return new 
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-              new TimeoutException("Timed out while polling results block")));
-        }
-        if (resultsBlock.getProcessingExceptions() != null) {
-          // Caught exception while processing segment, skip streaming the remaining results blocks and directly return
-          // the exception
-          return resultsBlock;
-        }
-        if (resultsBlock == LAST_RESULTS_BLOCK) {
-          numOperatorsFinished++;
-          continue;
-        }
-        DataSchema dataSchema = resultsBlock.getDataSchema();
-        Collection<Object[]> rows = resultsBlock.getSelectionResult();
-        assert dataSchema != null && rows != null;
-        numRowsCollected += rows.size();
-        DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
-        _streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable));
+  @Override
+  protected IntermediateResultsBlock mergeResults()
+      throws Exception {
+    long numRowsCollected = 0;
+    int numOperatorsFinished = 0;
+    while (numRowsCollected < _limit && numOperatorsFinished < _numOperators) {
+      IntermediateResultsBlock resultsBlock =
+          _blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      if (resultsBlock == null) {
+        // Query times out, skip streaming the remaining results blocks
+        LOGGER.error("Timed out while polling results block (query: {})", 
_queryContext);
+        return new 
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+            new TimeoutException("Timed out while polling results block")));
       }
-      IntermediateResultsBlock metadataBlock = new IntermediateResultsBlock();
-      CombineOperatorUtils.setExecutionStatistics(metadataBlock, _operators);
-      return metadataBlock;
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while streaming results blocks (query: 
{})", _queryContext, e);
-      return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e);
-    } finally {
-      // Cancel all ongoing jobs
-      for (Future future : futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
+      if (resultsBlock.getProcessingExceptions() != null) {
+        // Caught exception while processing segment, skip streaming the remaining results blocks and directly return
+        // the exception
+        return resultsBlock;
+      }
+      if (resultsBlock == LAST_RESULTS_BLOCK) {
+        numOperatorsFinished++;
+        continue;
       }
-      // Deregister the main thread and wait for all threads done
-      phaser.awaitAdvance(phaser.arriveAndDeregister());
+      DataSchema dataSchema = resultsBlock.getDataSchema();
+      Collection<Object[]> rows = resultsBlock.getSelectionResult();
+      assert dataSchema != null && rows != null;
+      numRowsCollected += rows.size();
+      DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
+      _streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable));
     }
+    // Return an empty results block for the metadata
+    return new IntermediateResultsBlock();
+  }
+
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
   }
 }
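
Taken together, the refactor reduces each combine operator to two hooks: processSegments(threadIndex), which walks the operators assigned to a worker thread in round-robin fashion (threadIndex, threadIndex + _numThreads, ...), and mergeResults(), which drains the per-segment blocks. A simplified sketch of the resulting template, with the base-class internals abbreviated (the real getNextBlock() also registers worker threads with the phaser and records execution statistics):

    // Sketch only: field names follow the diff, details are abbreviated.
    public abstract class BaseCombineOperator extends BaseOperator<IntermediateResultsBlock> {

      @Override
      protected IntermediateResultsBlock getNextBlock() {
        for (int i = 0; i < _numThreads; i++) {
          int threadIndex = i;
          _futures[i] = _executorService.submit(() -> processSegments(threadIndex));
        }
        try {
          return mergeResults();
        } catch (Exception e) {
          return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e);
        } finally {
          // Cancel ongoing jobs, then wait for all worker threads via the phaser
          for (Future future : _futures) {
            future.cancel(true);
          }
          _phaser.awaitAdvance(_phaser.arriveAndDeregister());
        }
      }

      // Process the operators assigned to this thread (round-robin by _numThreads)
      protected abstract void processSegments(int threadIndex);

      // Combine the per-segment results blocks from _blockingQueue into one block
      protected abstract IntermediateResultsBlock mergeResults() throws Exception;
    }

StreamingSelectionOnlyCombineOperator overrides mergeResults() to stream each block to the observer as it arrives instead of merging, which is why its mergeResultsBlocks() override is intentionally empty.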

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
