This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2e5fb94d03b [opt](rowcount) refresh external table's rowcount async (#32997) (#32998)
2e5fb94d03b is described below

commit 2e5fb94d03bd1de832608fd83449d8d883dda32a
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Mon Apr 1 11:28:20 2024 +0800

    [opt](rowcount) refresh external table's rowcount async (#32997) (#32998)
    
    bp #32997
---
 .../doris/datasource/ExternalMetaCacheMgr.java     |   3 +-
 .../doris/datasource/ExternalRowCountCache.java    |  13 ++-
 .../datasource/ExternalRowCountCacheTest.java      | 100 +++++++++++++++++++++
 3 files changed, 111 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 1492392ee67..c8cafeb990e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -68,7 +68,8 @@ public class ExternalMetaCacheMgr {
                 "ExternalMetaCacheMgr", 120, true);
         hudiPartitionMgr = HudiPartitionMgr.get(executor);
         fsCache = new FileSystemCache(executor);
-        rowCountCache = new ExternalRowCountCache(executor);
+        rowCountCache = new ExternalRowCountCache(executor,
+                Config.external_cache_expire_time_minutes_after_access * 60, 
null);
         icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
         maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
index 1441efa9bf5..632cde1d5a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java
@@ -24,6 +24,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -37,14 +38,19 @@ public class ExternalRowCountCache {
     private static final Logger LOG = 
LogManager.getLogger(ExternalRowCountCache.class);
     private final AsyncLoadingCache<RowCountKey, Optional<Long>> rowCountCache;
 
-    public ExternalRowCountCache(ExecutorService executor) {
+    public ExternalRowCountCache(ExecutorService executor, long refreshAfterWriteSeconds,
+            BasicAsyncCacheLoader<RowCountKey, Optional<Long>> loader) {
+        // 1. expireAfterAccess(1 day): drop entries not read for a day, to avoid keeping too many entries.
+        // 2. refreshAfterWrite: a read of an entry older than refreshAfterWriteSeconds triggers an async reload.
         rowCountCache = Caffeine.newBuilder()
                 .maximumSize(Config.max_external_table_row_count_cache_num)
-                .expireAfterWrite(Duration.ofMinutes(Config.external_cache_expire_time_minutes_after_access))
+                .expireAfterAccess(Duration.ofDays(1))
+                .refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSeconds))
                 .executor(executor)
-                .buildAsync(new RowCountCacheLoader());
+                .buildAsync(loader == null ? new RowCountCacheLoader() : loader);
     }
 
+    @Getter
     public static class RowCountKey {
         private final long catalogId;
         private final long dbId;
@@ -74,7 +80,6 @@ public class ExternalRowCountCache {
     }
 
     public static class RowCountCacheLoader extends 
BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {
-
         @Override
         protected Optional<Long> doLoad(RowCountKey rowCountKey) {
             try {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
new file mode 100644
index 00000000000..e8622f6b59a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey;
+import org.apache.doris.statistics.BasicAsyncCacheLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ExternalRowCountCacheTest {
+    private ExternalRowCountCache cache;
+    private ExecutorService executorService;
+
+    public static class TestLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {
+        // Fake loader: table 1 -> 111, 2 -> 222, 3 -> (after 1 sec) 334, 335, ... on each load, others -> empty.
+        private AtomicLong incr = new AtomicLong(333);
+
+        @Override
+        protected Optional<Long> doLoad(RowCountKey rowCountKey) {
+            if (rowCountKey.getTableId() == 1) {
+                return Optional.of(111L);
+            } else if (rowCountKey.getTableId() == 2) {
+                return Optional.of(222L);
+            } else if (rowCountKey.getTableId() == 3) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                System.out.println("load: " + incr.get());
+                return Optional.of(incr.incrementAndGet());
+            }
+            return Optional.empty();
+        }
+    }
+
+    @BeforeEach
+    public void setUp() {
+        executorService = Executors.newFixedThreadPool(2);
+        cache = new ExternalRowCountCache(executorService, 2, new TestLoader());
+    }
+
+    @Test
+    public void test() throws Exception {
+        // table 1: the first read returns 0 because the load is async; after it finishes the value is 111
+        long rowCount = cache.getCachedRowCount(1, 1, 1);
+        Assertions.assertEquals(0, rowCount);
+        Thread.sleep(1000);
+        rowCount = cache.getCachedRowCount(1, 1, 1);
+        Assertions.assertEquals(111, rowCount);
+
+        // table 2: same as table 1, the loaded value is 222
+        rowCount = cache.getCachedRowCount(1, 1, 2);
+        Assertions.assertEquals(0, rowCount);
+        Thread.sleep(1000);
+        rowCount = cache.getCachedRowCount(1, 1, 2);
+        Assertions.assertEquals(222, rowCount);
+
+        // table 3: slow loader (1 sec per load) whose value increases by one on every load
+        rowCount = cache.getCachedRowCount(1, 1, 3);
+        // first get, it should be 0 because the loader is async
+        Assertions.assertEquals(0, rowCount);
+        // after sleeping 2 sec the first load (1 sec) has finished, so it should be 334
+        Thread.sleep(2000);
+        rowCount = cache.getCachedRowCount(1, 1, 3);
+        Assertions.assertEquals(334, rowCount);
+        // sleep 3 sec so the entry becomes older than refreshAfterWrite (2 sec, set in setUp)
+        Thread.sleep(3000);
+        rowCount = cache.getCachedRowCount(1, 1, 3);
+        // the refresh is only triggered by this read and runs async, so it should still be 334
+        Assertions.assertEquals(334, rowCount);
+        // sleep 2 sec to wait for the refresh's doLoad (1 sec) to finish
+        Thread.sleep(2000);
+        rowCount = cache.getCachedRowCount(1, 1, 3);
+        // refresh done, value should be 335
+        Assertions.assertEquals(335, rowCount);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to