ankitsultana commented on code in PR #10401: URL: https://github.com/apache/pinot/pull/10401#discussion_r1132660323
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -135,7 +135,8 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana _queryRunnerExecutorService = Executors.newFixedThreadPool( ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("query_runner_on_" + _port + "_port")); - _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _queryWorkerExecutorService); + _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _queryWorkerExecutorService, + releaseMs); Review Comment: I think you may want to call the two-argument constructor here? Otherwise the cancellation TTL is the same as `releaseMs`. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java: ########## @@ -81,21 +84,38 @@ public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan, MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator) throws Exception { - // submit all the distributed stages. - int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions); - // run reduce stage and return result. 
- MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); - MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService, - queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId, - reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), - new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs); - List<DataBlock> resultDataBlocks = - reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator, queryPlan); - return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), - queryPlan.getQueryStageMap().get(0).getDataSchema()); + try { + // submit all the distributed stages. + int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions); + // run reduce stage and return result. + return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator); + } catch (Exception e) { + cancel(requestId, queryPlan); + throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e); + } } - public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, Map<String, String> queryOptions) + private void cancel(long requestId, QueryPlan queryPlan) { + Set<DispatchClient> dispatchClientSet = new HashSet<>(); + for (Map.Entry<Integer, StageMetadata> stage : queryPlan.getStageMetadataMap().entrySet()) { + int stageId = stage.getKey(); + // stage rooting at a mailbox receive node means reduce stage. 
+ if (!(queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode)) { + List<VirtualServer> serverInstances = stage.getValue().getServerInstances(); + for (VirtualServer serverInstance : serverInstances) { + String host = serverInstance.getHostname(); + int servicePort = serverInstance.getQueryServicePort(); + dispatchClientSet.add(getOrCreateDispatchClient(host, servicePort)); Review Comment: The dispatch client is cached anyway, so do we need the set? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java: ########## @@ -38,15 +40,28 @@ @SuppressWarnings("UnstableApiUsage") public class OpChainSchedulerService extends AbstractExecutionThreadService { private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class); - // Default time scheduler is allowed to wait for a runnable OpChain to be available + /** + * Default time scheduler is allowed to wait for a runnable OpChain to be available. + */ private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100; + /** + * Default cancel signal retention, this should be set to several times larger than + * {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}. + */ + private static final long DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L; private final OpChainScheduler _scheduler; private final ExecutorService _workerPool; + private final Cache<Long, Long> _cancelledRequests; public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) { + this(scheduler, workerPool, DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS); + } + + public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long releaseTimeoutMs) { Review Comment: Can we rename `releaseTimeoutMs` to a more appropriate name? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org