gortiz commented on code in PR #15609: URL: https://github.com/apache/pinot/pull/15609#discussion_r2068522578
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java: ########## @@ -18,35 +18,65 @@ */ package org.apache.pinot.query.runtime.executor; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.query.runtime.blocks.ErrorMseBlock; import org.apache.pinot.query.runtime.blocks.MseBlock; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.operator.OpChainId; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.Tracing; +import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class OpChainSchedulerService { private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class); private final ExecutorService _executorService; - private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap; + private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap = new ConcurrentHashMap<>(); + private final Cache<OpChainId, MultiStageOperator> _opChainCache; + + + public OpChainSchedulerService(ExecutorService executorService, PinotConfiguration config) { + this( + executorService, + config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_MAX, + MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_MAX), + 
config.getProperty(MultiStageQueryRunner.KEY_OF_OP_STATS_CACHE_EXPIRE_MS, + MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS) + ); + } public OpChainSchedulerService(ExecutorService executorService) { + this(executorService, MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_MAX, + MultiStageQueryRunner.DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS); + } + + public OpChainSchedulerService(ExecutorService executorService, int maxWeight, long expireAfterWriteMs) { _executorService = executorService; - _submittedOpChainMap = new ConcurrentHashMap<>(); + _opChainCache = CacheBuilder.newBuilder() + .weigher((OpChainId key, MultiStageOperator value) -> countOperators(value)) Review Comment: The more operators in the stage, the more memory this cache keeps alive. It is probably similar to using weight = 1, because stages are not huge (AFAIK only UNION is unbounded), but that may change in the future, and it shouldn't be very expensive to count the number of operators. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org