This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b16cdece24 [multistage] improve dispatch exception handling (#11688) b16cdece24 is described below commit b16cdece2408f926e927db855b45396d8fcc739e Author: Rong Rong <ro...@apache.org> AuthorDate: Wed Sep 27 13:47:35 2023 -0700 [multistage] improve dispatch exception handling (#11688) * fix test runner error not propagating through issue * clean up submission service and utilize CompletableFuture as an abstraction --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/query/service/SubmissionService.java | 59 ---------------------- .../pinot/query/service/server/QueryServer.java | 33 ++++++++---- .../apache/pinot/query/QueryServerEnclosure.java | 16 ++---- .../query/runtime/queries/QueryRunnerTest.java | 41 ++++----------- .../query/runtime/queries/QueryRunnerTestBase.java | 29 +++++++++-- 5 files changed, 63 insertions(+), 115 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java deleted file mode 100644 index b42c1d831d..0000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.service; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - - -/** - * Submission service is used to submit multiple runnables and checks the result upon all {@link Future} returns - * or any failure occurs. - */ -public class SubmissionService { - private final ExecutorService _executor; - private final List<CompletableFuture<Void>> _futures = new ArrayList<>(); - - public SubmissionService(ExecutorService executor) { - _executor = executor; - } - - public void submit(Runnable runnable) { - _futures.add(CompletableFuture.runAsync(runnable, _executor)); - } - - public void awaitFinish(long deadlineMs) - throws Exception { - CompletableFuture<Void> completableFuture = CompletableFuture.allOf(_futures.toArray(new CompletableFuture[]{})); - try { - completableFuture.get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - } finally { - // Cancel all ongoing submission - for (CompletableFuture<Void> future : _futures) { - if (!future.isDone()) { - future.cancel(true); - } - } - } - } -} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java index a90418c742..281f434d30 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java @@ -25,8 +25,10 @@ import io.grpc.stub.StreamObserver; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; @@ -35,7 +37,6 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer; import org.apache.pinot.query.runtime.QueryRunner; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; -import org.apache.pinot.query.service.SubmissionService; import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; @@ -112,21 +113,31 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException()); return; } - // 2. Submit distributed stage plans - SubmissionService submissionService = new SubmissionService(_querySubmissionExecutorService); - distributedStagePlans.forEach(distributedStagePlan -> submissionService.submit(() -> { - _queryRunner.processQuery(distributedStagePlan, requestMetadata); - })); - // 3. await response successful or any failure which cancels all other tasks. + // 2. Submit distributed stage plans, await response successful or any failure which cancels all other tasks. + int numSubmission = distributedStagePlans.size(); + CompletableFuture<?>[] submissionStubs = new CompletableFuture[numSubmission]; + for (int i = 0; i < numSubmission; i++) { + DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i); + submissionStubs[i] = + CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadata), + _querySubmissionExecutorService); + } try { - submissionService.awaitFinish(deadlineMs); - } catch (Throwable t) { - LOGGER.error("error occurred during stage submission for {}:\n{}", requestId, t); + CompletableFuture.allOf(submissionStubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOGGER.error("error occurred during stage submission for {}:\n{}", requestId, e); responseObserver.onNext(Worker.QueryResponse.newBuilder() .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, - QueryException.getTruncatedStackTrace(t)).build()); + QueryException.getTruncatedStackTrace(e)).build()); responseObserver.onCompleted(); return; + } finally { + // Cancel all ongoing submission + for (CompletableFuture<?> future : submissionStubs) { + if (!future.isDone()) { + future.cancel(true); + } + } } responseObserver.onNext( Worker.QueryResponse.newBuilder().putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "") diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java index 8038898fa2..b4b1dff3cc 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java @@ -20,6 +20,7 @@ package org.apache.pinot.query; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -108,16 +109,9 @@ public class QueryServerEnclosure { _queryRunner.shutDown(); } - public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) { - _queryRunner.getExecutorService().submit(() -> { - try { - _queryRunner.processQuery(distributedStagePlan, requestMetadataMap); - } catch (Exception e) { - // TODO: Find a way to propagate the exception and fail the test - System.err.println("Caught exception while executing query"); - e.printStackTrace(System.err); - throw new RuntimeException("Error executing query!", e); - } - }); + public CompletableFuture<Void> processQuery(DistributedStagePlan distributedStagePlan, + Map<String, String> requestMetadataMap) { + return CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadataMap), + _queryRunner.getExecutorService()); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java index 1d8646399a..d54eb5b1d0 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java @@ -20,21 +20,16 @@ package org.apache.pinot.query.runtime.queries; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import java.util.stream.Collectors; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; -import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryEnvironmentTestBase; import org.apache.pinot.query.QueryServerEnclosure; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.planner.DispatchablePlanFragment; -import org.apache.pinot.query.planner.DispatchableSubPlan; import org.apache.pinot.query.routing.QueryServerInstance; -import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory; import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.config.table.TableType; @@ -44,8 +39,6 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.sql.parsers.CalciteSqlParser; -import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -181,34 +174,22 @@ public class QueryRunnerTest extends QueryRunnerTestBase { */ @Test(dataProvider = "testDataWithSqlExecutionExceptions") public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) { - long requestId = REQUEST_ID_GEN.getAndIncrement(); - SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql); - QueryEnvironment.QueryPlannerResult queryPlannerResult = - _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId); - DispatchableSubPlan dispatchableSubPlan = queryPlannerResult.getQueryPlan(); - Map<String, String> requestMetadataMap = new HashMap<>(); - requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId)); - Long timeoutMsInQueryOption = QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions()); - long timeoutMs = - timeoutMsInQueryOption != null ? timeoutMsInQueryOption : CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS; - requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs)); - requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, "true"); - requestMetadataMap.putAll(sqlNodeAndOptions.getOptions()); - List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList(); - for (int stageId = 1; stageId < stagePlans.size(); stageId++) { - processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap); - } try { - QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), null, - _mailboxService); - Assert.fail("Should have thrown exception!"); - } catch (RuntimeException e) { + // query pinot + List<Object[]> resultRows = queryRunner(sql, null); + Assert.fail( + "Expected error with message '" + exceptionMsg + "'. But instead rows were returned: " + resultRows.stream() + .map(Arrays::toString).collect(Collectors.joining(",\n"))); + } catch (Exception e) { // NOTE: The actual message is (usually) something like: // Received error query execution result block: {200=QueryExecutionError: // Query execution error on: Server_localhost_12345 // java.lang.IllegalArgumentException: Illegal Json Path: $['path'] does not match document String exceptionMessage = e.getMessage(); - Assert.assertTrue(exceptionMessage.startsWith("Received error query execution result block: ")); + Assert.assertTrue( + exceptionMessage.startsWith("Received error query execution result block: ") || exceptionMessage.startsWith( + "Error occurred during stage submission"), + "Exception message didn't start with proper heading: " + exceptionMessage); Assert.assertTrue(exceptionMessage.contains(exceptionMsg), "Exception should contain: " + exceptionMsg + ", but found: " + exceptionMessage); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index a79880e5a5..0c422f3962 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -36,10 +36,13 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator; @@ -112,33 +115,51 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true"); } + // Submission Stub logic are mimic {@link QueryServer} List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList(); + List<CompletableFuture<?>> submissionStubs = new ArrayList<>(); for (int stageId = 0; stageId < stagePlans.size(); stageId++) { if (stageId != 0) { - processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap); + submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap)); } if (executionStatsAggregatorMap != null) { executionStatsAggregatorMap.put(stageId, new ExecutionStatsAggregator(true)); } } + try { + CompletableFuture.allOf(submissionStubs.toArray(new CompletableFuture[0])).get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + // wrap and throw the exception here is for assert purpose on dispatch-time error + throw new RuntimeException("Error occurred during stage submission: " + QueryException.getTruncatedStackTrace(e)); + } finally { + // Cancel all ongoing submission + for (CompletableFuture<?> future : submissionStubs) { + if (!future.isDone()) { + future.cancel(true); + } + } + } + // exception will be propagated through for assert purpose on runtime error ResultTable resultTable = QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), executionStatsAggregatorMap, _mailboxService); return resultTable.getRows(); } - protected void processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan, int stageId, - Map<String, String> requestMetadataMap) { + protected List<CompletableFuture<?>> processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan, + int stageId, Map<String, String> requestMetadataMap) { Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap(); + List<CompletableFuture<?>> submissionStubs = new ArrayList<>(); for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) { QueryServerInstance server = entry.getKey(); for (int workerId : entry.getValue()) { DistributedStagePlan distributedStagePlan = constructDistributedStagePlan(dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId)); - _servers.get(server).processQuery(distributedStagePlan, requestMetadataMap); + submissionStubs.add(_servers.get(server).processQuery(distributedStagePlan, requestMetadataMap)); } } + return submissionStubs; } protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org