This is an automated email from the ASF dual-hosted git repository. englefly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dc7b2015f5 eh (#18122) dc7b2015f5 is described below commit dc7b2015f535f08414ae82c8b2d1c8701371f6aa Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Mon Mar 27 12:09:35 2023 +0900 eh (#18122) --- .../apache/doris/statistics/StatisticsCache.java | 32 ++++++++++- .../doris/statistics/StatisticsCacheLoader.java | 62 +++++++++++++++++----- 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index c6486459a6..9add3c1bc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -17,6 +17,9 @@ package org.apache.doris.statistics; +//import org.apache.doris.common.ThreadPoolManager; + +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.qe.ConnectContext; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; @@ -26,16 +29,43 @@ import org.apache.logging.log4j.Logger; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class StatisticsCache { private static final Logger LOG = LogManager.getLogger(StatisticsCache.class); + /** + * Use a standalone thread pool to avoid interference between this and any other JDK function + * that uses the threads of ForkJoinPool#common in the system.
+ */ + private final ThreadPoolExecutor threadPool + = ThreadPoolManager.newDaemonFixedThreadPool( + 10, Integer.MAX_VALUE, "STATS_FETCH", true); + + private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader(); + private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder() .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE) .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS)) .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL)) - .buildAsync(new StatisticsCacheLoader()); + .executor(threadPool) + .buildAsync(cacheLoader); + + { + threadPool.submit(() -> { + while (true) { + try { + cacheLoader.removeExpiredInProgressing(); + Thread.sleep(TimeUnit.MINUTES.toMillis(15)); + } catch (Throwable t) { + // IGNORE + } + } + + }); + } public ColumnStatistic getColumnStatistics(long tblId, String colName) { ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index c592a9b4eb..3417356b78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -33,9 +33,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> { @@ -51,18 +50,17 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe // TODO: 
Maybe we should trigger an analyze job when the required ColumnStatistic doesn't exist. - private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<ColumnLevelStatisticCache>> - inProgressing = new ConcurrentHashMap<>(); + private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime> + inProgressing = new HashMap<>(); @Override public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key, @NonNull Executor executor) { - - CompletableFuture<ColumnLevelStatisticCache> future = inProgressing.get(key); - if (future != null) { - return future; + CompletableFutureWithCreateTime cfWrapper = inProgressing.get(key); + if (cfWrapper != null) { + return cfWrapper.cf; } - future = CompletableFuture.supplyAsync(() -> { + CompletableFuture<ColumnLevelStatisticCache> future = CompletableFuture.supplyAsync(() -> { long startTime = System.currentTimeMillis(); try { LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName, @@ -107,10 +105,50 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe long endTime = System.currentTimeMillis(); LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName, endTime, endTime - startTime); - inProgressing.remove(key); + removeFromIProgressing(key); } - }); - inProgressing.put(key, future); + }, executor); + putIntoIProgressing(key, new CompletableFutureWithCreateTime(System.currentTimeMillis(), future)); return future; } + + private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime v) { + synchronized (inProgressing) { + inProgressing.put(k, v); + } + } + + private void removeFromIProgressing(StatisticsCacheKey k) { + synchronized (inProgressing) { + inProgressing.remove(k); + } + } + + public void removeExpiredInProgressing() { + // Quite simple logic that would complete very fast. + // Lock on object to avoid ConcurrentModificationException.
+ synchronized (inProgressing) { + inProgressing.entrySet().removeIf(e -> e.getValue().isExpired()); + } + } + + /** + * To make sure any item in the inProgressing would finally be removed to avoid potential mem leak. + */ + private static class CompletableFutureWithCreateTime extends CompletableFuture<ColumnLevelStatisticCache> { + + private static final long EXPIRED_TIME_MILLI = TimeUnit.MINUTES.toMillis(30); + + public final long startTime; + public final CompletableFuture<ColumnLevelStatisticCache> cf; + + public CompletableFutureWithCreateTime(long startTime, CompletableFuture<ColumnLevelStatisticCache> cf) { + this.startTime = startTime; + this.cf = cf; + } + + public boolean isExpired() { + return System.currentTimeMillis() - startTime > EXPIRED_TIME_MILLI; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org