This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f95ada1ab35 Fix the error handling in SequentialSortedGroupByCombineOperator (#16840)
f95ada1ab35 is described below

commit f95ada1ab35b288f3529966335f798c8fcb4c941
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Sep 18 11:22:40 2025 -0700

    Fix the error handling in SequentialSortedGroupByCombineOperator (#16840)
---
 .../SequentialSortedGroupByCombineOperator.java    | 78 ++++++----------------
 .../combine/SortedGroupByCombineOperator.java      |  7 +-
 .../combine/CombineErrorOperatorsTest.java         | 63 ++++++++++++++---
 3 files changed, 79 insertions(+), 69 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
index b0a88acfe3e..629f5e7aab3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
@@ -35,8 +35,6 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
 import org.apache.pinot.core.util.GroupByUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -56,19 +54,16 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"rawtypes"})
 public class SequentialSortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SequentialSortedGroupByCombineOperator.class);
+  // TODO: Consider changing it to "COMBINE_GROUP_BY_SEQUENTIAL_SORTED" to 
distinguish from GroupByCombineOperator
   private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
 
-  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
-  // _futures (try to interrupt the execution if it already started).
-  private volatile boolean _groupsTrimmed;
-  private volatile boolean _numGroupsLimitReached;
-  private volatile boolean _numGroupsWarningLimitReached;
-
-  private SortedRecords _records = null;
   private final SortedRecordsMerger _sortedRecordsMerger;
 
+  private SortedRecords _records;
+  private boolean _groupsTrimmed;
+  private boolean _numGroupsLimitReached;
+  private boolean _numGroupsWarningLimitReached;
+
   public SequentialSortedGroupByCombineOperator(List<Operator> operators, 
QueryContext queryContext,
       ExecutorService executorService) {
     super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
@@ -105,22 +100,12 @@ public class SequentialSortedGroupByCombineOperator 
extends BaseSingleBlockCombi
     int operatorId;
     while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
       Operator operator = _operators.get(operatorId);
+      GroupByResultsBlock resultsBlock;
       try {
         if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
           ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
         }
-        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
-        if (resultsBlock.isGroupsTrimmed()) {
-          _groupsTrimmed = true;
-        }
-        // Set groups limit reached flag.
-        if (resultsBlock.isNumGroupsLimitReached()) {
-          _numGroupsLimitReached = true;
-        }
-        if (resultsBlock.isNumGroupsWarningLimitReached()) {
-          _numGroupsWarningLimitReached = true;
-        }
-        _blockingQueue.offer(resultsBlock);
+        resultsBlock = (GroupByResultsBlock) operator.nextBlock();
       } catch (RuntimeException e) {
         throw wrapOperatorException(operator, e);
       } finally {
@@ -128,43 +113,24 @@ public class SequentialSortedGroupByCombineOperator 
extends BaseSingleBlockCombi
           ((AcquireReleaseColumnsSegmentOperator) operator).release();
         }
       }
+      _blockingQueue.offer(resultsBlock);
     }
   }
 
-  @Override
-  public void onProcessSegmentsException(Throwable t) {
-    _processingException.compareAndSet(null, t);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>Combines sorted intermediate aggregation result blocks from underlying 
operators and returns a merged one.
-   * <ul>
-   *   <li>
-   *     Merges multiple sorted intermediate aggregation result from {@link 
this#_blockingQueue} into one
-   *     and create a result block
-   *   </li>
-   *   <li>
-   *     Set all exceptions encountered during execution into the merged 
result block
-   *   </li>
-   * </ul>
-   */
+  /// {@inheritDoc}
+  ///
+  /// Merges multiple sorted intermediate results from [#_blockingQueue] into 
one and creates a result block.
   @Override
   public BaseResultsBlock mergeResults()
       throws Exception {
-
+    DataSchema dataSchema = null;
     int numBlocksMerged = 0;
     long endTimeMs = _queryContext.getEndTimeMs();
-    DataSchema dataSchema = null;
     while (numBlocksMerged < _numOperators) {
       // Timeout has reached, shouldn't continue to process. 
`_blockingQueue.poll` will continue to return blocks even
       // if negative timeout is provided; therefore an extra check is needed
       long waitTimeMs = endTimeMs - System.currentTimeMillis();
       if (waitTimeMs <= 0) {
-        String userError = "Timed out while combining group-by order-by 
results after " + waitTimeMs + "ms";
-        String logMsg = userError + ", queryContext = " + _queryContext;
-        LOGGER.error(logMsg);
         return getTimeoutResultsBlock(numBlocksMerged);
       }
       BaseResultsBlock blockToMerge = _blockingQueue.poll(waitTimeMs, 
TimeUnit.MILLISECONDS);
@@ -181,10 +147,17 @@ public class SequentialSortedGroupByCombineOperator 
extends BaseSingleBlockCombi
         dataSchema = groupByResultBlockToMerge.getDataSchema();
       }
 
+      // Merge records
+      if (_records == null) {
+        _records = 
GroupByUtils.getAndPopulateSortedRecords(groupByResultBlockToMerge);
+      } else {
+        _records = _sortedRecordsMerger.mergeGroupByResultsBlock(_records, 
groupByResultBlockToMerge);
+      }
+
+      // Set flags
       if (groupByResultBlockToMerge.isGroupsTrimmed()) {
         _groupsTrimmed = true;
       }
-      // Set groups limit reached flag.
       if (groupByResultBlockToMerge.isNumGroupsLimitReached()) {
         _numGroupsLimitReached = true;
       }
@@ -192,17 +165,10 @@ public class SequentialSortedGroupByCombineOperator 
extends BaseSingleBlockCombi
         _numGroupsWarningLimitReached = true;
       }
 
-      if (_records == null) {
-        _records = 
GroupByUtils.getAndPopulateSortedRecords(groupByResultBlockToMerge);
-      } else {
-        _records = _sortedRecordsMerger.mergeGroupByResultsBlock(_records, 
groupByResultBlockToMerge);
-      }
       numBlocksMerged++;
     }
 
-    SortedRecordTable table =
-        new SortedRecordTable(_records, dataSchema, _queryContext, 
_executorService);
-
+    SortedRecordTable table = new SortedRecordTable(_records, dataSchema, 
_queryContext, _executorService);
     if (_queryContext.isServerReturnFinalResult()) {
       table.finish(true, true);
     } else if (_queryContext.isServerReturnFinalResultKeyUnpartitioned()) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
index 84632ea97e3..f917eedacec 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
@@ -66,8 +66,8 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  // TODO: Consider changing it to "COMBINE_GROUP_BY_SORTED" to distinguish 
from GroupByCombineOperator
   private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
 
   // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
@@ -237,10 +237,7 @@ public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator
   }
 
   private GroupByResultsBlock finishSortedRecords(SortedRecords records) {
-    SortedRecordTable table =
-        new SortedRecordTable(records, _dataSchema, _queryContext, 
_executorService);
-
-    // finish
+    SortedRecordTable table = new SortedRecordTable(records, _dataSchema, 
_queryContext, _executorService);
     if (_queryContext.isServerReturnFinalResult()) {
       table.finish(true, true);
     } else if (_queryContext.isServerReturnFinalResultKeyUnpartitioned()) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
index 4523581cb30..5d8eec88c98 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
@@ -20,17 +20,18 @@ package org.apache.pinot.core.operator.combine;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -51,9 +52,12 @@ public class CombineErrorOperatorsTest {
   private static final int NUM_THREADS = 2;
   private static final QueryContext QUERY_CONTEXT =
       QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+  private static final QueryContext GROUP_BY_QUERY_CONTEXT =
+      QueryContextConverterUtils.getQueryContext("SELECT myColumn, COUNT(*) 
FROM testTable GROUP BY 1 ORDER BY 1");
 
   static {
     QUERY_CONTEXT.setEndTimeMs(Long.MAX_VALUE);
+    GROUP_BY_QUERY_CONTEXT.setEndTimeMs(Long.MAX_VALUE);
   }
 
   private ExecutorService _executorService;
@@ -66,8 +70,8 @@ public class CombineErrorOperatorsTest {
   @DataProvider(name = "getErrorCodes")
   public static Object[][] getErrorCodes() {
     return Arrays.stream(QueryErrorCode.values())
-      .map(queryErrorCode -> new Object[]{queryErrorCode})
-      .toArray(Object[][]::new);
+        .map(queryErrorCode -> new Object[]{queryErrorCode})
+        .toArray(Object[][]::new);
   }
 
   @Test(dataProvider = "getErrorCodes")
@@ -88,6 +92,24 @@ public class CombineErrorOperatorsTest {
     assertEquals(errorMsg.getErrCode(), queryErrorCode);
   }
 
+  @Test(dataProvider = "getErrorCodes")
+  public void 
testSequentialSortedGroupByCombineExceptionOperator(QueryErrorCode 
queryErrorCode) {
+    List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+    for (int i = 0; i < NUM_OPERATORS - 1; i++) {
+      operators.add(new RegularGroupByOperator());
+    }
+    operators.add(new ExceptionOperator(queryErrorCode.asException("Test 
exception message")));
+    SequentialSortedGroupByCombineOperator combineOperator =
+        new SequentialSortedGroupByCombineOperator(operators, 
GROUP_BY_QUERY_CONTEXT, _executorService);
+    BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+    assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+    List<QueryErrorMessage> errorMsgs = resultsBlock.getErrorMessages();
+    assertNotNull(errorMsgs);
+    assertEquals(errorMsgs.size(), 1);
+    QueryErrorMessage errorMsg = errorMsgs.get(0);
+    assertEquals(errorMsg.getErrCode(), queryErrorCode);
+  }
+
   @Test
   public void testCombineErrorOperator() {
     List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
@@ -141,7 +163,7 @@ public class CombineErrorOperatorsTest {
 
     @Override
     public List<Operator> getChildOperators() {
-      return Collections.emptyList();
+      return List.of();
     }
 
     @Override
@@ -165,7 +187,7 @@ public class CombineErrorOperatorsTest {
 
     @Override
     public List<Operator> getChildOperators() {
-      return Collections.emptyList();
+      return List.of();
     }
 
     @Override
@@ -185,13 +207,38 @@ public class CombineErrorOperatorsTest {
     @Override
     protected Block getNextBlock() {
       return new SelectionResultsBlock(
-          new DataSchema(new String[]{"myColumn"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}),
-          new ArrayList<>(), QUERY_CONTEXT);
+          new DataSchema(new String[]{"myColumn"}, new 
ColumnDataType[]{ColumnDataType.STRING}), new ArrayList<>(),
+          QUERY_CONTEXT);
+    }
+
+    @Override
+    public List<Operator> getChildOperators() {
+      return List.of();
+    }
+
+    @Override
+    public String toExplainString() {
+      return EXPLAIN_NAME;
+    }
+
+    @Override
+    public ExecutionStatistics getExecutionStatistics() {
+      return new ExecutionStatistics(0, 0, 0, 0);
+    }
+  }
+
+  private static class RegularGroupByOperator extends BaseOperator {
+    private static final String EXPLAIN_NAME = "REGULAR_GROUP_BY";
+
+    @Override
+    protected Block getNextBlock() {
+      return new GroupByResultsBlock(new DataSchema(new String[]{"myColumn", 
"count(*)"},
+          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.LONG}), 
new ArrayList<>(), GROUP_BY_QUERY_CONTEXT);
     }
 
     @Override
     public List<Operator> getChildOperators() {
-      return Collections.emptyList();
+      return List.of();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to