This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc7b2015f5 eh (#18122)
dc7b2015f5 is described below

commit dc7b2015f535f08414ae82c8b2d1c8701371f6aa
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Mon Mar 27 12:09:35 2023 +0900

    eh (#18122)
---
 .../apache/doris/statistics/StatisticsCache.java   | 32 ++++++++++-
 .../doris/statistics/StatisticsCacheLoader.java    | 62 +++++++++++++++++-----
 2 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index c6486459a6..9add3c1bc3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.statistics;
 
+//import org.apache.doris.common.ThreadPoolManager;
+
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.qe.ConnectContext;
 
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
@@ -26,16 +29,43 @@ import org.apache.logging.log4j.Logger;
 
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class StatisticsCache {
 
     private static final Logger LOG = LogManager.getLogger(StatisticsCache.class);
 
+    /**
+     * Use a standalone thread pool to avoid interference between this and any other jdk function
+     * that use the thread of ForkJoinPool#common in the system.
+     */
+    private final ThreadPoolExecutor threadPool
+            = ThreadPoolManager.newDaemonFixedThreadPool(
+            10, Integer.MAX_VALUE, "STATS_FETCH", true);
+
+    private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
+
     private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
             .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
             .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
             .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
-            .buildAsync(new StatisticsCacheLoader());
+            .executor(threadPool)
+            .buildAsync(cacheLoader);
+
+    {
+        threadPool.submit(() -> {
+            while (true) {
+                try {
+                    cacheLoader.removeExpiredInProgressing();
+                    Thread.sleep(TimeUnit.MINUTES.toMillis(15));
+                } catch (Throwable t) {
+                    // IGNORE
+                }
+            }
+
+        });
+    }
 
     public ColumnStatistic getColumnStatistics(long tblId, String colName) {
         ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
index c592a9b4eb..3417356b78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
@@ -33,9 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> {
 
@@ -51,18 +50,17 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
 
     // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.
 
-    private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<ColumnLevelStatisticCache>>
-            inProgressing = new ConcurrentHashMap<>();
+    private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime>
+            inProgressing = new HashMap<>();
 
     @Override
     public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key,
             @NonNull Executor executor) {
-
-        CompletableFuture<ColumnLevelStatisticCache> future = inProgressing.get(key);
-        if (future != null) {
-            return future;
+        CompletableFutureWithCreateTime cfWrapper = inProgressing.get(key);
+        if (cfWrapper != null) {
+            return cfWrapper.cf;
         }
-        future = CompletableFuture.supplyAsync(() -> {
+        CompletableFuture<ColumnLevelStatisticCache> future = CompletableFuture.supplyAsync(() -> {
             long startTime = System.currentTimeMillis();
             try {
                 LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName,
@@ -107,10 +105,50 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
                 long endTime = System.currentTimeMillis();
                 LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName,
                         endTime, endTime - startTime);
-                inProgressing.remove(key);
+                removeFromIProgressing(key);
             }
-        });
-        inProgressing.put(key, future);
+        }, executor);
+        putIntoIProgressing(key, new CompletableFutureWithCreateTime(System.currentTimeMillis(), future));
         return future;
     }
+
+    private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime v) {
+        synchronized (inProgressing) {
+            inProgressing.put(k, v);
+        }
+    }
+
+    private void removeFromIProgressing(StatisticsCacheKey k) {
+        synchronized (inProgressing) {
+            inProgressing.remove(k);
+        }
+    }
+
+    public void removeExpiredInProgressing() {
+        // Quite simple logic that would complete very fast.
+        // Lock on object to avoid ConcurrentModificationException.
+        synchronized (inProgressing) {
+            inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
+        }
+    }
+
+    /**
+     * To make sure any item in the inProgressing would finally be removed to avoid potential mem leak.
+     */
+    private static class CompletableFutureWithCreateTime extends CompletableFuture<ColumnLevelStatisticCache> {
+
+        private static final long EXPIRED_TIME_MILLI = TimeUnit.MINUTES.toMillis(30);
+
+        public final long startTime;
+        public final CompletableFuture<ColumnLevelStatisticCache> cf;
+
+        public CompletableFutureWithCreateTime(long startTime, CompletableFuture<ColumnLevelStatisticCache> cf) {
+            this.startTime = startTime;
+            this.cf = cf;
+        }
+
+        public boolean isExpired() {
+            return System.currentTimeMillis() - startTime > EXPIRED_TIME_MILLI;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to