This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2ba230c39a Handle errors in combine operator (#9689) 2ba230c39a is described below commit 2ba230c39aedb2d88958f2001b859692d31b10a5 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Oct 31 11:05:20 2022 -0700 Handle errors in combine operator (#9689) --- .../pinot/common/exception/QueryException.java | 8 +- .../blocks/results/ExceptionResultsBlock.java | 8 +- .../core/operator/combine/BaseCombineOperator.java | 21 ++- .../operator/combine/GroupByCombineOperator.java | 4 +- .../combine/CombineErrorOperatorsTest.java | 197 +++++++++++++++++++++ 5 files changed, 221 insertions(+), 17 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index 0009cee884..a4176f65fe 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -163,8 +163,8 @@ public class QueryException { QUOTA_EXCEEDED_ERROR.setMessage("QuotaExceededError"); } - public static ProcessingException getException(ProcessingException processingException, Exception exception) { - return getException(processingException, getTruncatedStackTrace(exception)); + public static ProcessingException getException(ProcessingException processingException, Throwable t) { + return getException(processingException, getTruncatedStackTrace(t)); } public static ProcessingException getException(ProcessingException processingException, String errorMessage) { @@ -174,9 +174,9 @@ public class QueryException { return copiedProcessingException; } - public static String getTruncatedStackTrace(Throwable exception) { + public static String getTruncatedStackTrace(Throwable t) { StringWriter stringWriter = new StringWriter(); - exception.printStackTrace(new PrintWriter(stringWriter)); + t.printStackTrace(new PrintWriter(stringWriter)); String fullStackTrace = stringWriter.toString(); String[] lines = StringUtils.split(fullStackTrace, '\n'); // exception should at least have one line, no need to check here. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java index d8df97ee7c..4f7f6916d1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java @@ -30,12 +30,12 @@ import org.apache.pinot.core.query.request.context.QueryContext; public class ExceptionResultsBlock extends BaseResultsBlock { - public ExceptionResultsBlock(ProcessingException processingException, Exception e) { - addToProcessingExceptions(QueryException.getException(processingException, e)); + public ExceptionResultsBlock(ProcessingException processingException, Throwable t) { + addToProcessingExceptions(QueryException.getException(processingException, t)); } - public ExceptionResultsBlock(Exception e) { - this(QueryException.QUERY_EXECUTION_ERROR, e); + public ExceptionResultsBlock(Throwable t) { + this(QueryException.QUERY_EXECUTION_ERROR, t); } @Nullable diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index b99b12d334..364de03fda 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -105,10 +105,17 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba processSegments(); } catch (EarlyTerminationException e) { // Early-terminated by interruption (canceled by the main thread) - } catch (Exception e) { - // Caught exception, skip processing the remaining segments - LOGGER.error("Caught exception while processing query: " + _queryContext, e); - onException(e); + } catch (Throwable t) { + // Caught exception/error, skip processing the remaining segments + // NOTE: We need to handle Error here, or the execution threads will die without adding the execution + // exception into the query response, and the main thread might wait infinitely (until timeout) or + // throw unexpected exceptions (such as NPE). + if (t instanceof Exception) { + LOGGER.error("Caught exception while processing query: " + _queryContext, t); + } else { + LOGGER.error("Caught serious error while processing query: " + _queryContext, t); + } + onException(t); } finally { onFinish(); phaser.arriveAndDeregister(); @@ -180,10 +187,10 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba } /** - * Invoked when {@link #processSegments()} throws exception. + * Invoked when {@link #processSegments()} throws exception/error. */ - protected void onException(Exception e) { - _blockingQueue.offer(new ExceptionResultsBlock(e)); + protected void onException(Throwable t) { + _blockingQueue.offer(new ExceptionResultsBlock(t)); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index 57bf20f50e..5108d4d964 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -216,8 +216,8 @@ public class GroupByCombineOperator extends BaseCombineOperator<GroupByResultsBl } @Override - protected void onException(Exception e) { - _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + protected void onException(Throwable t) { + _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t)); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java new file mode 100644 index 0000000000..da8cb4835b --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Block; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +@SuppressWarnings("rawtypes") +public class CombineErrorOperatorsTest { + private static final int NUM_OPERATORS = 10; + private static final int NUM_THREADS = 2; + private static final long TIMEOUT_MS = 1000L; + + private ExecutorService _executorService; + + @BeforeClass + public void setUp() { + _executorService = Executors.newFixedThreadPool(NUM_THREADS); + } + + @Test + public void testCombineExceptionOperator() { + List<Operator> operators = new ArrayList<>(NUM_OPERATORS); + for (int i = 0; i < NUM_OPERATORS - 1; i++) { + operators.add(new RegularOperator()); + } + operators.add(new ExceptionOperator()); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable"); + queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS); + SelectionOnlyCombineOperator combineOperator = + new SelectionOnlyCombineOperator(operators, queryContext, _executorService); + BaseResultsBlock resultsBlock = combineOperator.nextBlock(); + assertTrue(resultsBlock instanceof ExceptionResultsBlock); + List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions(); + assertNotNull(processingExceptions); + assertEquals(processingExceptions.size(), 1); + ProcessingException processingException = processingExceptions.get(0); + assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE); + assertTrue(processingException.getMessage().contains("java.lang.RuntimeException: Exception")); + } + + @Test + public void testCombineErrorOperator() { + List<Operator> operators = new ArrayList<>(NUM_OPERATORS); + for (int i = 0; i < NUM_OPERATORS - 1; i++) { + operators.add(new RegularOperator()); + } + operators.add(new ErrorOperator()); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable"); + queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS); + SelectionOnlyCombineOperator combineOperator = + new SelectionOnlyCombineOperator(operators, queryContext, _executorService); + BaseResultsBlock resultsBlock = combineOperator.nextBlock(); + assertTrue(resultsBlock instanceof ExceptionResultsBlock); + List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions(); + assertNotNull(processingExceptions); + assertEquals(processingExceptions.size(), 1); + ProcessingException processingException = processingExceptions.get(0); + assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE); + assertTrue(processingException.getMessage().contains("java.lang.Error: Error")); + } + + @Test + public void testCombineExceptionAndErrorOperator() { + List<Operator> operators = new ArrayList<>(NUM_OPERATORS); + for (int i = 0; i < NUM_OPERATORS - 2; i++) { + operators.add(new RegularOperator()); + } + operators.add(new ExceptionOperator()); + operators.add(new ErrorOperator()); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable"); + queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS); + SelectionOnlyCombineOperator combineOperator = + new SelectionOnlyCombineOperator(operators, queryContext, _executorService); + BaseResultsBlock resultsBlock = combineOperator.nextBlock(); + assertTrue(resultsBlock instanceof ExceptionResultsBlock); + List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions(); + assertNotNull(processingExceptions); + assertEquals(processingExceptions.size(), 1); + ProcessingException processingException = processingExceptions.get(0); + assertEquals(processingException.getErrorCode(), QueryException.QUERY_EXECUTION_ERROR_CODE); + String message = processingException.getMessage(); + assertTrue(message.contains("java.lang.RuntimeException: Exception") || message.contains("java.lang.Error: Error")); + } + + private static class ExceptionOperator extends BaseOperator { + private static final String EXPLAIN_NAME = "EXCEPTION"; + + @Override + protected Block getNextBlock() { + throw new RuntimeException("Exception"); + } + + @Override + public List<Operator> getChildOperators() { + return Collections.emptyList(); + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + return new ExecutionStatistics(0, 0, 0, 0); + } + } + + private static class ErrorOperator extends BaseOperator { + private static final String EXPLAIN_NAME = "ERROR"; + + @Override + protected Block getNextBlock() { + throw new Error("Error"); + } + + @Override + public List<Operator> getChildOperators() { + return Collections.emptyList(); + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + return new ExecutionStatistics(0, 0, 0, 0); + } + } + + private static class RegularOperator extends BaseOperator { + private static final String EXPLAIN_NAME = "REGULAR"; + + @Override + protected Block getNextBlock() { + return new SelectionResultsBlock( + new DataSchema(new String[]{"myColumn"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}), + new ArrayList<>()); + } + + @Override + public List<Operator> getChildOperators() { + return Collections.emptyList(); + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + return new ExecutionStatistics(0, 0, 0, 0); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org