This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c48780500 Introduce MSE active and passive timeouts (#16075)
2c48780500 is described below

commit 2c48780500613be076d97c7a8f64c546c1ef3509
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Wed Jul 2 11:29:05 2025 +0200

    Introduce MSE active and passive timeouts (#16075)
---
 .../MultiStageBrokerRequestHandler.java            |  16 +++-
 .../common/utils/config/QueryOptionsUtils.java     |   6 ++
 .../apache/pinot/query/runtime/QueryRunner.java    |  31 +++----
 .../operator/BaseMailboxReceiveOperator.java       |   9 ++
 .../pinot/query/runtime/operator/LeafOperator.java |  13 ++-
 .../runtime/operator/MailboxSendOperator.java      |  10 +-
 .../query/runtime/operator/MultiStageOperator.java |  25 ++++-
 .../utils/BlockingMultiStreamConsumer.java         |   2 +-
 .../runtime/plan/OpChainExecutionContext.java      |  60 +++++++++++-
 .../plan/pipeline/PipelineBreakerExecutor.java     |  31 ++++++-
 .../plan/server/ServerPlanRequestUtils.java        |   4 +-
 .../query/service/dispatch/QueryDispatcher.java    |  37 ++++++--
 .../plan/pipeline/PipelineBreakerExecutorTest.java |   2 +-
 .../query/runtime/queries/QueryRunnerTestBase.java |   7 +-
 .../apache/pinot/spi/query/QueryThreadContext.java | 103 +++++++++++++++++----
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +
 16 files changed, 298 insertions(+), 61 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index bd42a81d77..78513ef01c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -128,6 +128,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   private final boolean _explainAskingServerDefault;
   private final MultiStageQueryThrottler _queryThrottler;
   private final ExecutorService _queryCompileExecutor;
+  protected final long _extraPassiveTimeoutMs;
 
   public MultiStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId, BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
@@ -141,6 +142,10 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? 
TlsUtils.extractTlsConfig(config,
         CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
 
+    _extraPassiveTimeoutMs = config.getProperty(
+        CommonConstants.Broker.CONFIG_OF_EXTRA_PASSIVE_TIMEOUT_MS,
+        CommonConstants.Broker.DEFAULT_EXTRA_PASSIVE_TIMEOUT_MS);
+
     failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
     long cancelMillis = config.getProperty(
         CommonConstants.MultiStageQueryRunner.KEY_OF_CANCEL_TIMEOUT_MS,
@@ -295,7 +300,11 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       throws QueryException, WebApplicationException {
     _queryLogger.log(requestId, query);
 
-    long queryTimeoutMs = getTimeout(sqlNodeAndOptions.getOptions());
+    Map<String, String> options = sqlNodeAndOptions.getOptions();
+    long queryTimeoutMs = getTimeout(options);
+    QueryThreadContext.setActiveDeadlineMs(System.currentTimeMillis() + 
queryTimeoutMs);
+    QueryThreadContext.setPassiveDeadlineMs(System.currentTimeMillis() + 
queryTimeoutMs + getPassiveTimeout(options));
+
     Timer queryTimer = new Timer(queryTimeoutMs, TimeUnit.MILLISECONDS);
 
     try (QueryEnvironment.CompiledQuery compiledQuery =
@@ -421,6 +430,11 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : 
_brokerTimeoutMs;
   }
 
+  private long getPassiveTimeout(Map<String, String> queryOptions) {
+    Long passiveTimeoutMsFromQueryOption = 
QueryOptionsUtils.getPassiveTimeoutMs(queryOptions);
+    return passiveTimeoutMsFromQueryOption != null ? 
passiveTimeoutMsFromQueryOption : _extraPassiveTimeoutMs;
+  }
+
 
   /**
    * Explains the query and returns the broker response.
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 1252b29cee..63523f0af4 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -112,6 +112,12 @@ public class QueryOptionsUtils {
     return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS, 
timeoutMsString);
   }
 
+  @Nullable
+  public static Long getPassiveTimeoutMs(Map<String, String> queryOptions) {
+    String passiveTimeoutMsString = 
queryOptions.get(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS);
+    return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, 
passiveTimeoutMsString, 0);
+  }
+
   @Nullable
   public static Long getMaxServerResponseSizeBytes(Map<String, String> 
queryOptions) {
     String responseSize = 
queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1706187d4a..d98f919f29 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -260,30 +260,25 @@ public class QueryRunner {
     
MseWorkerThreadContext.setStageId(stagePlan.getStageMetadata().getStageId());
     MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId());
 
-    long requestId = 
Long.parseLong(requestMetadata.get(MetadataKeys.REQUEST_ID));
-    long timeoutMs = 
Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
-    long deadlineMs = System.currentTimeMillis() + timeoutMs;
-
     StageMetadata stageMetadata = stagePlan.getStageMetadata();
     Map<String, String> opChainMetadata = 
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
 
     // run pre-stage execution for all pipeline breakers
     PipelineBreakerResult pipelineBreakerResult =
-        PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, 
_mailboxService, workerMetadata, stagePlan,
-            opChainMetadata, requestId, deadlineMs, parentContext, 
_sendStats.getAsBoolean());
+        
PipelineBreakerExecutor.executePipelineBreakersFromQueryContext(_opChainScheduler,
 _mailboxService,
+            workerMetadata, stagePlan, opChainMetadata, parentContext, 
_sendStats.getAsBoolean());
 
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() 
!= null) {
       ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock();
-      notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, 
requestId, workerMetadata, stagePlan,
-          deadlineMs);
+      notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, 
workerMetadata, stagePlan);
       return;
     }
 
     // run OpChain
-    OpChainExecutionContext executionContext =
-        new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, 
opChainMetadata, stageMetadata,
-            workerMetadata, pipelineBreakerResult, parentContext, 
_sendStats.getAsBoolean());
+    OpChainExecutionContext executionContext = 
OpChainExecutionContext.fromQueryContext(_mailboxService,
+        opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult, 
parentContext,
+        _sendStats.getAsBoolean());
     OpChain opChain;
     if (workerMetadata.isLeafStageWorker()) {
       Map<String, String> rlsFilters = 
RlsUtils.extractRlsFilters(requestMetadata);
@@ -298,13 +293,13 @@ public class QueryRunner {
       _opChainScheduler.register(opChain);
     } catch (RuntimeException e) {
       ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
-      notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, 
requestId, workerMetadata, stagePlan,
-          deadlineMs);
+      notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, 
workerMetadata, stagePlan);
     }
   }
 
-  private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock 
errorBlock, long requestId,
-      WorkerMetadata workerMetadata, StagePlan stagePlan, long deadlineMs) {
+  private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock 
errorBlock,
+      WorkerMetadata workerMetadata, StagePlan stagePlan) {
+    long requestId = QueryThreadContext.getRequestId();
     LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, 
sending error block: {}", requestId,
         stageId, errorBlock);
     MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
@@ -317,6 +312,7 @@ public class QueryRunner {
               receiverMailboxInfos);
       routingInfos.addAll(stageRoutingInfos);
     }
+    long deadlineMs = QueryThreadContext.getPassiveDeadlineMs();
     for (RoutingInfo routingInfo : routingInfos) {
       try {
         StatMap<MailboxSendOperator.StatKey> statMap = new 
StatMap<>(MailboxSendOperator.StatKey.class);
@@ -525,9 +521,8 @@ public class QueryRunner {
       }
     };
     // compile OpChain
-    OpChainExecutionContext executionContext =
-        new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, 
opChainMetadata, stageMetadata,
-            workerMetadata, null, null, false);
+    OpChainExecutionContext executionContext = 
OpChainExecutionContext.fromQueryContext(_mailboxService,
+        opChainMetadata, stageMetadata, workerMetadata, null, null, false);
 
     OpChain opChain =
         ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, 
_leafQueryExecutor, _executorService,
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 8c2001b3f9..71bee7bfa4 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -131,6 +131,15 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
     }
   }
 
+  @Override
+  protected void sampleAndCheckInterruption() {
+    // mailbox receive operator uses passive deadline instead of the active 
one because it is not an active operator
+    // as it just waits for data from the mailbox.
+    // This way if timeout is reached, it will be less probable to hit the 
timeout here, on the stage waiting for data,
+    // than in the operator that is actively processing the data, which will 
produce a more meaningful error message.
+    sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+  }
+
   @Override
   public void registerExecution(long time, int numRows) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 3491f5c43b..20c96fdcf1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -153,8 +153,10 @@ public class LeafOperator extends MultiStageOperator {
     if (_isEarlyTerminated) {
       return SuccessMseBlock.INSTANCE;
     }
+    // Here we use passive deadline because we end up waiting for the SSE 
operators
+    // which can time out on their own
     BaseResultsBlock resultsBlock =
-        _blockingQueue.poll(_context.getDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        _blockingQueue.poll(_context.getPassiveDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
     if (resultsBlock == null) {
       throw new TimeoutException("Timed out waiting for results block");
     }
@@ -321,7 +323,8 @@ public class LeafOperator extends MultiStageOperator {
     while (true) {
       BaseResultsBlock resultsBlock;
       try {
-        long timeout = _context.getDeadlineMs() - System.currentTimeMillis();
+        // Here we could use active or passive, given we don't actually 
execute anything
+        long timeout = _context.getPassiveDeadlineMs() - 
System.currentTimeMillis();
         resultsBlock = _blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -439,13 +442,13 @@ public class LeafOperator extends MultiStageOperator {
             });
           }
           try {
-            if (!latch.await(_context.getDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
+            if (!latch.await(_context.getPassiveDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
               throw new TimeoutException("Timed out waiting for leaf stage to 
finish");
             }
             // Propagate the exception thrown by the leaf stage
             for (Future<Map<String, String>> future : futures) {
               Map<String, String> stats =
-                  future.get(_context.getDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                  future.get(_context.getPassiveDeadlineMs() - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
               mergeExecutionStats(stats);
             }
           } catch (TimeoutException e) {
@@ -467,7 +470,7 @@ public class LeafOperator extends MultiStageOperator {
   @VisibleForTesting
   void addResultsBlock(BaseResultsBlock resultsBlock)
       throws InterruptedException, TimeoutException {
-    if (!_blockingQueue.offer(resultsBlock, _context.getDeadlineMs() - 
System.currentTimeMillis(),
+    if (!_blockingQueue.offer(resultsBlock, _context.getPassiveDeadlineMs() - 
System.currentTimeMillis(),
         TimeUnit.MILLISECONDS)) {
       throw new TimeoutException("Timed out waiting to add results block");
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index d48ba0469c..6c223e861f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -159,7 +159,9 @@ public class MailboxSendOperator extends MultiStageOperator 
{
         distributionType);
     MailboxService mailboxService = context.getMailboxService();
     long requestId = context.getRequestId();
-    long deadlineMs = context.getDeadlineMs();
+    // It is important to use passive deadline here, otherwise the GRPC 
channel could be closed before
+    // the useful error block is sent
+    long deadlineMs = context.getPassiveDeadlineMs();
 
     List<MailboxInfo> mailboxInfos =
         
context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
@@ -230,6 +232,12 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     }
   }
 
+  @Override
+  protected void sampleAndCheckInterruption() {
+    // mailbox send operator uses passive deadline instead of the active one
+    sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+  }
+
   private void sendEos(MseBlock.Eos eosBlockWithoutStats)
       throws Exception {
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index edc6de4160..d9a285d342 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -73,9 +73,30 @@ public abstract class MultiStageOperator
 
   public abstract void registerExecution(long time, int numRows);
 
-  // Samples resource usage of the operator. The operator should call this 
function for every block of data or
-  // assuming the block holds 10000 rows or more.
+  /// This method should be called periodically by the operator to check 
whether the execution should be interrupted.
+  ///
+  /// This could happen when the request deadline is reached, or the thread 
accountant decides to interrupt the query
+  /// due to resource constraints.
+  ///
+  /// Normally, callers should call [#sampleAndCheckInterruption(long 
deadlineMs)] passing the correct deadline, but
+  /// given most operators use either the active or the passive deadline, this 
method is provided as a convenience
+  /// method. By default, it uses the active deadline, which is the one that 
should be used for most operators, but
+  /// if the operator does not actively process data (i.e. both mailbox 
operators), it should override this method to
+  /// use the passive deadline instead.
+  /// See for example 
[MailboxSendOperator][org.apache.pinot.query.runtime.operator.MailboxSendOperator].
   protected void sampleAndCheckInterruption() {
+    sampleAndCheckInterruption(_context.getActiveDeadlineMs());
+  }
+
+  /// This method should be called periodically by the operator to check 
whether the execution should be interrupted.
+  ///
+  /// This could happen when the request deadline is reached, or the thread 
accountant decides to interrupt the query
+  /// due to resource constraints.
+  protected void sampleAndCheckInterruption(long deadlineMs) {
+    if (System.currentTimeMillis() >= deadlineMs) {
+      earlyTerminate();
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " + 
getExplainName());
+    }
     Tracing.ThreadAccountantOps.sampleMSE();
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
       earlyTerminate();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index 3f6fc1192d..68e5784bc1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -344,7 +344,7 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
 
     public OfMseBlock(OpChainExecutionContext context,
         List<? extends AsyncStream<ReceivingMailbox.MseBlockWithStats>> 
asyncProducers) {
-      super(context.getId(), context.getDeadlineMs(), asyncProducers);
+      super(context.getId(), context.getPassiveDeadlineMs(), asyncProducers);
       _stageId = context.getStageId();
       _stats = MultiStageQueryStats.emptyStats(context.getStageId());
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index af627f4e28..0c95edbfdd 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -29,6 +29,7 @@ import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -41,7 +42,8 @@ public class OpChainExecutionContext {
 
   private final MailboxService _mailboxService;
   private final long _requestId;
-  private final long _deadlineMs;
+  private final long _activeDeadlineMs;
+  private final long _passiveDeadlineMs;
   private final Map<String, String> _opChainMetadata;
   private final StageMetadata _stageMetadata;
   private final WorkerMetadata _workerMetadata;
@@ -56,14 +58,24 @@ public class OpChainExecutionContext {
   private ServerPlanRequestContext _leafStageContext;
   private final boolean _sendStats;
 
-
+  @Deprecated
   public OpChainExecutionContext(MailboxService mailboxService, long 
requestId, long deadlineMs,
       Map<String, String> opChainMetadata, StageMetadata stageMetadata, 
WorkerMetadata workerMetadata,
       @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable 
ThreadExecutionContext parentContext,
       boolean sendStats) {
+    this(mailboxService, requestId, deadlineMs, deadlineMs, opChainMetadata, 
stageMetadata, workerMetadata,
+        pipelineBreakerResult, parentContext, sendStats);
+  }
+
+  public OpChainExecutionContext(MailboxService mailboxService, long requestId,
+      long activeDeadlineMs, long passiveDeadlineMs,
+      Map<String, String> opChainMetadata, StageMetadata stageMetadata, 
WorkerMetadata workerMetadata,
+      @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable 
ThreadExecutionContext parentContext,
+      boolean sendStats) {
     _mailboxService = mailboxService;
     _requestId = requestId;
-    _deadlineMs = deadlineMs;
+    _activeDeadlineMs = activeDeadlineMs;
+    _passiveDeadlineMs = passiveDeadlineMs;
     _opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
     _stageMetadata = stageMetadata;
     _workerMetadata = workerMetadata;
@@ -76,6 +88,17 @@ public class OpChainExecutionContext {
     _parentContext = parentContext;
   }
 
+  public static OpChainExecutionContext fromQueryContext(MailboxService 
mailboxService,
+      Map<String, String> opChainMetadata, StageMetadata stageMetadata, 
WorkerMetadata workerMetadata,
+      @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable 
ThreadExecutionContext parentContext,
+      boolean sendStats) {
+    long requestId = QueryThreadContext.getRequestId();
+    long activeDeadlineMs = QueryThreadContext.getActiveDeadlineMs();
+    long passiveDeadlineMs = QueryThreadContext.getPassiveDeadlineMs();
+    return new OpChainExecutionContext(mailboxService, requestId, 
activeDeadlineMs, passiveDeadlineMs,
+        opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult, 
parentContext, sendStats);
+  }
+
   public MailboxService getMailboxService() {
     return _mailboxService;
   }
@@ -96,8 +119,37 @@ public class OpChainExecutionContext {
     return _server;
   }
 
+  /// Returns the deadline in milliseconds for the OpChain to complete when it 
is actively waiting for data.
+  ///
+  /// This deadline should only be used for _active_ waits, like when a
+  /// 
[HashJoinOperator][org.apache.pinot.query.runtime.operator.HashJoinOperator] is 
building the hash table.
+  ///
+  /// This should not be used for _passive_ waits, like when a
+  /// 
[MailboxReceiveOperator][org.apache.pinot.query.runtime.operator.MailboxReceiveOperator]
 or a
+  /// 
[PipelineBreakerOperator][org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator]
 passively waits
+  /// for data to arrive.
+  public long getActiveDeadlineMs() {
+    return _activeDeadlineMs;
+  }
+
+  /// For backward compatibility, we return the active deadline as the default.
+  /// This should be used for active waits only.
+  /// @deprecated Use {@link #getActiveDeadlineMs()} instead.
   public long getDeadlineMs() {
-    return _deadlineMs;
+    return getActiveDeadlineMs();
+  }
+
+  /// Returns the deadline in milliseconds for the OpChain to complete when it 
is passively waiting for data.
+  ///
+  /// This deadline should only be used for _passive_ waits, like when a
+  /// 
[MailboxReceiveOperator][org.apache.pinot.query.runtime.operator.MailboxReceiveOperator]
 or a
+  /// 
[PipelineBreakerOperator][org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator]
+  /// passively waits for data to arrive.
+  ///
+  /// This should not be used for _active_ waits, like when a
+  /// 
[HashJoinOperator][org.apache.pinot.query.runtime.operator.HashJoinOperator] is 
building the hash table.
+  public long getPassiveDeadlineMs() {
+    return _passiveDeadlineMs;
   }
 
   public Map<String, String> getOpChainMetadata() {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index dffd7f6ba2..27f375e4db 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +52,30 @@ public class PipelineBreakerExecutor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
 
+  /**
+   * Execute a pipeline breaker and collect the results (synchronously). 
Currently, pipeline breaker executor can only
+   *    execute mailbox receive pipeline breaker.
+   *
+   * @param scheduler scheduler service to run the pipeline breaker main 
thread.
+   * @param mailboxService mailbox service to attach the {@link 
MailboxReceiveNode} against.
+   * @param workerMetadata worker metadata for the current worker.
+   * @param stagePlan the distributed stage plan to run pipeline breaker on.
+   * @param opChainMetadata request metadata, including query options
+   * @param parentContext Parent thread metadata
+   * @return pipeline breaker result;
+   *   - If exception occurs, exception block will be wrapped in {@link 
MseBlock} and assigned to each PB node.
+   *   - Normal stats will be attached to each PB node and downstream 
execution should return with stats attached.
+   */
+  @Nullable
+  public static PipelineBreakerResult 
executePipelineBreakersFromQueryContext(OpChainSchedulerService scheduler,
+      MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
+      Map<String, String> opChainMetadata,
+      @Nullable ThreadExecutionContext parentContext, boolean sendStats) {
+    return executePipelineBreakers(scheduler, mailboxService, workerMetadata, 
stagePlan, opChainMetadata,
+        QueryThreadContext.getRequestId(), 
QueryThreadContext.getActiveDeadlineMs(),
+        QueryThreadContext.getPassiveDeadlineMs(), parentContext, sendStats);
+  }
+
   /**
    * Execute a pipeline breaker and collect the results (synchronously). 
Currently, pipeline breaker executor can only
    *    execute mailbox receive pipeline breaker.
@@ -68,7 +93,7 @@ public class PipelineBreakerExecutor {
   @Nullable
   public static PipelineBreakerResult 
executePipelineBreakers(OpChainSchedulerService scheduler,
       MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
-      Map<String, String> opChainMetadata, long requestId, long deadlineMs,
+      Map<String, String> opChainMetadata, long requestId, long 
activeDeadlineMs, long passiveDeadlineMs,
       @Nullable ThreadExecutionContext parentContext, boolean sendStats) {
     PipelineBreakerContext pipelineBreakerContext = new 
PipelineBreakerContext();
     PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(), 
pipelineBreakerContext);
@@ -78,7 +103,7 @@ public class PipelineBreakerExecutor {
         //     OpChain receive-mail callbacks.
         // see also: MailboxIdUtils TODOs, de-couple mailbox id from query 
information
         OpChainExecutionContext opChainExecutionContext =
-            new OpChainExecutionContext(mailboxService, requestId, deadlineMs, 
opChainMetadata,
+            new OpChainExecutionContext(mailboxService, requestId, 
activeDeadlineMs, passiveDeadlineMs, opChainMetadata,
                 stagePlan.getStageMetadata(), workerMetadata, null, 
parentContext, sendStats);
         return execute(scheduler, pipelineBreakerContext, 
opChainExecutionContext);
       } catch (Exception e) {
@@ -125,7 +150,7 @@ public class PipelineBreakerExecutor {
     OpChain pipelineBreakerOpChain =
         new OpChain(opChainExecutionContext, pipelineBreakerOperator, (id) -> 
latch.countDown());
     scheduler.register(pipelineBreakerOpChain);
-    long timeoutMs = opChainExecutionContext.getDeadlineMs() - 
System.currentTimeMillis();
+    long timeoutMs = opChainExecutionContext.getPassiveDeadlineMs() - 
System.currentTimeMillis();
     if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
       return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), 
pipelineBreakerOperator.getResultMap(),
           pipelineBreakerOperator.getErrorBlock(), 
pipelineBreakerOperator.calculateStats());
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 6e5d1aac01..572c4c55d7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -294,7 +294,9 @@ public class ServerPlanRequestUtils {
       pinotQuery.setQueryOptions(queryOptions);
     }
     queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
-        Long.toString(executionContext.getDeadlineMs() - 
System.currentTimeMillis()));
+        Long.toString(executionContext.getActiveDeadlineMs() - 
System.currentTimeMillis()));
+    
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
+        Long.toString(executionContext.getPassiveDeadlineMs() - 
executionContext.getActiveDeadlineMs()));
   }
 
   /**
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 68cac7e9ce..6486db9228 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -168,7 +168,7 @@ public class QueryDispatcher {
     boolean cancelled = false;
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
-      QueryResult result = runReducer(requestId, dispatchableSubPlan, 
timeoutMs, queryOptions, _mailboxService);
+      QueryResult result = runReducer(dispatchableSubPlan, queryOptions, 
_mailboxService);
       if (result.getProcessingException() != null) {
         MultiStageQueryStats statsFromCancel = cancelWithStats(requestId, 
servers);
         cancelled = true;
@@ -452,6 +452,8 @@ public class QueryDispatcher {
     
requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID, 
cid);
     
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
         Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
+    
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
+        Long.toString(QueryThreadContext.getPassiveDeadlineMs() - 
QueryThreadContext.getActiveDeadlineMs()));
     requestMetadata.putAll(queryOptions);
     return requestMetadata;
   }
@@ -577,15 +579,38 @@ public class QueryDispatcher {
     return _timeSeriesDispatchClientMap.computeIfAbsent(key, k -> new 
TimeSeriesDispatchClient(hostname, port));
   }
 
-  // There is no reduction happening here, results are simply concatenated.
+  /// Concatenates the results of the sub-plan and returns a [QueryResult] 
with the concatenated result.
+  ///
+  /// This method assumes the caller thread is a query thread and therefore 
[QueryThreadContext] has been initialized.
+  private static QueryResult runReducer(
+      DispatchableSubPlan subPlan,
+      Map<String, String> queryOptions,
+      MailboxService mailboxService
+  ) {
+    return runReducer(
+        QueryThreadContext.getRequestId(),
+        subPlan,
+        QueryThreadContext.getActiveDeadlineMs(),
+        QueryThreadContext.getPassiveDeadlineMs(),
+        queryOptions,
+        mailboxService
+    );
+  }
+
+  /// Concatenates the results of the sub-plan and returns a [QueryResult] 
with the concatenated result.
+  ///
+  /// This method should be called from a query thread and therefore using
+  /// [#runReducer(DispatchableSubPlan, Map, MailboxService)] is preferred.
+  ///
+  /// Remember that in MSE there is no actual reduce but rather a single stage 
that concatenates the results.
   @VisibleForTesting
   public static QueryResult runReducer(long requestId,
       DispatchableSubPlan subPlan,
-      long timeoutMs,
+      long activeDeadlineMs,
+      long passiveDeadlineMs,
       Map<String, String> queryOptions,
       MailboxService mailboxService) {
     long startTimeMs = System.currentTimeMillis();
-    long deadlineMs = startTimeMs + timeoutMs;
     // NOTE: Reduce stage is always stage 0
     DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
     PlanFragment planFragment = stagePlan.getPlanFragment();
@@ -597,8 +622,8 @@ public class QueryDispatcher {
     StageMetadata stageMetadata = new StageMetadata(0, workerMetadata, 
stagePlan.getCustomProperties());
     ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
     OpChainExecutionContext executionContext =
-        new OpChainExecutionContext(mailboxService, requestId, deadlineMs, 
queryOptions, stageMetadata,
-            workerMetadata.get(0), null, parentContext, true);
+        new OpChainExecutionContext(mailboxService, requestId, 
activeDeadlineMs, passiveDeadlineMs,
+            queryOptions, stageMetadata, workerMetadata.get(0), null, 
parentContext, true);
 
     PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
     DataSchema sourceSchema = rootNode.getDataSchema();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index fa4689d26a..4f25431751 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -108,7 +108,7 @@ public class PipelineBreakerExecutorTest {
       MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan 
stagePlan,
       Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
     return PipelineBreakerExecutor.executePipelineBreakers(scheduler, 
mailboxService, workerMetadata, stagePlan,
-        opChainMetadata, requestId, deadlineMs, null, true);
+        opChainMetadata, requestId, deadlineMs, deadlineMs, null, true);
   }
 
   @AfterClass
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 89371ec0a3..ff34cc3ef8 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -160,7 +160,12 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
       }
     }
     // exception will be propagated through for assert purpose on runtime error
-    return QueryDispatcher.runReducer(requestId, dispatchableSubPlan, 
timeoutMs, Collections.emptyMap(),
+    long now = System.currentTimeMillis();
+    long extraPassiveTimeout = 1000;
+    return QueryDispatcher.runReducer(requestId, dispatchableSubPlan,
+        timeoutMs + now,
+        timeoutMs + now + extraPassiveTimeout,
+        Collections.emptyMap(),
         _mailboxService);
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
index a4d5174efe..dbfc6ece96 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
@@ -181,9 +181,12 @@ public class QueryThreadContext {
     }
     QueryThreadContext.setIds(requestId, cid);
     long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+    long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
+        
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
     long startTimeMs = System.currentTimeMillis();
     QueryThreadContext.setStartTimeMs(startTimeMs);
-    QueryThreadContext.setDeadlineMs(startTimeMs + timeoutMs);
+    QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
+    QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + 
extraPassiveTimeoutMs);
 
     return open;
   }
@@ -217,7 +220,8 @@ public class QueryThreadContext {
     Instance context = new Instance();
     if (memento != null) {
       context.setStartTimeMs(memento._startTimeMs);
-      context.setDeadlineMs(memento._deadlineMs);
+      context.setActiveDeadlineMs(memento._activeDeadlineMs);
+      context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
       context.setBrokerId(memento._brokerId);
       context.setRequestId(memento._requestId);
       context.setCid(memento._cid);
@@ -295,23 +299,60 @@ public class QueryThreadContext {
   }
 
   /**
-   * Returns the deadline time of the query in milliseconds since epoch.
+   * @deprecated Use {@link #getActiveDeadlineMs()} instead, which returns the same value.
+   */
+  @Deprecated
+  public static long getDeadlineMs() {
+    return get().getActiveDeadlineMs();
+  }
+
+  /**
+   * @deprecated Use {@link #setActiveDeadlineMs(long)} instead.
+   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
+   */
+  @Deprecated
+  public static void setDeadlineMs(long deadlineMs) {
+    get().setActiveDeadlineMs(deadlineMs);
+  }
+
+  /**
+   * Returns the active deadline time of the query in milliseconds since epoch.
    *
    * The default value of 0 means the deadline is not set.
    * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
    */
-  public static long getDeadlineMs() {
-    return get().getDeadlineMs();
+  public static long getActiveDeadlineMs() {
+    return get().getActiveDeadlineMs();
   }
 
   /**
-   * Sets the deadline time of the query in milliseconds since epoch.
+   * Sets the active deadline time of the query in milliseconds since epoch.
    *
    * The deadline can only be set once.
    * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
    */
-  public static void setDeadlineMs(long deadlineMs) {
-    get().setDeadlineMs(deadlineMs);
+  public static void setActiveDeadlineMs(long activeDeadlineMs) {
+    get().setActiveDeadlineMs(activeDeadlineMs);
+  }
+
+  /**
+   * Returns the passive deadline time of the query in milliseconds since 
epoch.
+   *
+   * The default value of 0 means the deadline is not set.
+   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
+   */
+  public static long getPassiveDeadlineMs() {
+    return get().getPassiveDeadlineMs();
+  }
+
+  /**
+   * Sets the passive deadline time of the query in milliseconds since epoch.
+   *
+   * The deadline can only be set once.
+   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
+   */
+  public static void setPassiveDeadlineMs(long passiveDeadlineMs) {
+    get().setPassiveDeadlineMs(passiveDeadlineMs);
   }
 
   /**
@@ -456,7 +497,8 @@ public class QueryThreadContext {
    */
   private static class Instance implements CloseableContext {
     private long _startTimeMs;
-    private long _deadlineMs;
+    private long _activeDeadlineMs;
+    private long _passiveDeadlineMs;
     private String _brokerId;
     private long _requestId;
     private String _cid;
@@ -474,14 +516,34 @@ public class QueryThreadContext {
       _startTimeMs = startTimeMs;
     }
 
+    @Deprecated
     public long getDeadlineMs() {
-      return _deadlineMs;
+      return getActiveDeadlineMs();
+    }
+
+    public long getActiveDeadlineMs() {
+      return _activeDeadlineMs;
     }
 
+    @Deprecated
     public void setDeadlineMs(long deadlineMs) {
-      Preconditions.checkState(getDeadlineMs() == 0, "Deadline already set to 
%s, cannot set again",
-          getDeadlineMs());
-      _deadlineMs = deadlineMs;
+      setActiveDeadlineMs(deadlineMs);
+    }
+
+    public void setActiveDeadlineMs(long activeDeadlineMs) {
+      Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already 
set to %s, cannot set again",
+          getActiveDeadlineMs());
+      _activeDeadlineMs = activeDeadlineMs;
+    }
+
+    public long getPassiveDeadlineMs() {
+      return _passiveDeadlineMs;
+    }
+
+    public void setPassiveDeadlineMs(long passiveDeadlineMs) {
+      Preconditions.checkState(getPassiveDeadlineMs() == 0, "Passive deadline 
already set to %s, cannot set again",
+          getPassiveDeadlineMs());
+      _passiveDeadlineMs = passiveDeadlineMs;
     }
 
     public String getBrokerId() {
@@ -576,8 +638,13 @@ public class QueryThreadContext {
     }
 
     @Override
-    public void setDeadlineMs(long deadlineMs) {
-      LOGGER.debug("Setting deadline to {} in a fake context", deadlineMs);
+    public void setActiveDeadlineMs(long activeDeadlineMs) {
+      LOGGER.debug("Setting active deadline to {} in a fake context", 
activeDeadlineMs);
+    }
+
+    @Override
+    public void setPassiveDeadlineMs(long passiveDeadlineMs) {
+      LOGGER.debug("Setting passive deadline to {} in a fake context", 
passiveDeadlineMs);
     }
 
     @Override
@@ -632,7 +699,8 @@ public class QueryThreadContext {
    */
   public static class Memento {
     private final long _startTimeMs;
-    private final long _deadlineMs;
+    private final long _activeDeadlineMs;
+    private final long _passiveDeadlineMs;
     private final String _brokerId;
     private final long _requestId;
     private final String _cid;
@@ -642,7 +710,8 @@ public class QueryThreadContext {
 
     private Memento(Instance instance) {
       _startTimeMs = instance.getStartTimeMs();
-      _deadlineMs = instance.getDeadlineMs();
+      _activeDeadlineMs = instance.getActiveDeadlineMs();
+      _passiveDeadlineMs = instance.getPassiveDeadlineMs();
       _brokerId = instance.getBrokerId();
       _requestId = instance.getRequestId();
       _cid = instance.getCid();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2d60123329..3a2b703edc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -344,6 +344,8 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH =
         "pinot.broker.enable.row.column.level.auth";
     public static final boolean DEFAULT_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH = 
false;
+    public static final String CONFIG_OF_EXTRA_PASSIVE_TIMEOUT_MS = 
"pinot.broker.extraPassiveTimeoutMs";
+    public static final long DEFAULT_EXTRA_PASSIVE_TIMEOUT_MS = 100L;
     public static final String CONFIG_OF_BROKER_ID = 
"pinot.broker.instance.id";
     public static final String CONFIG_OF_BROKER_INSTANCE_TAGS = 
"pinot.broker.instance.tags";
     public static final String CONFIG_OF_BROKER_HOSTNAME = 
"pinot.broker.hostname";
@@ -544,6 +546,7 @@ public class CommonConstants {
 
       public static class QueryOptionKey {
         public static final String TIMEOUT_MS = "timeoutMs";
+        public static final String EXTRA_PASSIVE_TIMEOUT_MS = 
"extraPassiveTimeoutMs";
         public static final String SKIP_UPSERT = "skipUpsert";
         public static final String SKIP_UPSERT_VIEW = "skipUpsertView";
         public static final String UPSERT_VIEW_FRESHNESS_MS = 
"upsertViewFreshnessMs";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to