This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b16cdece24 [multistage] improve dispatch exception handling (#11688)
b16cdece24 is described below

commit b16cdece2408f926e927db855b45396d8fcc739e
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Sep 27 13:47:35 2023 -0700

    [multistage] improve dispatch exception handling (#11688)
    
    * fix test runner error not propagating through issue
    * clean up submission service and utilize CompletableFuture as an 
abstraction
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/query/service/SubmissionService.java     | 59 ----------------------
 .../pinot/query/service/server/QueryServer.java    | 33 ++++++++----
 .../apache/pinot/query/QueryServerEnclosure.java   | 16 ++----
 .../query/runtime/queries/QueryRunnerTest.java     | 41 ++++-----------
 .../query/runtime/queries/QueryRunnerTestBase.java | 29 +++++++++--
 5 files changed, 63 insertions(+), 115 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java
deleted file mode 100644
index b42c1d831d..0000000000
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/SubmissionService.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.service;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * Submission service is used to submit multiple runnables and checks the 
result upon all {@link Future} returns
- * or any failure occurs.
- */
-public class SubmissionService {
-  private final ExecutorService _executor;
-  private final List<CompletableFuture<Void>> _futures = new ArrayList<>();
-
-  public SubmissionService(ExecutorService executor) {
-    _executor = executor;
-  }
-
-  public void submit(Runnable runnable) {
-    _futures.add(CompletableFuture.runAsync(runnable, _executor));
-  }
-
-  public void awaitFinish(long deadlineMs)
-      throws Exception {
-    CompletableFuture<Void> completableFuture = 
CompletableFuture.allOf(_futures.toArray(new CompletableFuture[]{}));
-    try {
-      completableFuture.get(deadlineMs - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
-    } finally {
-      // Cancel all ongoing submission
-      for (CompletableFuture<Void> future : _futures) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-    }
-  }
-}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index a90418c742..281f434d30 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -25,8 +25,10 @@ import io.grpc.stub.StreamObserver;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
@@ -35,7 +37,6 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
-import org.apache.pinot.query.service.SubmissionService;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -112,21 +113,31 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
       responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad 
request").withCause(e).asException());
       return;
     }
-    // 2. Submit distributed stage plans
-    SubmissionService submissionService = new 
SubmissionService(_querySubmissionExecutorService);
-    distributedStagePlans.forEach(distributedStagePlan -> 
submissionService.submit(() -> {
-      _queryRunner.processQuery(distributedStagePlan, requestMetadata);
-    }));
-    // 3. await response successful or any failure which cancels all other 
tasks.
+    // 2. Submit distributed stage plans, await response successful or any 
failure which cancels all other tasks.
+    int numSubmission = distributedStagePlans.size();
+    CompletableFuture<?>[] submissionStubs = new 
CompletableFuture[numSubmission];
+    for (int i = 0; i < numSubmission; i++) {
+      DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
+      submissionStubs[i] =
+          CompletableFuture.runAsync(() -> 
_queryRunner.processQuery(distributedStagePlan, requestMetadata),
+              _querySubmissionExecutorService);
+    }
     try {
-      submissionService.awaitFinish(deadlineMs);
-    } catch (Throwable t) {
-      LOGGER.error("error occurred during stage submission for {}:\n{}", 
requestId, t);
+      CompletableFuture.allOf(submissionStubs).get(deadlineMs - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOGGER.error("error occurred during stage submission for {}:\n{}", 
requestId, e);
       responseObserver.onNext(Worker.QueryResponse.newBuilder()
           
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
-              QueryException.getTruncatedStackTrace(t)).build());
+              QueryException.getTruncatedStackTrace(e)).build());
       responseObserver.onCompleted();
       return;
+    } finally {
+      // Cancel all ongoing submission
+      for (CompletableFuture<?> future : submissionStubs) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
     }
     responseObserver.onNext(
         
Worker.QueryResponse.newBuilder().putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK,
 "")
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 8038898fa2..b4b1dff3cc 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -108,16 +109,9 @@ public class QueryServerEnclosure {
     _queryRunner.shutDown();
   }
 
-  public void processQuery(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadataMap) {
-    _queryRunner.getExecutorService().submit(() -> {
-      try {
-        _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
-      } catch (Exception e) {
-        // TODO: Find a way to propagate the exception and fail the test
-        System.err.println("Caught exception while executing query");
-        e.printStackTrace(System.err);
-        throw new RuntimeException("Error executing query!", e);
-      }
-    });
+  public CompletableFuture<Void> processQuery(DistributedStagePlan 
distributedStagePlan,
+      Map<String, String> requestMetadataMap) {
+    return CompletableFuture.runAsync(() -> 
_queryRunner.processQuery(distributedStagePlan, requestMetadataMap),
+        _queryRunner.getExecutorService());
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 1d8646399a..d54eb5b1d0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -20,21 +20,16 @@ package org.apache.pinot.query.runtime.queries;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import java.util.stream.Collectors;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.DispatchablePlanFragment;
-import org.apache.pinot.query.planner.DispatchableSubPlan;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.config.table.TableType;
@@ -44,8 +39,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -181,34 +174,22 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
    */
   @Test(dataProvider = "testDataWithSqlExecutionExceptions")
   public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
-    long requestId = REQUEST_ID_GEN.getAndIncrement();
-    SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sql);
-    QueryEnvironment.QueryPlannerResult queryPlannerResult =
-        _queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
-    DispatchableSubPlan dispatchableSubPlan = 
queryPlannerResult.getQueryPlan();
-    Map<String, String> requestMetadataMap = new HashMap<>();
-    
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, 
String.valueOf(requestId));
-    Long timeoutMsInQueryOption = 
QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
-    long timeoutMs =
-        timeoutMsInQueryOption != null ? timeoutMsInQueryOption : 
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS;
-    
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
 String.valueOf(timeoutMs));
-    
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
 "true");
-    requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
-    List<DispatchablePlanFragment> stagePlans = 
dispatchableSubPlan.getQueryStageList();
-    for (int stageId = 1; stageId < stagePlans.size(); stageId++) {
-      processDistributedStagePlans(dispatchableSubPlan, stageId, 
requestMetadataMap);
-    }
     try {
-      QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, 
Collections.emptyMap(), null,
-          _mailboxService);
-      Assert.fail("Should have thrown exception!");
-    } catch (RuntimeException e) {
+      // query pinot
+      List<Object[]> resultRows = queryRunner(sql, null);
+      Assert.fail(
+          "Expected error with message '" + exceptionMsg + "'. But instead 
rows were returned: " + resultRows.stream()
+              .map(Arrays::toString).collect(Collectors.joining(",\n")));
+    } catch (Exception e) {
       // NOTE: The actual message is (usually) something like:
       //   Received error query execution result block: 
{200=QueryExecutionError:
       //   Query execution error on: Server_localhost_12345
       //     java.lang.IllegalArgumentException: Illegal Json Path: $['path'] 
does not match document
       String exceptionMessage = e.getMessage();
-      Assert.assertTrue(exceptionMessage.startsWith("Received error query 
execution result block: "));
+      Assert.assertTrue(
+          exceptionMessage.startsWith("Received error query execution result 
block: ") || exceptionMessage.startsWith(
+              "Error occurred during stage submission"),
+          "Exception message didn't start with proper heading: " + 
exceptionMessage);
       Assert.assertTrue(exceptionMessage.contains(exceptionMsg),
           "Exception should contain: " + exceptionMsg + ", but found: " + 
exceptionMessage);
     }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index a79880e5a5..0c422f3962 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -36,10 +36,13 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
@@ -112,33 +115,51 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
       requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true");
     }
 
+    // Submission Stub logic are mimic {@link QueryServer}
     List<DispatchablePlanFragment> stagePlans = 
dispatchableSubPlan.getQueryStageList();
+    List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
     for (int stageId = 0; stageId < stagePlans.size(); stageId++) {
       if (stageId != 0) {
-        processDistributedStagePlans(dispatchableSubPlan, stageId, 
requestMetadataMap);
+        
submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, 
stageId, requestMetadataMap));
       }
       if (executionStatsAggregatorMap != null) {
         executionStatsAggregatorMap.put(stageId, new 
ExecutionStatsAggregator(true));
       }
     }
+    try {
+      CompletableFuture.allOf(submissionStubs.toArray(new 
CompletableFuture[0])).get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      // wrap and throw the exception here is for assert purpose on 
dispatch-time error
+      throw new RuntimeException("Error occurred during stage submission: " + 
QueryException.getTruncatedStackTrace(e));
+    } finally {
+      // Cancel all ongoing submission
+      for (CompletableFuture<?> future : submissionStubs) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    }
+    // exception will be propagated through for assert purpose on runtime error
     ResultTable resultTable =
         QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, 
Collections.emptyMap(),
             executionStatsAggregatorMap, _mailboxService);
     return resultTable.getRows();
   }
 
-  protected void processDistributedStagePlans(DispatchableSubPlan 
dispatchableSubPlan, int stageId,
-      Map<String, String> requestMetadataMap) {
+  protected List<CompletableFuture<?>> 
processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan,
+      int stageId, Map<String, String> requestMetadataMap) {
     Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
         
dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap();
+    List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
     for (Map.Entry<QueryServerInstance, List<Integer>> entry : 
serverInstanceToWorkerIdMap.entrySet()) {
       QueryServerInstance server = entry.getKey();
       for (int workerId : entry.getValue()) {
         DistributedStagePlan distributedStagePlan =
             constructDistributedStagePlan(dispatchableSubPlan, stageId, new 
VirtualServerAddress(server, workerId));
-        _servers.get(server).processQuery(distributedStagePlan, 
requestMetadataMap);
+        
submissionStubs.add(_servers.get(server).processQuery(distributedStagePlan, 
requestMetadataMap));
       }
     }
+    return submissionStubs;
   }
 
   protected static DistributedStagePlan 
constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to