This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2c48780500 Introduce MSE active and passive timeouts (#16075) 2c48780500 is described below commit 2c48780500613be076d97c7a8f64c546c1ef3509 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Wed Jul 2 11:29:05 2025 +0200 Introduce MSE active and passive timeouts (#16075) --- .../MultiStageBrokerRequestHandler.java | 16 +++- .../common/utils/config/QueryOptionsUtils.java | 6 ++ .../apache/pinot/query/runtime/QueryRunner.java | 31 +++---- .../operator/BaseMailboxReceiveOperator.java | 9 ++ .../pinot/query/runtime/operator/LeafOperator.java | 13 ++- .../runtime/operator/MailboxSendOperator.java | 10 +- .../query/runtime/operator/MultiStageOperator.java | 25 ++++- .../utils/BlockingMultiStreamConsumer.java | 2 +- .../runtime/plan/OpChainExecutionContext.java | 60 +++++++++++- .../plan/pipeline/PipelineBreakerExecutor.java | 31 ++++++- .../plan/server/ServerPlanRequestUtils.java | 4 +- .../query/service/dispatch/QueryDispatcher.java | 37 ++++++-- .../plan/pipeline/PipelineBreakerExecutorTest.java | 2 +- .../query/runtime/queries/QueryRunnerTestBase.java | 7 +- .../apache/pinot/spi/query/QueryThreadContext.java | 103 +++++++++++++++++---- .../apache/pinot/spi/utils/CommonConstants.java | 3 + 16 files changed, 298 insertions(+), 61 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index bd42a81d77..78513ef01c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -128,6 +128,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final boolean _explainAskingServerDefault; private final 
MultiStageQueryThrottler _queryThrottler; private final ExecutorService _queryCompileExecutor; + protected final long _extraPassiveTimeoutMs; public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, @@ -141,6 +142,10 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config, CommonConstants.Broker.BROKER_TLS_PREFIX) : null; + _extraPassiveTimeoutMs = config.getProperty( + CommonConstants.Broker.CONFIG_OF_EXTRA_PASSIVE_TIMEOUT_MS, + CommonConstants.Broker.DEFAULT_EXTRA_PASSIVE_TIMEOUT_MS); + failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer); long cancelMillis = config.getProperty( CommonConstants.MultiStageQueryRunner.KEY_OF_CANCEL_TIMEOUT_MS, @@ -295,7 +300,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { throws QueryException, WebApplicationException { _queryLogger.log(requestId, query); - long queryTimeoutMs = getTimeout(sqlNodeAndOptions.getOptions()); + Map<String, String> options = sqlNodeAndOptions.getOptions(); + long queryTimeoutMs = getTimeout(options); + QueryThreadContext.setActiveDeadlineMs(System.currentTimeMillis() + queryTimeoutMs); + QueryThreadContext.setPassiveDeadlineMs(System.currentTimeMillis() + queryTimeoutMs + getPassiveTimeout(options)); + Timer queryTimer = new Timer(queryTimeoutMs, TimeUnit.MILLISECONDS); try (QueryEnvironment.CompiledQuery compiledQuery = @@ -421,6 +430,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { return timeoutMsFromQueryOption != null ? 
timeoutMsFromQueryOption : _brokerTimeoutMs; } + private long getPassiveTimeout(Map<String, String> queryOptions) { + Long passiveTimeoutMsFromQueryOption = QueryOptionsUtils.getPassiveTimeoutMs(queryOptions); + return passiveTimeoutMsFromQueryOption != null ? passiveTimeoutMsFromQueryOption : _extraPassiveTimeoutMs; + } + /** * Explains the query and returns the broker response. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 1252b29cee..63523f0af4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -112,6 +112,12 @@ public class QueryOptionsUtils { return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS, timeoutMsString); } + @Nullable + public static Long getPassiveTimeoutMs(Map<String, String> queryOptions) { + String passiveTimeoutMsString = queryOptions.get(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS); + return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, passiveTimeoutMsString, 0); + } + @Nullable public static Long getMaxServerResponseSizeBytes(Map<String, String> queryOptions) { String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 1706187d4a..d98f919f29 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -260,30 +260,25 @@ public class QueryRunner { MseWorkerThreadContext.setStageId(stagePlan.getStageMetadata().getStageId()); MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId()); - long requestId = 
Long.parseLong(requestMetadata.get(MetadataKeys.REQUEST_ID)); - long timeoutMs = Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS)); - long deadlineMs = System.currentTimeMillis() + timeoutMs; - StageMetadata stageMetadata = stagePlan.getStageMetadata(); Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata); // run pre-stage execution for all pipeline breakers PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan, - opChainMetadata, requestId, deadlineMs, parentContext, _sendStats.getAsBoolean()); + PipelineBreakerExecutor.executePipelineBreakersFromQueryContext(_opChainScheduler, _mailboxService, + workerMetadata, stagePlan, opChainMetadata, parentContext, _sendStats.getAsBoolean()); // Send error block to all the receivers if pipeline breaker fails if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock(); - notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, requestId, workerMetadata, stagePlan, - deadlineMs); + notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, workerMetadata, stagePlan); return; } // run OpChain - OpChainExecutionContext executionContext = - new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata, - workerMetadata, pipelineBreakerResult, parentContext, _sendStats.getAsBoolean()); + OpChainExecutionContext executionContext = OpChainExecutionContext.fromQueryContext(_mailboxService, + opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult, parentContext, + _sendStats.getAsBoolean()); OpChain opChain; if (workerMetadata.isLeafStageWorker()) { Map<String, String> rlsFilters = RlsUtils.extractRlsFilters(requestMetadata); @@ -298,13 +293,13 @@ public class QueryRunner { 
_opChainScheduler.register(opChain); } catch (RuntimeException e) { ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e); - notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, requestId, workerMetadata, stagePlan, - deadlineMs); + notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, workerMetadata, stagePlan); } } - private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock errorBlock, long requestId, - WorkerMetadata workerMetadata, StagePlan stagePlan, long deadlineMs) { + private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock errorBlock, + WorkerMetadata workerMetadata, StagePlan stagePlan) { + long requestId = QueryThreadContext.getRequestId(); LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, stageId, errorBlock); MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode(); @@ -317,6 +312,7 @@ public class QueryRunner { receiverMailboxInfos); routingInfos.addAll(stageRoutingInfos); } + long deadlineMs = QueryThreadContext.getPassiveDeadlineMs(); for (RoutingInfo routingInfo : routingInfos) { try { StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class); @@ -525,9 +521,8 @@ public class QueryRunner { } }; // compile OpChain - OpChainExecutionContext executionContext = - new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata, - workerMetadata, null, null, false); + OpChainExecutionContext executionContext = OpChainExecutionContext.fromQueryContext(_mailboxService, + opChainMetadata, stageMetadata, workerMetadata, null, null, false); OpChain opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _leafQueryExecutor, _executorService, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java index 8c2001b3f9..71bee7bfa4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java @@ -131,6 +131,15 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator { } } + @Override + protected void sampleAndCheckInterruption() { + // mailbox receive operator uses passive deadline instead of the active one because it is not an active operator + // as it just waits for data from the mailbox. + // This way if timeout is reached, it will be less probable to hit the timeout here, on the stage waiting for data, + // than in the operator that is actively processing the data, which will produce a more meaningful error message. + sampleAndCheckInterruption(_context.getPassiveDeadlineMs()); + } + @Override public void registerExecution(long time, int numRows) { _statMap.merge(StatKey.EXECUTION_TIME_MS, time); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java index 3491f5c43b..20c96fdcf1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java @@ -153,8 +153,10 @@ public class LeafOperator extends MultiStageOperator { if (_isEarlyTerminated) { return SuccessMseBlock.INSTANCE; } + // Here we use passive deadline because we end up waiting for the SSE operators + // which can time out on their own BaseResultsBlock resultsBlock = - _blockingQueue.poll(_context.getDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (resultsBlock == null) { throw new TimeoutException("Timed out waiting for results block"); } @@ -321,7 +323,8 @@ public class LeafOperator extends MultiStageOperator { while (true) { BaseResultsBlock resultsBlock; try { - long timeout = _context.getDeadlineMs() - System.currentTimeMillis(); + // Here we could use active or passive, given we don't actually execute anything + long timeout = _context.getPassiveDeadlineMs() - System.currentTimeMillis(); resultsBlock = _blockingQueue.poll(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -439,13 +442,13 @@ public class LeafOperator extends MultiStageOperator { }); } try { - if (!latch.await(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { + if (!latch.await(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for leaf stage to finish"); } // Propagate the exception thrown by the leaf stage for (Future<Map<String, String>> future : futures) { Map<String, String> stats = - future.get(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + future.get(_context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); mergeExecutionStats(stats); } } catch (TimeoutException e) { @@ -467,7 +470,7 @@ public class LeafOperator extends MultiStageOperator { @VisibleForTesting void addResultsBlock(BaseResultsBlock resultsBlock) throws InterruptedException, TimeoutException { - if (!_blockingQueue.offer(resultsBlock, _context.getDeadlineMs() - System.currentTimeMillis(), + if (!_blockingQueue.offer(resultsBlock, _context.getPassiveDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting to add results block"); } diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index d48ba0469c..6c223e861f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -159,7 +159,9 @@ public class MailboxSendOperator extends MultiStageOperator { distributionType); MailboxService mailboxService = context.getMailboxService(); long requestId = context.getRequestId(); - long deadlineMs = context.getDeadlineMs(); + // It is important to use passive deadline here, otherwise the GRPC channel could be closed before + // the useful error block is sent + long deadlineMs = context.getPassiveDeadlineMs(); List<MailboxInfo> mailboxInfos = context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); @@ -230,6 +232,12 @@ public class MailboxSendOperator extends MultiStageOperator { } } + @Override + protected void sampleAndCheckInterruption() { + // mailbox send operator uses passive deadline instead of the active one + sampleAndCheckInterruption(_context.getPassiveDeadlineMs()); + } + private void sendEos(MseBlock.Eos eosBlockWithoutStats) throws Exception { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index edc6de4160..d9a285d342 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -73,9 +73,30 @@ public abstract class MultiStageOperator public abstract void registerExecution(long time, int numRows); - // Samples resource usage of the operator. 
The operator should call this function for every block of data or - // assuming the block holds 10000 rows or more. + /// This method should be called periodically by the operator to check whether the execution should be interrupted. + /// + /// This could happen when the request deadline is reached, or the thread accountant decides to interrupt the query + /// due to resource constraints. + /// + /// Normally, callers should call [#sampleAndCheckInterruption(long deadlineMs)] passing the correct deadline, but + /// given most operators use either the active or the passive deadline, this method is provided as a convenience + /// method. By default, it uses the active deadline, which is the one that should be used for most operators, but + /// if the operator does not actively process data (i.e. both mailbox operators), it should override this method to + /// use the passive deadline instead. + /// See for example [MailboxSendOperator][org.apache.pinot.query.runtime.operator.MailboxSendOperator]. protected void sampleAndCheckInterruption() { + sampleAndCheckInterruption(_context.getActiveDeadlineMs()); + } + + /// This method should be called periodically by the operator to check whether the execution should be interrupted. + /// + /// This could happen when the request deadline is reached, or the thread accountant decides to interrupt the query + /// due to resource constraints. 
+ protected void sampleAndCheckInterruption(long deadlineMs) { + if (System.currentTimeMillis() >= deadlineMs) { + earlyTerminate(); + throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " + getExplainName()); + } Tracing.ThreadAccountantOps.sampleMSE(); if (Tracing.ThreadAccountantOps.isInterrupted()) { earlyTerminate(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java index 3f6fc1192d..68e5784bc1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java @@ -344,7 +344,7 @@ public abstract class BlockingMultiStreamConsumer<E> implements AutoCloseable { public OfMseBlock(OpChainExecutionContext context, List<? 
extends AsyncStream<ReceivingMailbox.MseBlockWithStats>> asyncProducers) { - super(context.getId(), context.getDeadlineMs(), asyncProducers); + super(context.getId(), context.getPassiveDeadlineMs(), asyncProducers); _stageId = context.getStageId(); _stats = MultiStageQueryStats.emptyStats(context.getStageId()); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index af627f4e28..0c95edbfdd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -29,6 +29,7 @@ import org.apache.pinot.query.runtime.operator.OpChainId; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.utils.CommonConstants; @@ -41,7 +42,8 @@ public class OpChainExecutionContext { private final MailboxService _mailboxService; private final long _requestId; - private final long _deadlineMs; + private final long _activeDeadlineMs; + private final long _passiveDeadlineMs; private final Map<String, String> _opChainMetadata; private final StageMetadata _stageMetadata; private final WorkerMetadata _workerMetadata; @@ -56,14 +58,24 @@ public class OpChainExecutionContext { private ServerPlanRequestContext _leafStageContext; private final boolean _sendStats; - + @Deprecated public OpChainExecutionContext(MailboxService mailboxService, long requestId, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata, @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable 
ThreadExecutionContext parentContext, boolean sendStats) { + this(mailboxService, requestId, deadlineMs, deadlineMs, opChainMetadata, stageMetadata, workerMetadata, + pipelineBreakerResult, parentContext, sendStats); + } + + public OpChainExecutionContext(MailboxService mailboxService, long requestId, + long activeDeadlineMs, long passiveDeadlineMs, + Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata, + @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable ThreadExecutionContext parentContext, + boolean sendStats) { _mailboxService = mailboxService; _requestId = requestId; - _deadlineMs = deadlineMs; + _activeDeadlineMs = activeDeadlineMs; + _passiveDeadlineMs = passiveDeadlineMs; _opChainMetadata = Collections.unmodifiableMap(opChainMetadata); _stageMetadata = stageMetadata; _workerMetadata = workerMetadata; @@ -76,6 +88,17 @@ public class OpChainExecutionContext { _parentContext = parentContext; } + public static OpChainExecutionContext fromQueryContext(MailboxService mailboxService, + Map<String, String> opChainMetadata, StageMetadata stageMetadata, WorkerMetadata workerMetadata, + @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable ThreadExecutionContext parentContext, + boolean sendStats) { + long requestId = QueryThreadContext.getRequestId(); + long activeDeadlineMs = QueryThreadContext.getActiveDeadlineMs(); + long passiveDeadlineMs = QueryThreadContext.getPassiveDeadlineMs(); + return new OpChainExecutionContext(mailboxService, requestId, activeDeadlineMs, passiveDeadlineMs, + opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult, parentContext, sendStats); + } + public MailboxService getMailboxService() { return _mailboxService; } @@ -96,8 +119,37 @@ public class OpChainExecutionContext { return _server; } + /// Returns the deadline in milliseconds for the OpChain to complete when it is actively waiting for data. 
+ /// + /// This deadline should only be used for _active_ waits, like when a + /// [HashJoinOperator][org.apache.pinot.query.runtime.operator.HashJoinOperator] is building the hash table. + /// + /// This should not be used for _passive_ waits, like when a + /// [MailboxReceiveOperator][org.apache.pinot.query.runtime.operator.MailboxReceiveOperator] or a + /// [PipelineBreakerOperator][org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator] passively waits + /// for data to arrive. + public long getActiveDeadlineMs() { + return _activeDeadlineMs; + } + + /// For backward compatibility, we return the active deadline as the default. + /// This should be used for active waits only. + /// @deprecated Use {@link #getActiveDeadlineMs()} instead. public long getDeadlineMs() { - return _deadlineMs; + return getActiveDeadlineMs(); + } + + /// Returns the deadline in milliseconds for the OpChain to complete when it is passively waiting for data. + /// + /// This deadline should only be used for _passive_ waits, like when the + /// [MailboxReceiveOperator][org.apache.pinot.query.runtime.operator.MailboxReceiveOperator] or a + /// [PipelineBreakerOperator][org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator] + /// passively waits for data to arrive. + /// + /// This should not be used for _active_ waits, like when a + /// [HashJoinOperator][org.apache.pinot.query.runtime.operator.HashJoinOperator] is building the hash table. 
+ public long getPassiveDeadlineMs() { + return _passiveDeadlineMs; } public Map<String, String> getOpChainMetadata() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index dffd7f6ba2..27f375e4db 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -38,6 +38,7 @@ import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,30 @@ public class PipelineBreakerExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutor.class); + /** + * Execute a pipeline breaker and collect the results (synchronously). Currently, pipeline breaker executor can only + * execute mailbox receive pipeline breaker. + * + * @param scheduler scheduler service to run the pipeline breaker main thread. + * @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} against. + * @param workerMetadata worker metadata for the current worker. + * @param stagePlan the distributed stage plan to run pipeline breaker on. + * @param opChainMetadata request metadata, including query options + * @param parentContext Parent thread metadata + * @return pipeline breaker result; + * - If exception occurs, exception block will be wrapped in {@link MseBlock} and assigned to each PB node. 
+ * - Normal stats will be attached to each PB node and downstream execution should return with stats attached. + */ + @Nullable + public static PipelineBreakerResult executePipelineBreakersFromQueryContext(OpChainSchedulerService scheduler, + MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, + Map<String, String> opChainMetadata, + @Nullable ThreadExecutionContext parentContext, boolean sendStats) { + return executePipelineBreakers(scheduler, mailboxService, workerMetadata, stagePlan, opChainMetadata, + QueryThreadContext.getRequestId(), QueryThreadContext.getActiveDeadlineMs(), + QueryThreadContext.getPassiveDeadlineMs(), parentContext, sendStats); + } + /** * Execute a pipeline breaker and collect the results (synchronously). Currently, pipeline breaker executor can only * execute mailbox receive pipeline breaker. @@ -68,7 +93,7 @@ public class PipelineBreakerExecutor { @Nullable public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, - Map<String, String> opChainMetadata, long requestId, long deadlineMs, + Map<String, String> opChainMetadata, long requestId, long activeDeadlineMs, long passiveDeadlineMs, @Nullable ThreadExecutionContext parentContext, boolean sendStats) { PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(); PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), pipelineBreakerContext); @@ -78,7 +103,7 @@ public class PipelineBreakerExecutor { // OpChain receive-mail callbacks. 
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query information OpChainExecutionContext opChainExecutionContext = - new OpChainExecutionContext(mailboxService, requestId, deadlineMs, opChainMetadata, + new OpChainExecutionContext(mailboxService, requestId, activeDeadlineMs, passiveDeadlineMs, opChainMetadata, stagePlan.getStageMetadata(), workerMetadata, null, parentContext, sendStats); return execute(scheduler, pipelineBreakerContext, opChainExecutionContext); } catch (Exception e) { @@ -125,7 +150,7 @@ public class PipelineBreakerExecutor { OpChain pipelineBreakerOpChain = new OpChain(opChainExecutionContext, pipelineBreakerOperator, (id) -> latch.countDown()); scheduler.register(pipelineBreakerOpChain); - long timeoutMs = opChainExecutionContext.getDeadlineMs() - System.currentTimeMillis(); + long timeoutMs = opChainExecutionContext.getPassiveDeadlineMs() - System.currentTimeMillis(); if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) { return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), pipelineBreakerOperator.getResultMap(), pipelineBreakerOperator.getErrorBlock(), pipelineBreakerOperator.calculateStats()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 6e5d1aac01..572c4c55d7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -294,7 +294,9 @@ public class ServerPlanRequestUtils { pinotQuery.setQueryOptions(queryOptions); } queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, - Long.toString(executionContext.getDeadlineMs() - System.currentTimeMillis())); + Long.toString(executionContext.getActiveDeadlineMs() - System.currentTimeMillis())); + 
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, + Long.toString(executionContext.getPassiveDeadlineMs() - executionContext.getActiveDeadlineMs())); } /** diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 68cac7e9ce..6486db9228 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -168,7 +168,7 @@ public class QueryDispatcher { boolean cancelled = false; try { submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions); - QueryResult result = runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService); + QueryResult result = runReducer(dispatchableSubPlan, queryOptions, _mailboxService); if (result.getProcessingException() != null) { MultiStageQueryStats statsFromCancel = cancelWithStats(requestId, servers); cancelled = true; @@ -452,6 +452,8 @@ public class QueryDispatcher { requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID, cid); requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS))); + requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, + Long.toString(QueryThreadContext.getPassiveDeadlineMs())); requestMetadata.putAll(queryOptions); return requestMetadata; } @@ -577,15 +579,38 @@ public class QueryDispatcher { return _timeSeriesDispatchClientMap.computeIfAbsent(key, k -> new TimeSeriesDispatchClient(hostname, port)); } - // There is no reduction happening here, results are simply concatenated. + /// Concatenates the results of the sub-plan and returns a [QueryResult] with the concatenated result. 
+ /// + /// This method assumes the caller thread is a query thread and therefore [QueryThreadContext] has been initialized. + private static QueryResult runReducer( + DispatchableSubPlan subPlan, + Map<String, String> queryOptions, + MailboxService mailboxService + ) { + return runReducer( + QueryThreadContext.getRequestId(), + subPlan, + QueryThreadContext.getActiveDeadlineMs(), + QueryThreadContext.getPassiveDeadlineMs(), + queryOptions, + mailboxService + ); + } + + /// Concatenates the results of the sub-plan and returns a [QueryResult] with the concatenated result. + /// + /// This method should be called from a query thread and therefore using + /// [#runReducer(DispatchableSubPlan, Map, MailboxService)] is preferred. + /// + /// Remember that in MSE there is no actual reduce but rather a single stage that concatenates the results. @VisibleForTesting public static QueryResult runReducer(long requestId, DispatchableSubPlan subPlan, - long timeoutMs, + long activeDeadlineMs, + long passiveDeadlineMs, Map<String, String> queryOptions, MailboxService mailboxService) { long startTimeMs = System.currentTimeMillis(); - long deadlineMs = startTimeMs + timeoutMs; // NOTE: Reduce stage is always stage 0 DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0); PlanFragment planFragment = stagePlan.getPlanFragment(); @@ -597,8 +622,8 @@ public class QueryDispatcher { StageMetadata stageMetadata = new StageMetadata(0, workerMetadata, stagePlan.getCustomProperties()); ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext(); OpChainExecutionContext executionContext = - new OpChainExecutionContext(mailboxService, requestId, deadlineMs, queryOptions, stageMetadata, - workerMetadata.get(0), null, parentContext, true); + new OpChainExecutionContext(mailboxService, requestId, activeDeadlineMs, passiveDeadlineMs, + queryOptions, stageMetadata, workerMetadata.get(0), null, parentContext, true); PairList<Integer, String> 
resultFields = subPlan.getQueryResultFields(); DataSchema sourceSchema = rootNode.getDataSchema(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index fa4689d26a..4f25431751 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -108,7 +108,7 @@ public class PipelineBreakerExecutorTest { MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> opChainMetadata, long requestId, long deadlineMs) { return PipelineBreakerExecutor.executePipelineBreakers(scheduler, mailboxService, workerMetadata, stagePlan, - opChainMetadata, requestId, deadlineMs, null, true); + opChainMetadata, requestId, deadlineMs, deadlineMs, null, true); } @AfterClass diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index 89371ec0a3..ff34cc3ef8 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -160,7 +160,12 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { } } // exception will be propagated through for assert purpose on runtime error - return QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), + long now = System.currentTimeMillis(); + long extraPassiveTimeout = 1000; + return QueryDispatcher.runReducer(requestId, dispatchableSubPlan, + timeoutMs + now, + timeoutMs + now + 
extraPassiveTimeout, + Collections.emptyMap(), _mailboxService); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java index a4d5174efe..dbfc6ece96 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java @@ -181,9 +181,12 @@ public class QueryThreadContext { } QueryThreadContext.setIds(requestId, cid); long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)); + long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault( + CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0")); long startTimeMs = System.currentTimeMillis(); QueryThreadContext.setStartTimeMs(startTimeMs); - QueryThreadContext.setDeadlineMs(startTimeMs + timeoutMs); + QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs); + QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + extraPassiveTimeoutMs); return open; } @@ -217,7 +220,8 @@ public class QueryThreadContext { Instance context = new Instance(); if (memento != null) { context.setStartTimeMs(memento._startTimeMs); - context.setDeadlineMs(memento._deadlineMs); + context.setActiveDeadlineMs(memento._activeDeadlineMs); + context.setPassiveDeadlineMs(memento._passiveDeadlineMs); context.setBrokerId(memento._brokerId); context.setRequestId(memento._requestId); context.setCid(memento._cid); @@ -295,23 +299,60 @@ public class QueryThreadContext { } /** - * Returns the deadline time of the query in milliseconds since epoch. + * Use {@link #getActiveDeadlineMs()} instead. + */ + @Deprecated + public static long getDeadlineMs() { + return get().getActiveDeadlineMs(); + } + + /** + * @deprecated Use {@link #setActiveDeadlineMs(long)} instead. 
+ * @throws IllegalStateException if deadline is already set or if the {@link QueryThreadContext} is not initialized + */ + @Deprecated + public static void setDeadlineMs(long deadlineMs) { + get().setActiveDeadlineMs(deadlineMs); + } + + /** + * Returns the active deadline time of the query in milliseconds since epoch. * * The default value of 0 means the deadline is not set. * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized */ - public static long getDeadlineMs() { - return get().getDeadlineMs(); + public static long getActiveDeadlineMs() { + return get().getActiveDeadlineMs(); } /** - * Sets the deadline time of the query in milliseconds since epoch. + * Sets the active deadline time of the query in milliseconds since epoch. * * The deadline can only be set once. * @throws IllegalStateException if deadline is already set or if the {@link QueryThreadContext} is not initialized */ - public static void setDeadlineMs(long deadlineMs) { - get().setDeadlineMs(deadlineMs); + public static void setActiveDeadlineMs(long activeDeadlineMs) { + get().setActiveDeadlineMs(activeDeadlineMs); + } + + /** + * Returns the passive deadline time of the query in milliseconds since epoch. + * + * The default value of 0 means the deadline is not set. + * @throws IllegalStateException if the {@link QueryThreadContext} is not initialized + */ + public static long getPassiveDeadlineMs() { + return get().getPassiveDeadlineMs(); + } + + /** + * Sets the passive deadline time of the query in milliseconds since epoch. + * + * The deadline can only be set once. 
+ * @throws IllegalStateException if deadline is already set or if the {@link QueryThreadContext} is not initialized + */ + public static void setPassiveDeadlineMs(long passiveDeadlineMs) { + get().setPassiveDeadlineMs(passiveDeadlineMs); } /** @@ -456,7 +497,8 @@ public class QueryThreadContext { */ private static class Instance implements CloseableContext { private long _startTimeMs; - private long _deadlineMs; + private long _activeDeadlineMs; + private long _passiveDeadlineMs; private String _brokerId; private long _requestId; private String _cid; @@ -474,14 +516,34 @@ public class QueryThreadContext { _startTimeMs = startTimeMs; } + @Deprecated public long getDeadlineMs() { - return _deadlineMs; + return getActiveDeadlineMs(); + } + + public long getActiveDeadlineMs() { + return _activeDeadlineMs; } + @Deprecated public void setDeadlineMs(long deadlineMs) { - Preconditions.checkState(getDeadlineMs() == 0, "Deadline already set to %s, cannot set again", - getDeadlineMs()); - _deadlineMs = deadlineMs; + setActiveDeadlineMs(deadlineMs); + } + + public void setActiveDeadlineMs(long activeDeadlineMs) { + Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already set to %s, cannot set again", + getActiveDeadlineMs()); + _activeDeadlineMs = activeDeadlineMs; + } + + public long getPassiveDeadlineMs() { + return _passiveDeadlineMs; + } + + public void setPassiveDeadlineMs(long passiveDeadlineMs) { + Preconditions.checkState(getPassiveDeadlineMs() == 0, "Passive deadline already set to %s, cannot set again", + getPassiveDeadlineMs()); + _passiveDeadlineMs = passiveDeadlineMs; } public String getBrokerId() { @@ -576,8 +638,13 @@ public class QueryThreadContext { } @Override - public void setDeadlineMs(long deadlineMs) { - LOGGER.debug("Setting deadline to {} in a fake context", deadlineMs); + public void setActiveDeadlineMs(long activeDeadlineMs) { + LOGGER.debug("Setting active deadline to {} in a fake context", activeDeadlineMs); + } + + @Override + public 
void setPassiveDeadlineMs(long passiveDeadlineMs) { + LOGGER.debug("Setting passive deadline to {} in a fake context", passiveDeadlineMs); } @Override @@ -632,7 +699,8 @@ public class QueryThreadContext { */ public static class Memento { private final long _startTimeMs; - private final long _deadlineMs; + private final long _activeDeadlineMs; + private final long _passiveDeadlineMs; private final String _brokerId; private final long _requestId; private final String _cid; @@ -642,7 +710,8 @@ public class QueryThreadContext { private Memento(Instance instance) { _startTimeMs = instance.getStartTimeMs(); - _deadlineMs = instance.getDeadlineMs(); + _activeDeadlineMs = instance.getActiveDeadlineMs(); + _passiveDeadlineMs = instance.getPassiveDeadlineMs(); _brokerId = instance.getBrokerId(); _requestId = instance.getRequestId(); _cid = instance.getCid(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 2d60123329..3a2b703edc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -344,6 +344,8 @@ public class CommonConstants { public static final String CONFIG_OF_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH = "pinot.broker.enable.row.column.level.auth"; public static final boolean DEFAULT_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH = false; + public static final String CONFIG_OF_EXTRA_PASSIVE_TIMEOUT_MS = "pinot.broker.extraPassiveTimeoutMs"; + public static final long DEFAULT_EXTRA_PASSIVE_TIMEOUT_MS = 100L; public static final String CONFIG_OF_BROKER_ID = "pinot.broker.instance.id"; public static final String CONFIG_OF_BROKER_INSTANCE_TAGS = "pinot.broker.instance.tags"; public static final String CONFIG_OF_BROKER_HOSTNAME = "pinot.broker.hostname"; @@ -544,6 +546,7 @@ public class CommonConstants { public static class QueryOptionKey { public static final String 
TIMEOUT_MS = "timeoutMs"; + public static final String EXTRA_PASSIVE_TIMEOUT_MS = "extraPassiveTimeoutMs"; public static final String SKIP_UPSERT = "skipUpsert"; public static final String SKIP_UPSERT_VIEW = "skipUpsertView"; public static final String UPSERT_VIEW_FRESHNESS_MS = "upsertViewFreshnessMs"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org