ankitsultana commented on code in PR #10401:
URL: https://github.com/apache/pinot/pull/10401#discussion_r1132660323


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -135,7 +135,8 @@ public void init(PinotConfiguration config, 
InstanceDataManager instanceDataMana
       _queryRunnerExecutorService = Executors.newFixedThreadPool(
           ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
           new NamedThreadFactory("query_runner_on_" + _port + "_port"));
-      _scheduler = new OpChainSchedulerService(new 
RoundRobinScheduler(releaseMs), _queryWorkerExecutorService);
+      _scheduler = new OpChainSchedulerService(new 
RoundRobinScheduler(releaseMs), _queryWorkerExecutorService,
+          releaseMs);

Review Comment:
   I think you may want to call the two-argument constructor here. Otherwise the
cancellation TTL is the same as `releaseMs`.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -81,21 +84,38 @@ public ResultTable submitAndReduce(long requestId, 
QueryPlan queryPlan,
       MailboxService<TransferableBlock> mailboxService, long timeoutMs, 
Map<String, String> queryOptions,
       Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
       throws Exception {
-    // submit all the distributed stages.
-    int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
-    // run reduce stage and return result.
-    MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(reduceStageId);
-    MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(mailboxService,
-        
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
 requestId,
-        reduceNode.getSenderStageId(), reduceStageId, 
reduceNode.getDataSchema(),
-        new VirtualServerAddress(mailboxService.getHostname(), 
mailboxService.getMailboxPort(), 0), timeoutMs);
-    List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, 
executionStatsAggregator, queryPlan);
-    return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
-        queryPlan.getQueryStageMap().get(0).getDataSchema());
+    try {
+      // submit all the distributed stages.
+      int reduceStageId = submit(requestId, queryPlan, timeoutMs, 
queryOptions);
+      // run reduce stage and return result.
+      return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, 
mailboxService, executionStatsAggregator);
+    } catch (Exception e) {
+      cancel(requestId, queryPlan);
+      throw new RuntimeException("Error executing query: " + 
ExplainPlanStageVisitor.explain(queryPlan), e);
+    }
   }
 
-  public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, 
Map<String, String> queryOptions)
+  private void cancel(long requestId, QueryPlan queryPlan) {
+    Set<DispatchClient> dispatchClientSet = new HashSet<>();
+    for (Map.Entry<Integer, StageMetadata> stage : 
queryPlan.getStageMetadataMap().entrySet()) {
+      int stageId = stage.getKey();
+      // stage rooting at a mailbox receive node means reduce stage.
+      if (!(queryPlan.getQueryStageMap().get(stageId) instanceof 
MailboxReceiveNode)) {
+        List<VirtualServer> serverInstances = 
stage.getValue().getServerInstances();
+        for (VirtualServer serverInstance : serverInstances) {
+          String host = serverInstance.getHostname();
+          int servicePort = serverInstance.getQueryServicePort();
+          dispatchClientSet.add(getOrCreateDispatchClient(host, servicePort));

Review Comment:
   The dispatch client is cached anyway, so do we need the set?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -38,15 +40,28 @@
 @SuppressWarnings("UnstableApiUsage")
 public class OpChainSchedulerService extends AbstractExecutionThreadService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpChainSchedulerService.class);
-  // Default time scheduler is allowed to wait for a runnable OpChain to be 
available
+  /**
+   * Default time scheduler is allowed to wait for a runnable OpChain to be 
available.
+   */
   private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
+  /**
+   * Default cancel signal retention, this should be set to several times 
larger than
+   * {@link 
org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
+   */
+  private static final long DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS 
= 60_000L;
 
   private final OpChainScheduler _scheduler;
   private final ExecutorService _workerPool;
+  private final Cache<Long, Long> _cancelledRequests;
 
   public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool) {
+    this(scheduler, workerPool, 
DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS);
+  }
+
+  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool, long releaseTimeoutMs) {

Review Comment:
   Can we rename `releaseTimeoutMs` to a name that reflects what it controls (the cancellation signal retention time)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to