yashmayya commented on code in PR #15609:
URL: https://github.com/apache/pinot/pull/15609#discussion_r2065941687


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1570,6 +1569,18 @@ public static class PlanVersions {
     public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN =
         "pinot.query.multistage.explain.include.segment.plan";
     public static final boolean DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN = false;
+
+    /// Max number of operators stored in the op stats cache.
+    /// Although the cache stores stages, each entry has a weight equal to the number of operators in the stage.
+    public static final String KEY_OF_OP_STATS_CACHE_MAX = "pinot.server.query.op.stats.cache.max";
+    public static final int DEFAULT_OF_OP_STATS_CACHE_MAX = 1000;
+
+    /// Max time to keep the op stats in the cache.
+    public static final String KEY_OF_OP_STATS_CACHE_EXPIRE_MS = "pinot.server.query.op.stats.cache.secs";
+    public static final int DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS = 10 * 60 * 1000;
+    /// Timeout of the cancel request, in milliseconds.
+    public static final String KEY_OF_CANCEL_TIMEOUT_MS = "pinot.server.query.cancel.timeout";

Review Comment:
   Is this intended to be applied to SSE query cancellation as well (in the future), or can we
indicate that this is MSE-specific in the config?
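
   For illustration only, one way to make the MSE scope explicit in the key itself (the
`multistage` segment below is a hypothetical name, not something proposed in this PR):

   ```java
       /// Timeout of the cancel request, in milliseconds.
       /// Hypothetical MSE-scoped key name, shown only to illustrate the question above.
       public static final String KEY_OF_CANCEL_TIMEOUT_MS = "pinot.server.query.multistage.cancel.timeout.ms";
   ```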



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1570,6 +1569,18 @@ public static class PlanVersions {
     public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN =
         "pinot.query.multistage.explain.include.segment.plan";
     public static final boolean DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN = false;
+
+    /// Max number of operators stored in the op stats cache.
+    /// Although the cache stores stages, each entry has a weight equal to the number of operators in the stage.
+    public static final String KEY_OF_OP_STATS_CACHE_MAX = "pinot.server.query.op.stats.cache.max";
+    public static final int DEFAULT_OF_OP_STATS_CACHE_MAX = 1000;
+
+    /// Max time to keep the op stats in the cache.
+    public static final String KEY_OF_OP_STATS_CACHE_EXPIRE_MS = "pinot.server.query.op.stats.cache.secs";
+    public static final int DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS = 10 * 60 * 1000;
+    /// Timeout of the cancel request, in milliseconds.
+    public static final String KEY_OF_CANCEL_TIMEOUT_MS = "pinot.server.query.cancel.timeout";

Review Comment:
   ```suggestion
       public static final String KEY_OF_CANCEL_TIMEOUT_MS = "pinot.server.query.cancel.timeout.ms";
   ```
   nit: other milliseconds-based configs have this suffix



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -18,35 +18,65 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class OpChainSchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
 
   private final ExecutorService _executorService;
-  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap = new ConcurrentHashMap<>();
+  private final Cache<OpChainId, MultiStageOperator> _opChainCache;
+
+
+  public OpChainSchedulerService(ExecutorService executorService, PinotConfiguration config) {
+    this(
+        executorService,
+        config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_MAX,
+            MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_MAX),
+        config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_EXPIRE_MS,
+            MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS)
+    );
+  }
 
   public OpChainSchedulerService(ExecutorService executorService) {
+    this(executorService, MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_MAX,
+        MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS);
+  }
+
+  public OpChainSchedulerService(ExecutorService executorService, int maxWeight, long expireAfterWriteMs) {
     _executorService = executorService;
-    _submittedOpChainMap = new ConcurrentHashMap<>();
+    _opChainCache = CacheBuilder.newBuilder()
+        .weigher((OpChainId key, MultiStageOperator value) -> countOperators(value))

Review Comment:
   Why do we care about the weight / number of operators in the tree here? Is 
the concern about the total number of operator instances being kept alive in 
memory?
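
   For context, a self-contained sketch of how Guava's `CacheBuilder` combines `weigher` with
`maximumWeight` (key/value types and numbers here are illustrative, not the PR's): eviction is
driven by the sum of the weights across entries rather than by the entry count, so a few stages
with many operators consume the budget as quickly as many small stages.

   ```java
   import java.util.concurrent.TimeUnit;

   import com.google.common.cache.Cache;
   import com.google.common.cache.CacheBuilder;

   public class WeightedCacheSketch {
     public static void main(String[] args) {
       // Cache keyed by a stage id, holding that stage's operator count; each entry weighs its
       // operator count, so maximumWeight bounds the total number of cached operators.
       Cache<String, Integer> statsCache = CacheBuilder.newBuilder()
           .maximumWeight(1000)                                     // budget over the sum of weights
           .weigher((String stageId, Integer operatorCount) -> operatorCount)
           .expireAfterWrite(10 * 60 * 1000, TimeUnit.MILLISECONDS)
           .build();

       statsCache.put("stage-1", 7);   // consumes 7 of the 1000-weight budget
       statsCache.put("stage-2", 450); // a single deep stage can take a large share of the budget
       System.out.println("entries cached: " + statsCache.size());
     }
   }
   ```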



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -18,35 +18,65 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class OpChainSchedulerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
 
   private final ExecutorService _executorService;
-  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap = new ConcurrentHashMap<>();
+  private final Cache<OpChainId, MultiStageOperator> _opChainCache;
+
+
+  public OpChainSchedulerService(ExecutorService executorService, PinotConfiguration config) {

Review Comment:
   I think this constructor should be called in `QueryRunner`?
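
   If it helps, a rough sketch of that wiring (the `init` method shape and the `_opChainScheduler`
field are assumed purely for illustration; only the constructor signature comes from this diff):

   ```java
   // Hypothetical excerpt from QueryRunner: pass the server config through so the op stats cache
   // picks up the pinot.server.query.op.stats.cache.* settings instead of always using the
   // hard-coded defaults.
   public void init(PinotConfiguration config, ExecutorService opChainExecutor) {
     _opChainScheduler = new OpChainSchedulerService(opChainExecutor, config);
   }
   ```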



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -146,18 +162,40 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
     Set<QueryServerInstance> servers = new HashSet<>();
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
-      try {
-        return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService);
-      } finally {
-        if (isQueryCancellationEnabled()) {
-          _serversByQuery.remove(requestId);
-        }
-      }
+      return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService);
+    } catch (Exception ex) {
+      return tryRecover(context.getRequestId(), servers, ex);
     } catch (Throwable e) {
       // TODO: Consider always cancel when it returns (early terminate)
-      cancel(requestId, servers);
+      cancel(requestId);
       throw e;
+    } finally {
+      if (isQueryCancellationEnabled()) {
+        _serversByQuery.remove(requestId);
+      }
+    }
+  }
+
+  private QueryResult tryRecover(long requestId, Set<QueryServerInstance> servers, Exception ex)
+      throws Exception {
+    if (servers.isEmpty()) {
+      throw ex;
+    }
+    QueryErrorCode errorCode;
+    if (ex instanceof TimeoutException) {
+      errorCode = QueryErrorCode.EXECUTION_TIMEOUT;
+    } else if (ex instanceof QueryException) {
+      errorCode = ((QueryException) ex).getErrorCode();

Review Comment:
   We don't need to try this recovery of stats for non-timeout exceptions, right? Shouldn't the
error blocks we get from the broker's root stage mailbox in those cases already have the stats?
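
   If that holds, the recovery path could be narrowed to timeouts only, roughly like the sketch
below (names follow the quoted diff; the exact shape is only illustrative):

   ```java
   // Sketch of a guard at the top of tryRecover: only a broker-side timeout takes the
   // cancel-with-stats path, since in that case the root stage mailbox may never have delivered
   // the final stats; for any other exception, just cancel and rethrow.
   if (servers.isEmpty() || !(ex instanceof TimeoutException)) {
     cancel(requestId, servers);
     throw ex;
   }
   MultiStageQueryStats stats = cancelWithStats(requestId, servers);
   ```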



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -146,18 +162,40 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
     Set<QueryServerInstance> servers = new HashSet<>();
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
-      try {
-        return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService);
-      } finally {
-        if (isQueryCancellationEnabled()) {
-          _serversByQuery.remove(requestId);
-        }
-      }
+      return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService);
+    } catch (Exception ex) {
+      return tryRecover(context.getRequestId(), servers, ex);
     } catch (Throwable e) {
       // TODO: Consider always cancel when it returns (early terminate)
-      cancel(requestId, servers);
+      cancel(requestId);
       throw e;
+    } finally {
+      if (isQueryCancellationEnabled()) {
+        _serversByQuery.remove(requestId);
+      }
+    }
+  }
+
+  private QueryResult tryRecover(long requestId, Set<QueryServerInstance> servers, Exception ex)
+      throws Exception {
+    if (servers.isEmpty()) {
+      throw ex;
+    }
+    QueryErrorCode errorCode;
+    if (ex instanceof TimeoutException) {
+      errorCode = QueryErrorCode.EXECUTION_TIMEOUT;
+    } else if (ex instanceof QueryException) {
+      errorCode = ((QueryException) ex).getErrorCode();
+    } else {
+      cancel(requestId, servers);
+      throw ex;
+    }
+    MultiStageQueryStats stats = cancelWithStats(requestId, servers);

Review Comment:
   Might be useful to add a comment here briefly explaining why this special 
handling is required only for timeout errors.
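
   Something along these lines might work as that in-code note (wording is only a suggestion):

   ```java
   // On a broker-side timeout the reducer gives up before the servers have pushed their final
   // stats blocks, so cancel with explicit stats collection to still surface per-stage stats in
   // the timeout error; for other errors the error block should already carry the stats.
   MultiStageQueryStats stats = cancelWithStats(requestId, servers);
   ```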



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

