gortiz commented on code in PR #15609:
URL: https://github.com/apache/pinot/pull/15609#discussion_r2068522578


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -18,35 +18,65 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class OpChainSchedulerService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpChainSchedulerService.class);
 
   private final ExecutorService _executorService;
-  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap = 
new ConcurrentHashMap<>();
+  private final Cache<OpChainId, MultiStageOperator> _opChainCache;
+
+
+  public OpChainSchedulerService(ExecutorService executorService, 
PinotConfiguration config) {
+    this(
+        executorService,
+        config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_MAX,
+            MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_MAX),
+        
config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_EXPIRE_MS,
+            MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS)
+    );
+  }
 
   public OpChainSchedulerService(ExecutorService executorService) {
+    this(executorService, MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_MAX,
+        MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS);
+  }
+
+  public OpChainSchedulerService(ExecutorService executorService, int 
maxWeight, long expireAfterWriteMs) {
     _executorService = executorService;
-    _submittedOpChainMap = new ConcurrentHashMap<>();
+    _opChainCache = CacheBuilder.newBuilder()
+        .weigher((OpChainId key, MultiStageOperator value) -> 
countOperators(value))

Review Comment:
   The more operators in the stage, the more memory this cache keeps alive. 
It is probably similar to using weight = 1, because stages are not huge (AFAIK 
only UNION is unbounded), but that may change in the future, and it shouldn't 
be very expensive to count the number of operators.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to