This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 2e5fb94d03b [opt](rowcount) refresh external table's rowcount async (#32997) (#32998) 2e5fb94d03b is described below commit 2e5fb94d03bd1de832608fd83449d8d883dda32a Author: Mingyu Chen <morning...@163.com> AuthorDate: Mon Apr 1 11:28:20 2024 +0800 [opt](rowcount) refresh external table's rowcount async (#32997) (#32998) bp #32997 --- .../doris/datasource/ExternalMetaCacheMgr.java | 3 +- .../doris/datasource/ExternalRowCountCache.java | 13 ++- .../datasource/ExternalRowCountCacheTest.java | 100 +++++++++++++++++++++ 3 files changed, 111 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index 1492392ee67..c8cafeb990e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -68,7 +68,8 @@ public class ExternalMetaCacheMgr { "ExternalMetaCacheMgr", 120, true); hudiPartitionMgr = HudiPartitionMgr.get(executor); fsCache = new FileSystemCache(executor); - rowCountCache = new ExternalRowCountCache(executor); + rowCountCache = new ExternalRowCountCache(executor, + Config.external_cache_expire_time_minutes_after_access * 60, null); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 1441efa9bf5..632cde1d5a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -24,6 +24,7 @@ import org.apache.doris.statistics.util.StatisticsUtil; import 
com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; +import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,14 +38,19 @@ public class ExternalRowCountCache { private static final Logger LOG = LogManager.getLogger(ExternalRowCountCache.class); private final AsyncLoadingCache<RowCountKey, Optional<Long>> rowCountCache; - public ExternalRowCountCache(ExecutorService executor) { + public ExternalRowCountCache(ExecutorService executor, long refreshAfterWriteSeconds, + BasicAsyncCacheLoader<RowCountKey, Optional<Long>> loader) { + // 1. set expireAfterAccess to 1 day, so entries not accessed for a day are evicted and do not pile up + // 2. set refreshAfterWrite to refreshAfterWriteSeconds (10min by default), so that an entry older than that is reloaded asynchronously on its next access rowCountCache = Caffeine.newBuilder() .maximumSize(Config.max_external_table_row_count_cache_num) - .expireAfterWrite(Duration.ofMinutes(Config.external_cache_expire_time_minutes_after_access)) + .expireAfterAccess(Duration.ofDays(1)) + .refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSeconds)) .executor(executor) - .buildAsync(new RowCountCacheLoader()); + .buildAsync(loader == null ?
new RowCountCacheLoader() : loader); } + @Getter public static class RowCountKey { private final long catalogId; private final long dbId; @@ -74,7 +80,6 @@ public class ExternalRowCountCache { } public static class RowCountCacheLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> { - @Override protected Optional<Long> doLoad(RowCountKey rowCountKey) { try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java new file mode 100644 index 00000000000..e8622f6b59a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey; +import org.apache.doris.statistics.BasicAsyncCacheLoader; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class ExternalRowCountCacheTest { + private ExternalRowCountCache cache; + private ExecutorService executorService; + + public static class TestLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> { + + private AtomicLong incr = new AtomicLong(333); + + @Override + protected Optional<Long> doLoad(RowCountKey rowCountKey) { + if (rowCountKey.getTableId() == 1) { + return Optional.of(111L); + } else if (rowCountKey.getTableId() == 2) { + return Optional.of(222L); + } else if (rowCountKey.getTableId() == 3) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("load: " + incr.get()); + return Optional.of(incr.incrementAndGet()); + } + return Optional.empty(); + } + } + + @BeforeEach + public void setUp() { + executorService = Executors.newFixedThreadPool(2); + cache = new ExternalRowCountCache(executorService, 2, new TestLoader()); + } + + @Test + public void test() throws Exception { + // table 1 + long rowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(0, rowCount); + Thread.sleep(1000); + rowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(111, rowCount); + + // table 2 + rowCount = cache.getCachedRowCount(1, 1, 2); + Assertions.assertEquals(0, rowCount); + Thread.sleep(1000); + rowCount = cache.getCachedRowCount(1, 1, 2); + Assertions.assertEquals(222, rowCount); + + // table 3 + rowCount = cache.getCachedRowCount(1, 1, 3); + // first get, it should be 0 because the loader is async 
+ Assertions.assertEquals(0, rowCount); + // After sleeping 2 sec and getting again, it should be 334 (the first loaded value) + Thread.sleep(2000); + rowCount = cache.getCachedRowCount(1, 1, 3); + Assertions.assertEquals(334, rowCount); + // sleep 3 sec so the entry becomes eligible for refresh (refreshAfterWrite is 2 sec) + Thread.sleep(3000); + rowCount = cache.getCachedRowCount(1, 1, 3); + // the refresh is triggered only when the entry is queried, so the old value 334 is still returned + Assertions.assertEquals(334, rowCount); + // sleep 2 sec to wait for the doLoad + Thread.sleep(2000); + rowCount = cache.getCachedRowCount(1, 1, 3); + // refresh done, the value should now be 335 + Assertions.assertEquals(335, rowCount); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org