This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba230c39a Handle errors in combine operator (#9689)
2ba230c39a is described below

commit 2ba230c39aedb2d88958f2001b859692d31b10a5
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Oct 31 11:05:20 2022 -0700

    Handle errors in combine operator (#9689)
---
 .../pinot/common/exception/QueryException.java     |   8 +-
 .../blocks/results/ExceptionResultsBlock.java      |   8 +-
 .../core/operator/combine/BaseCombineOperator.java |  21 ++-
 .../operator/combine/GroupByCombineOperator.java   |   4 +-
 .../combine/CombineErrorOperatorsTest.java         | 197 +++++++++++++++++++++
 5 files changed, 221 insertions(+), 17 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 0009cee884..a4176f65fe 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -163,8 +163,8 @@ public class QueryException {
     QUOTA_EXCEEDED_ERROR.setMessage("QuotaExceededError");
   }
 
-  public static ProcessingException getException(ProcessingException processingException, Exception exception) {
-    return getException(processingException, getTruncatedStackTrace(exception));
+  public static ProcessingException getException(ProcessingException processingException, Throwable t) {
+    return getException(processingException, getTruncatedStackTrace(t));
   }
 
   public static ProcessingException getException(ProcessingException processingException, String errorMessage) {
@@ -174,9 +174,9 @@ public class QueryException {
     return copiedProcessingException;
   }
 
-  public static String getTruncatedStackTrace(Throwable exception) {
+  public static String getTruncatedStackTrace(Throwable t) {
     StringWriter stringWriter = new StringWriter();
-    exception.printStackTrace(new PrintWriter(stringWriter));
+    t.printStackTrace(new PrintWriter(stringWriter));
     String fullStackTrace = stringWriter.toString();
     String[] lines = StringUtils.split(fullStackTrace, '\n');
     // exception should at least have one line, no need to check here.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index d8df97ee7c..4f7f6916d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -30,12 +30,12 @@ import org.apache.pinot.core.query.request.context.QueryContext;
 
 public class ExceptionResultsBlock extends BaseResultsBlock {
 
-  public ExceptionResultsBlock(ProcessingException processingException, Exception e) {
-    addToProcessingExceptions(QueryException.getException(processingException, e));
+  public ExceptionResultsBlock(ProcessingException processingException, Throwable t) {
+    addToProcessingExceptions(QueryException.getException(processingException, t));
   }
 
-  public ExceptionResultsBlock(Exception e) {
-    this(QueryException.QUERY_EXECUTION_ERROR, e);
+  public ExceptionResultsBlock(Throwable t) {
+    this(QueryException.QUERY_EXECUTION_ERROR, t);
   }
 
   @Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index b99b12d334..364de03fda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -105,10 +105,17 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
             processSegments();
           } catch (EarlyTerminationException e) {
             // Early-terminated by interruption (canceled by the main thread)
-          } catch (Exception e) {
-            // Caught exception, skip processing the remaining segments
-            LOGGER.error("Caught exception while processing query: " + _queryContext, e);
-            onException(e);
+          } catch (Throwable t) {
+            // Caught exception/error, skip processing the remaining segments
+            // NOTE: We need to handle Error here, or the execution threads will die without adding the execution
+            //       exception into the query response, and the main thread might wait infinitely (until timeout) or
+            //       throw unexpected exceptions (such as NPE).
+            if (t instanceof Exception) {
+              LOGGER.error("Caught exception while processing query: " + _queryContext, t);
+            } else {
+              LOGGER.error("Caught serious error while processing query: " + _queryContext, t);
+            }
+            onException(t);
           } finally {
             onFinish();
             phaser.arriveAndDeregister();
@@ -180,10 +187,10 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
   }
 
   /**
-   * Invoked when {@link #processSegments()} throws exception.
+   * Invoked when {@link #processSegments()} throws exception/error.
    */
-  protected void onException(Exception e) {
-    _blockingQueue.offer(new ExceptionResultsBlock(e));
+  protected void onException(Throwable t) {
+    _blockingQueue.offer(new ExceptionResultsBlock(t));
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 57bf20f50e..5108d4d964 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -216,8 +216,8 @@ public class GroupByCombineOperator extends BaseCombineOperator<GroupByResultsBl
   }
 
   @Override
-  protected void onException(Exception e) {
-    _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+  protected void onException(Throwable t) {
+    _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t));
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
new file mode 100644
index 0000000000..da8cb4835b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Block;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+@SuppressWarnings("rawtypes")
+public class CombineErrorOperatorsTest {
+  private static final int NUM_OPERATORS = 10;
+  private static final int NUM_THREADS = 2;
+  private static final long TIMEOUT_MS = 1000L;
+
+  private ExecutorService _executorService;
+
+  @BeforeClass
+  public void setUp() {
+    _executorService = Executors.newFixedThreadPool(NUM_THREADS);
+  }
+
+  @Test
+  public void testCombineExceptionOperator() {
+    List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+    for (int i = 0; i < NUM_OPERATORS - 1; i++) {
+      operators.add(new RegularOperator());
+    }
+    operators.add(new ExceptionOperator());
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+    queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
+    SelectionOnlyCombineOperator combineOperator =
+        new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
+    BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+    assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+    List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+    assertNotNull(processingExceptions);
+    assertEquals(processingExceptions.size(), 1);
+    ProcessingException processingException = processingExceptions.get(0);
+    assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE);
+    assertTrue(processingException.getMessage().contains("java.lang.RuntimeException: Exception"));
+  }
+
+  @Test
+  public void testCombineErrorOperator() {
+    List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+    for (int i = 0; i < NUM_OPERATORS - 1; i++) {
+      operators.add(new RegularOperator());
+    }
+    operators.add(new ErrorOperator());
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+    queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
+    SelectionOnlyCombineOperator combineOperator =
+        new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
+    BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+    assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+    List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+    assertNotNull(processingExceptions);
+    assertEquals(processingExceptions.size(), 1);
+    ProcessingException processingException = processingExceptions.get(0);
+    assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE);
+    assertTrue(processingException.getMessage().contains("java.lang.Error: Error"));
+  }
+
+  @Test
+  public void testCombineExceptionAndErrorOperator() {
+    List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+    for (int i = 0; i < NUM_OPERATORS - 2; i++) {
+      operators.add(new RegularOperator());
+    }
+    operators.add(new ExceptionOperator());
+    operators.add(new ErrorOperator());
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+    queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
+    SelectionOnlyCombineOperator combineOperator =
+        new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
+    BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+    assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+    List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+    assertNotNull(processingExceptions);
+    assertEquals(processingExceptions.size(), 1);
+    ProcessingException processingException = processingExceptions.get(0);
+    assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE);
+    String message = processingException.getMessage();
+    assertTrue(message.contains("java.lang.RuntimeException: Exception") || message.contains("java.lang.Error: Error"));
+  }
+
+  private static class ExceptionOperator extends BaseOperator {
+    private static final String EXPLAIN_NAME = "EXCEPTION";
+
+    @Override
+    protected Block getNextBlock() {
+      throw new RuntimeException("Exception");
+    }
+
+    @Override
+    public List<Operator> getChildOperators() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String toExplainString() {
+      return EXPLAIN_NAME;
+    }
+
+    @Override
+    public ExecutionStatistics getExecutionStatistics() {
+      return new ExecutionStatistics(0, 0, 0, 0);
+    }
+  }
+
+  private static class ErrorOperator extends BaseOperator {
+    private static final String EXPLAIN_NAME = "ERROR";
+
+    @Override
+    protected Block getNextBlock() {
+      throw new Error("Error");
+    }
+
+    @Override
+    public List<Operator> getChildOperators() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String toExplainString() {
+      return EXPLAIN_NAME;
+    }
+
+    @Override
+    public ExecutionStatistics getExecutionStatistics() {
+      return new ExecutionStatistics(0, 0, 0, 0);
+    }
+  }
+
+  private static class RegularOperator extends BaseOperator {
+    private static final String EXPLAIN_NAME = "REGULAR";
+
+    @Override
+    protected Block getNextBlock() {
+      return new SelectionResultsBlock(
+          new DataSchema(new String[]{"myColumn"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}),
+          new ArrayList<>());
+    }
+
+    @Override
+    public List<Operator> getChildOperators() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String toExplainString() {
+      return EXPLAIN_NAME;
+    }
+
+    @Override
+    public ExecutionStatistics getExecutionStatistics() {
+      return new ExecutionStatistics(0, 0, 0, 0);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to