This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4a721e445b97347fd3c0d934434dc8b6d6a0c84c
Author: Qi Chen <che...@selectdb.com>
AuthorDate: Wed Feb 12 22:31:45 2025 +0800

    [Opt](multi-catalog) Improve performance by introducing a cache of directory file listings when getting splits for each query. (#43913)
---
 .../main/java/org/apache/doris/common/Config.java  |   4 +
 fe/fe-core/pom.xml                                 |   5 +
 .../java/org/apache/doris/common/EmptyCache.java   | 247 +++++++
 .../org/apache/doris/common/EvictableCache.java    | 466 ++++++++++++++
 .../apache/doris/common/EvictableCacheBuilder.java | 286 +++++++++
 .../doris/datasource/hive/HMSExternalTable.java    |   4 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  68 +-
 .../doris/datasource/hive/source/HiveScanNode.java |  15 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   5 +-
 .../java/org/apache/doris/fs/DirectoryLister.java  |  29 +
 .../apache/doris/fs/FileSystemDirectoryLister.java |  37 ++
 .../org/apache/doris/fs/FileSystemIOException.java |  65 ++
 .../apache/doris/fs/RemoteFileRemoteIterator.java  |  47 ++
 .../java/org/apache/doris/fs/RemoteIterator.java   |  27 +
 .../org/apache/doris/fs/SimpleRemoteIterator.java  |  45 ++
 .../fs/TransactionDirectoryListingCacheKey.java    |  64 ++
 .../fs/TransactionScopeCachingDirectoryLister.java | 216 +++++++
 ...nsactionScopeCachingDirectoryListerFactory.java |  59 ++
 .../glue/translator/PhysicalPlanTranslator.java    |  19 +-
 .../apache/doris/planner/SingleNodePlanner.java    |  14 +-
 .../apache/doris/common/TestEvictableCache.java    | 708 +++++++++++++++++++++
 ...TransactionScopeCachingDirectoryListerTest.java | 174 +++++
 22 files changed, 2568 insertions(+), 36 deletions(-)

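For orientation before the diffs: the patch routes all split-related directory
listing through a DirectoryLister abstraction, so a query can wrap it with a
bounded, query-scoped cache and avoid re-listing the same location while
generating splits. The following is only a simplified, hypothetical sketch of
that idea (stand-in names, not the classes added below):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    // Hypothetical stand-in: serve repeated listings of one location from a
    // bounded LRU map for the lifetime of a single query.
    final class QueryScopedListingCache<T> {
        private final Function<String, List<T>> lister; // the real filesystem listing
        private final Map<String, List<T>> cache;

        QueryScopedListingCache(Function<String, List<T>> lister, int maxEntries) {
            this.lister = lister;
            // Access-ordered LinkedHashMap gives simple LRU eviction; the real
            // cache is bounded by max_external_table_split_file_meta_cache_num.
            this.cache = new LinkedHashMap<String, List<T>>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, List<T>> eldest) {
                    return size() > maxEntries;
                }
            };
        }

        synchronized List<T> list(String location) {
            return cache.computeIfAbsent(location, lister);
        }
    }

In the commit itself this role appears to be played by
TransactionScopeCachingDirectoryLister (built on the EvictableCache added
below) rather than a LinkedHashMap.
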
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a367c13c75b..d4a636a7a98 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2098,6 +2098,10 @@ public class Config extends ConfigBase {
         "Max cache number of external table row count"})
     public static long max_external_table_row_count_cache_num = 100000;
 
+    @ConfField(description = {"每个查询的外表文件元数据缓存的最大文件数量。",
+            "Max cache file number of external table split file meta cache at 
query level."})
+    public static long max_external_table_split_file_meta_cache_num = 100000;
+
     /**
      * Max cache loader thread-pool size.
      * Max thread pool size for loading external meta cache
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 049044f62d7..6195a1e32cd 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -783,6 +783,11 @@ under the License.
             <artifactId>ap-loader-all</artifactId>
             <version>3.0-8</version>
         </dependency>
+        <dependency>
+            <groupId>org.gaul</groupId>
+            <artifactId>modernizer-maven-annotations</artifactId>
+            <version>2.4.0</version>
+        </dependency>
     </dependencies>
     <repositories>
         <!-- for huawei obs sdk -->
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
new file mode 100644
index 00000000000..5942eb2b1f3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+class EmptyCache<K, V>
+        extends AbstractLoadingCache<K, V> {
+    private final CacheLoader<? super K, V> loader;
+    private final StatsCounter statsCounter;
+
+    EmptyCache(CacheLoader<? super K, V> loader, boolean recordStats) {
+        this.loader = Objects.requireNonNull(loader, "loader is null");
+        this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter();
+    }
+
+    @Override
+    public V getIfPresent(Object key) {
+        statsCounter.recordMisses(1);
+        return null;
+    }
+
+    @Override
+    public V get(K key)
+            throws ExecutionException {
+        return get(key, () -> loader.load(key));
+    }
+
+    @Override
+    public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
+            throws ExecutionException {
+        try {
+            Set<K> keySet = ImmutableSet.copyOf(keys);
+            statsCounter.recordMisses(keySet.size());
+            @SuppressWarnings("unchecked") // safe since all keys extend K
+            ImmutableMap<K, V> result = (ImmutableMap<K, V>) loader.loadAll(keySet);
+            for (K key : keySet) {
+                if (!result.containsKey(key)) {
+                    throw new InvalidCacheLoadException("loadAll failed to return a value for " + key);
+                }
+            }
+            statsCounter.recordLoadSuccess(1);
+            return result;
+        } catch (RuntimeException e) {
+            statsCounter.recordLoadException(1);
+            throw new UncheckedExecutionException(e);
+        } catch (Exception e) {
+            statsCounter.recordLoadException(1);
+            throw new ExecutionException(e);
+        }
+    }
+
+    @Override
+    public V get(K key, Callable<? extends V> valueLoader)
+            throws ExecutionException {
+        statsCounter.recordMisses(1);
+        try {
+            V value = valueLoader.call();
+            statsCounter.recordLoadSuccess(1);
+            return value;
+        } catch (RuntimeException e) {
+            statsCounter.recordLoadException(1);
+            throw new UncheckedExecutionException(e);
+        } catch (Exception e) {
+            statsCounter.recordLoadException(1);
+            throw new ExecutionException(e);
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        // Cache, even if configured to evict everything immediately, should allow writes.
+    }
+
+    @Override
+    public void refresh(K key) {}
+
+    @Override
+    public void invalidate(Object key) {}
+
+    @Override
+    public void invalidateAll(Iterable<?> keys) {}
+
+    @Override
+    public void invalidateAll() {
+
+    }
+
+    @Override
+    public long size() {
+        return 0;
+    }
+
+    @Override
+    public CacheStats stats() {
+        return statsCounter.snapshot();
+    }
+
+    @Override
+    public ConcurrentMap<K, V> asMap() {
+        return new ConcurrentMap<K, V>() {
+            @Override
+            public V putIfAbsent(K key, V value) {
+                // Cache, even if configured to evict everything immediately, should allow writes.
+                // putIfAbsent returns the previous value
+                return null;
+            }
+
+            @Override
+            public boolean remove(Object key, Object value) {
+                return false;
+            }
+
+            @Override
+            public boolean replace(K key, V oldValue, V newValue) {
+                return false;
+            }
+
+            @Override
+            public V replace(K key, V value) {
+                return null;
+            }
+
+            @Override
+            public int size() {
+                return 0;
+            }
+
+            @Override
+            public boolean isEmpty() {
+                return true;
+            }
+
+            @Override
+            public boolean containsKey(Object key) {
+                return false;
+            }
+
+            @Override
+            public boolean containsValue(Object value) {
+                return false;
+            }
+
+            @Override
+            @Nullable
+            public V get(Object key) {
+                return null;
+            }
+
+            @Override
+            @Nullable
+            public V put(K key, V value) {
+                // Cache, even if configured to evict everything immediately, should allow writes.
+                return null;
+            }
+
+            @Override
+            @Nullable
+            public V remove(Object key) {
+                return null;
+            }
+
+            @Override
+            public void putAll(Map<? extends K, ? extends V> m) {
+                // Cache, even if configured to evict everything immediately, should allow writes.
+            }
+
+            @Override
+            public void clear() {
+
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return ImmutableSet.of();
+            }
+
+            @Override
+            public Collection<V> values() {
+                return ImmutableSet.of();
+            }
+
+            @Override
+            public Set<Entry<K, V>> entrySet() {
+                return ImmutableSet.of();
+            }
+        };
+    }
+
+    private static class NoopStatsCounter
+            implements StatsCounter {
+        private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot();
+
+        @Override
+        public void recordHits(int count) {}
+
+        @Override
+        public void recordMisses(int count) {}
+
+        @Override
+        public void recordLoadSuccess(long loadTime) {}
+
+        @Override
+        public void recordLoadException(long loadTime) {}
+
+        @Override
+        public void recordEviction() {}
+
+        @Override
+        public CacheStats snapshot() {
+            return EMPTY_STATS;
+        }
+    }
+}
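
How EmptyCache is reached, as a hedged sketch assuming the builder semantics
added later in this patch: a zero-size builder with shareNothingWhenDisabled()
yields this class, so every read is a miss, get() always invokes the loader,
and writes are accepted but dropped.

    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    import org.apache.doris.common.EvictableCacheBuilder;

    public class DisabledCacheSketch {
        public static void main(String[] args) throws Exception {
            CacheLoader<String, Long> loader = CacheLoader.from(key -> (long) key.length());

            LoadingCache<String, Long> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(0)              // size 0 disables caching
                    .shareNothingWhenDisabled()  // selects the EmptyCache implementation
                    .build(loader);

            long v = cache.get("abc");           // loader runs on every call: v == 3
            cache.put("abc", 42L);               // accepted, but nothing is retained
            System.out.println(cache.getIfPresent("abc")); // null: always a miss
        }
    }
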
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java
new file mode 100644
index 00000000000..a2cb05d82c5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Cache} and {@link LoadingCache} implementation similar to ones
+ * produced by {@link CacheBuilder#build()}, but one that does not
+ * exhibit <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
+ * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)}
+ * is guaranteed to return fresh state after {@link #invalidate(Object)},
+ * {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
+ *
+ * @see EvictableCacheBuilder
+ */
+// @ElementTypesAreNonnullByDefault
+class EvictableCache<K, V>
+        extends AbstractLoadingCache<K, V>
+        implements LoadingCache<K, V> {
+    // Invariant: for every (K, token) entry in the tokens map, there is a live
+    // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens'
+    // entry to be removed.
+    private final ConcurrentHashMap<K, Token<K>> tokens = new ConcurrentHashMap<>();
+    // The dataCache can have entries with no corresponding tokens in the tokens map.
+    // For example, this can happen when invalidation concurs with load.
+    // The dataCache must be bounded.
+    private final LoadingCache<Token<K>, V> dataCache;
+
+    private final AtomicInteger invalidations = new AtomicInteger();
+
+    EvictableCache(CacheBuilder<? super Token<K>, ? super V> cacheBuilder, CacheLoader<? super K, V> cacheLoader) {
+        dataCache = buildUnsafeCache(
+                cacheBuilder
+                        .<Token<K>, V>removalListener(removal -> {
+                            Token<K> token = removal.getKey();
+                            Verify.verify(token != null, "token is null");
+                            if (removal.getCause() != RemovalCause.REPLACED) {
+                                tokens.remove(token.getKey(), token);
+                            }
+                        }),
+                new TokenCacheLoader<>(cacheLoader));
+    }
+
+    // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
+    // advising to use this class as a safety-adding wrapper.
+    private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
+            CacheLoader<? super K, V> cacheLoader) {
+        return cacheBuilder.build(cacheLoader);
+    }
+
+    @Override
+    public V getIfPresent(Object key) {
+        @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+        Token<K> token = tokens.get(key);
+        if (token == null) {
+            return null;
+        }
+        return dataCache.getIfPresent(token);
+    }
+
+    @Override
+    public V get(K key, Callable<? extends V> valueLoader)
+            throws ExecutionException {
+        Token<K> newToken = new Token<>(key);
+        int invalidations = this.invalidations.get();
+        Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
+        try {
+            V value = dataCache.get(token, valueLoader);
+            if (invalidations == this.invalidations.get()) {
+                // Revive token if it got expired before reloading
+                if (tokens.putIfAbsent(key, token) == null) {
+                    // Revived
+                    if (!dataCache.asMap().containsKey(token)) {
+                        // We revived, but the token does not correspond to a live entry anymore.
+                        // It would stay in tokens forever, so let's remove it.
+                        tokens.remove(key, token);
+                    }
+                }
+            }
+            return value;
+        } catch (Throwable e) {
+            if (newToken == token) {
+                // Failed to load and it was our new token persisted in tokens map.
+                // No cache entry exists for the token (unless concurrent load happened),
+                // so we need to remove it.
+                tokens.remove(key, newToken);
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public V get(K key)
+            throws ExecutionException {
+        Token<K> newToken = new Token<>(key);
+        int invalidations = this.invalidations.get();
+        Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
+        try {
+            V value = dataCache.get(token);
+            if (invalidations == this.invalidations.get()) {
+                // Revive token if it got expired before reloading
+                if (tokens.putIfAbsent(key, token) == null) {
+                    // Revived
+                    if (!dataCache.asMap().containsKey(token)) {
+                        // We revived, but the token does not correspond to a live entry anymore.
+                        // It would stay in tokens forever, so let's remove it.
+                        tokens.remove(key, token);
+                    }
+                }
+            }
+            return value;
+        } catch (Throwable e) {
+            if (newToken == token) {
+                // Failed to load and it was our new token persisted in tokens map.
+                // No cache entry exists for the token (unless concurrent load happened),
+                // so we need to remove it.
+                tokens.remove(key, newToken);
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
+            throws ExecutionException {
+        List<Token<K>> newTokens = new ArrayList<>();
+        List<Token<K>> temporaryTokens = new ArrayList<>();
+        try {
+            Map<K, V> result = new LinkedHashMap<>();
+            for (K key : keys) {
+                if (result.containsKey(key)) {
+                    continue;
+                }
+                // This is not bulk, but it is a fast local operation
+                Token<K> newToken = new Token<>(key);
+                Token<K> oldToken = tokens.putIfAbsent(key, newToken);
+                if (oldToken != null) {
+                    // Token exists but the data may not exist (e.g. due to concurrent eviction)
+                    V value = dataCache.getIfPresent(oldToken);
+                    if (value != null) {
+                        result.put(key, value);
+                        continue;
+                    }
+                    // Old token exists but value wasn't found. This can happen when there is concurrent
+                    // eviction/invalidation or when the value is still being loaded.
+                    // The new token is not registered in tokens, so won't be used by subsequent invocations.
+                    temporaryTokens.add(newToken);
+                }
+                newTokens.add(newToken);
+            }
+
+            Map<Token<K>, V> values = dataCache.getAll(newTokens);
+            for (Map.Entry<Token<K>, V> entry : values.entrySet()) {
+                Token<K> newToken = entry.getKey();
+                result.put(newToken.getKey(), entry.getValue());
+            }
+            return ImmutableMap.copyOf(result);
+        } catch (Throwable e) {
+            for (Token<K> token : newTokens) {
+                // Failed to load and it was our new token (potentially) persisted in tokens map.
+                // No cache entry exists for the token (unless concurrent load happened),
+                // so we need to remove it.
+                tokens.remove(token.getKey(), token);
+            }
+            throw e;
+        } finally {
+            dataCache.invalidateAll(temporaryTokens);
+        }
+    }
+
+    @Override
+    public void refresh(K key) {
+        // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token.
+        // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created.
+        // In such case we would leak the newly created token.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long size() {
+        return dataCache.size();
+    }
+
+    @Override
+    public void cleanUp() {
+        dataCache.cleanUp();
+    }
+
+    @VisibleForTesting
+    int tokensCount() {
+        return tokens.size();
+    }
+
+    @Override
+    public void invalidate(Object key) {
+        invalidations.incrementAndGet();
+        @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+        Token<K> token = tokens.remove(key);
+        if (token != null) {
+            dataCache.invalidate(token);
+        }
+    }
+
+    @Override
+    public void invalidateAll() {
+        invalidations.incrementAndGet();
+        dataCache.invalidateAll();
+        tokens.clear();
+    }
+
+    // Not thread safe, test only.
+    @VisibleForTesting
+    void clearDataCacheOnly() {
+        Map<K, Token<K>> tokensCopy = new HashMap<>(tokens);
+        dataCache.asMap().clear();
+        Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction");
+        tokens.putAll(tokensCopy);
+    }
+
+    @Override
+    public CacheStats stats() {
+        return dataCache.stats();
+    }
+
+    @Override
+    public ConcurrentMap<K, V> asMap() {
+        return new ConcurrentMap<K, V>() {
+            private final ConcurrentMap<Token<K>, V> dataCacheMap = dataCache.asMap();
+
+            @Override
+            public V putIfAbsent(K key, V value) {
+                throw new UnsupportedOperationException("The operation is not supported,"
+                        + " as it inherently races with cache invalidation");
+            }
+
+            @Override
+            public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+                // default implementation of ConcurrentMap#compute uses the unsupported putIfAbsent in some cases
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently"
+                        + " races with cache invalidation");
+            }
+
+            @Override
+            public boolean remove(Object key, Object value) {
+                @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+                Token<K> token = tokens.get(key);
+                if (token != null) {
+                    return dataCacheMap.remove(token, value);
+                }
+                return false;
+            }
+
+            @Override
+            public boolean replace(K key, V oldValue, V newValue) {
+                Token<K> token = tokens.get(key);
+                if (token != null) {
+                    return dataCacheMap.replace(token, oldValue, newValue);
+                }
+                return false;
+            }
+
+            @Override
+            public V replace(K key, V value) {
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently races"
+                        + " with cache invalidation");
+            }
+
+            @Override
+            public int size() {
+                return dataCache.asMap().size();
+            }
+
+            @Override
+            public boolean isEmpty() {
+                return dataCache.asMap().isEmpty();
+            }
+
+            @Override
+            public boolean containsKey(Object key) {
+                return tokens.containsKey(key);
+            }
+
+            @Override
+            public boolean containsValue(Object value) {
+                return values().contains(value);
+            }
+
+            @Override
+            @Nullable
+            public V get(Object key) {
+                return getIfPresent(key);
+            }
+
+            @Override
+            public V put(K key, V value) {
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently"
+                        + " races with cache invalidation. Use get(key, callable) instead.");
+            }
+
+            @Override
+            @Nullable
+            public V remove(Object key) {
+                Token<K> token = tokens.remove(key);
+                if (token != null) {
+                    return dataCacheMap.remove(token);
+                }
+                return null;
+            }
+
+            @Override
+            public void putAll(Map<? extends K, ? extends V> m) {
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently"
+                        + " races with cache invalidation. Use get(key, callable) instead.");
+            }
+
+            @Override
+            public void clear() {
+                dataCacheMap.clear();
+                tokens.clear();
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return tokens.keySet();
+            }
+
+            @Override
+            public Collection<V> values() {
+                return dataCacheMap.values();
+            }
+
+            @Override
+            public Set<Map.Entry<K, V>> entrySet() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    // instance-based equality
+    static final class Token<K> {
+        private final K key;
+
+        Token(K key) {
+            this.key = Objects.requireNonNull(key, "key is null");
+        }
+
+        K getKey() {
+            return key;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key);
+        }
+    }
+
+    private static class TokenCacheLoader<K, V>
+            extends CacheLoader<Token<K>, V> {
+        private final CacheLoader<? super K, V> delegate;
+
+        public TokenCacheLoader(CacheLoader<? super K, V> delegate) {
+            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        }
+
+        @Override
+        public V load(Token<K> token)
+                throws Exception {
+            return delegate.load(token.getKey());
+        }
+
+        @Override
+        public ListenableFuture<V> reload(Token<K> token, V oldValue)
+                throws Exception {
+            return delegate.reload(token.getKey(), oldValue);
+        }
+
+        @Override
+        public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens)
+                throws Exception {
+            List<Token<K>> tokenList = ImmutableList.copyOf(tokens);
+            List<K> keys = new ArrayList<>();
+            for (Token<K> token : tokenList) {
+                keys.add(token.getKey());
+            }
+            Map<? super K, V> values;
+            try {
+                values = delegate.loadAll(keys);
+            } catch (UnsupportedLoadingOperationException e) {
+                // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll
+                // to fall back from bulk loading (without load sharing) to loading individual
+                // values (with load sharing). EvictableCache implementation does not currently
+                // support the fallback mechanism, so the individual values would be loaded
+                // without load sharing. This would be an unintentional and non-obvious behavioral
+                // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled.
+                throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache"
+                        + " when CacheLoader.loadAll is not implemented", e);
+            }
+
+            ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder();
+            for (int i = 0; i < tokenList.size(); i++) {
+                Token<K> token = tokenList.get(i);
+                K key = keys.get(i);
+                V value = values.get(key);
+                // CacheLoader.loadAll is not guaranteed to return values for all the keys
+                if (value != null) {
+                    result.put(token, value);
+                }
+            }
+            return result.buildOrThrow();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .addValue(delegate)
+                    .toString();
+        }
+    }
+}
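
To make the class comment concrete, a minimal sketch (illustrative, not part
of the commit) of the freshness guarantee the token indirection buys: after
invalidate(), a later get() must trigger a fresh load rather than observe a
stale value.

    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    import org.apache.doris.common.EvictableCacheBuilder;

    import java.util.concurrent.atomic.AtomicLong;

    public class InvalidationSketch {
        public static void main(String[] args) throws Exception {
            AtomicLong version = new AtomicLong(); // stands in for mutable external state
            CacheLoader<String, Long> loader = CacheLoader.from(key -> version.get());

            LoadingCache<String, Long> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(10_000) // EvictableCache must be bounded
                    .build(loader);

            long before = cache.get("k"); // loads and caches version 0
            version.incrementAndGet();    // external state changes
            cache.invalidate("k");        // drops the token, and with it the entry
            long after = cache.get("k");  // guaranteed fresh load: sees version 1
            System.out.println(before + " -> " + after); // prints "0 -> 1"
        }
    }
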
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java
new file mode 100644
index 00000000000..8da3f8c8d56
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCacheBuilder.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.EvictableCache.Token;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.errorprone.annotations.CheckReturnValue;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder},
+ * but creating cache implementations that do not exhibit
+ * <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
+ * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)}
+ * is guaranteed to return fresh state after {@link Cache#invalidate(Object)},
+ * {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called.
+ */
+public final class EvictableCacheBuilder<K, V> {
+    @CheckReturnValue
+    public static EvictableCacheBuilder<Object, Object> newBuilder() {
+        return new EvictableCacheBuilder<>();
+    }
+
+    private Optional<Ticker> ticker = Optional.empty();
+    private Optional<Duration> expireAfterWrite = Optional.empty();
+    private Optional<Duration> refreshAfterWrite = Optional.empty();
+    private Optional<Long> maximumSize = Optional.empty();
+    private Optional<Long> maximumWeight = Optional.empty();
+    private Optional<Integer> concurrencyLevel = Optional.empty();
+    private Optional<Weigher<? super Token<K>, ? super V>> weigher = Optional.empty();
+    private boolean recordStats;
+    private Optional<DisabledCacheImplementation> disabledCacheImplementation = Optional.empty();
+
+    private EvictableCacheBuilder() {}
+
+    /**
+     * Pass-through for {@link CacheBuilder#ticker(Ticker)}.
+     */
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> ticker(Ticker ticker) {
+        this.ticker = Optional.of(ticker);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
+        return expireAfterWrite(toDuration(duration, unit));
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> expireAfterWrite(Duration duration) {
+        Preconditions.checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set");
+        this.expireAfterWrite = Optional.of(duration);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
+        return refreshAfterWrite(toDuration(duration, unit));
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> refreshAfterWrite(Duration duration) {
+        Preconditions.checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set");
+        this.refreshAfterWrite = Optional.of(duration);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> maximumSize(long maximumSize) {
+        Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set");
+        Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set");
+        this.maximumSize = Optional.of(maximumSize);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> maximumWeight(long maximumWeight) {
+        Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set");
+        Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set");
+        this.maximumWeight = Optional.of(maximumWeight);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) {
+        Preconditions.checkState(!this.concurrencyLevel.isPresent(), "concurrencyLevel already set");
+        this.concurrencyLevel = Optional.of(concurrencyLevel);
+        return this;
+    }
+
+    public <K1 extends K, V1 extends V> EvictableCacheBuilder<K1, V1> weigher(Weigher<? super K1, ? super V1> weigher) {
+        Preconditions.checkState(!this.weigher.isPresent(), "weigher already set");
+        @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher
+        EvictableCacheBuilder<K1, V1> cast = (EvictableCacheBuilder<K1, V1>) this;
+        cast.weigher = Optional.of(new TokenWeigher<>(weigher));
+        return cast;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> recordStats() {
+        recordStats = true;
+        return this;
+    }
+
+    /**
+     * Choose a behavior for the case when caching is disabled that may allow data and failure
+     * sharing between concurrent callers.
+     */
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> shareResultsAndFailuresEvenIfDisabled() {
+        return disabledCacheImplementation(DisabledCacheImplementation.GUAVA);
+    }
+
+    /**
+     * Choose a behavior for the case when caching is disabled that prevents data and
+     * failure sharing between concurrent callers.
+     * Note: disabled cache won't report any statistics.
+     */
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> shareNothingWhenDisabled() {
+        return disabledCacheImplementation(DisabledCacheImplementation.NOOP);
+    }
+
+    @VisibleForTesting
+    EvictableCacheBuilder<K, V> disabledCacheImplementation(DisabledCacheImplementation cacheImplementation) {
+        Preconditions.checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set");
+        disabledCacheImplementation = Optional.of(cacheImplementation);
+        return this;
+    }
+
+    @CheckReturnValue
+    public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
+        return build(unimplementedCacheLoader());
+    }
+
+    @CheckReturnValue
+    public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? super K1, V1> loader) {
+        if (cacheDisabled()) {
+            // Silently providing a behavior different from Guava's could be surprising, so require explicit choice.
+            DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow(
+                    () -> new IllegalStateException(
+                    "Even when cache is disabled, the loads are synchronized 
and both load results and failures"
+                            + " are shared between threads. " + "This is 
rarely desired, thus builder caller is"
+                            + " expected to either opt-in into this behavior 
with"
+                            + " shareResultsAndFailuresEvenIfDisabled(), or 
choose not to share results (and failures)"
+                            + " between concurrent invocations with 
shareNothingWhenDisabled()."));
+
+            switch (disabledCacheImplementation) {
+                case NOOP:
+                    return new EmptyCache<>(loader, recordStats);
+                case GUAVA: {
+                    // Disabled cache is always empty, so doesn't exhibit invalidation problems.
+                    // Avoid overhead of EvictableCache wrapper.
+                    CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
+                            .maximumSize(0)
+                            .expireAfterWrite(0, TimeUnit.SECONDS);
+                    if (recordStats) {
+                        cacheBuilder.recordStats();
+                    }
+                    return buildUnsafeCache(cacheBuilder, loader);
+                }
+                default:
+                    throw new IllegalStateException("Unexpected value: " + disabledCacheImplementation);
+            }
+        }
+
+        if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) {
+            // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may
+            // lead to an entry remaining in the cache, without associated token. This would lead to
+            // a memory leak in an unbounded cache.
+            throw new IllegalStateException("Unbounded cache is not supported");
+        }
+
+        // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls.
+        CacheBuilder<Object, ? super V> cacheBuilder = CacheBuilder.newBuilder();
+        ticker.ifPresent(cacheBuilder::ticker);
+        expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite);
+        refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite);
+        maximumSize.ifPresent(cacheBuilder::maximumSize);
+        maximumWeight.ifPresent(cacheBuilder::maximumWeight);
+        weigher.ifPresent(cacheBuilder::weigher);
+        concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel);
+        if (recordStats) {
+            cacheBuilder.recordStats();
+        }
+        return new EvictableCache<>(cacheBuilder, loader);
+    }
+
+    private boolean cacheDisabled() {
+        return (maximumSize.isPresent() && maximumSize.get() == 0)
+                || (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero());
+    }
+
+    // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
+    // advising to use this class as a safety-adding wrapper.
+    private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
+            CacheLoader<? super K, V> cacheLoader) {
+        return cacheBuilder.build(cacheLoader);
+    }
+
+    private static <K, V> CacheLoader<K, V> unimplementedCacheLoader() {
+        return CacheLoader.from(ignored -> {
+            throw new UnsupportedOperationException();
+        });
+    }
+
+    private static final class TokenWeigher<K, V>
+            implements Weigher<Token<K>, V> {
+        private final Weigher<? super K, ? super V> delegate;
+
+        private TokenWeigher(Weigher<? super K, ? super V> delegate) {
+            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        }
+
+        @Override
+        public int weigh(Token<K> key, V value) {
+            return delegate.weigh(key.getKey(), value);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TokenWeigher<?, ?> that = (TokenWeigher<?, ?>) o;
+            return Objects.equals(delegate, that.delegate);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(delegate);
+        }
+
+        @Override
+        public String toString() {
+            return "TokenWeigher{" + "delegate=" + delegate + '}';
+        }
+    }
+
+    private static Duration toDuration(long duration, TimeUnit unit) {
+        // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated
+        return Duration.ofNanos(unit.toNanos(duration));
+    }
+
+    @VisibleForTesting
+    enum DisabledCacheImplementation {
+        NOOP,
+        GUAVA,
+    }
+}
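
One consequence of build() worth spelling out (same assumptions as the
sketches above): a builder with no size, weight, or expiry bound is rejected
outright, because an invalidation racing a load could strand a token-less
entry forever.

    import com.google.common.cache.CacheLoader;

    import org.apache.doris.common.EvictableCacheBuilder;

    public class UnboundedRejectionSketch {
        public static void main(String[] args) {
            CacheLoader<String, String> loader = CacheLoader.from(key -> key);
            try {
                // No maximumSize/maximumWeight/expireAfterWrite: rejected by design.
                EvictableCacheBuilder.newBuilder().build(loader);
            } catch (IllegalStateException expected) {
                System.out.println(expected.getMessage()); // "Unbounded cache is not supported"
            }
        }
    }
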
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index d07f0502d10..b056af3ebd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -39,6 +39,7 @@ import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.fs.FileSystemDirectoryLister;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
 import org.apache.doris.mtmv.MTMVRefreshContext;
@@ -1016,7 +1017,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
                 LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString());
             }
         }
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
+        return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName,
+                new FileSystemDirectoryLister(), null);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 838005b47a9..3815a356585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
@@ -43,7 +44,11 @@ import org.apache.doris.datasource.hive.HiveUtil.ACIDFileFilter;
 import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter;
 import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.fs.FileSystemCache;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.FileSystemIOException;
+import org.apache.doris.fs.RemoteIterator;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -194,7 +199,7 @@ public class HiveMetaStoreCache {
 
             @Override
             public FileCacheValue load(FileCacheKey key) {
-                return loadFiles(key);
+                return loadFiles(key, new FileSystemDirectoryLister(), null);
             }
         };
 
@@ -347,7 +352,9 @@ public class HiveMetaStoreCache {
     private FileCacheValue getFileCache(String location, String inputFormat,
             JobConf jobConf,
             List<String> partitionValues,
-            String bindBrokerName) throws UserException {
+            String bindBrokerName,
+            DirectoryLister directoryLister,
+            TableIf table) throws UserException {
         FileCacheValue result = new FileCacheValue();
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
@@ -362,34 +369,37 @@ public class HiveMetaStoreCache {
         //      /user/hive/warehouse/region_tmp_union_all2/2
         // So we need to recursively list data location.
        // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
-        List<RemoteFile> remoteFiles = new ArrayList<>();
         boolean isRecursiveDirectories = Boolean.valueOf(
                 catalog.getProperties().getOrDefault("hive.recursive_directories", "false"));
-        Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles);
-        if (status.ok()) {
-            for (RemoteFile remoteFile : remoteFiles) {
+        try {
+            RemoteIterator<RemoteFile> iterator = directoryLister.listFiles(fs, isRecursiveDirectories,
+                    table, location);
+            while (iterator.hasNext()) {
+                RemoteFile remoteFile = iterator.next();
                 String srcPath = remoteFile.getPath().toString();
                LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
                 result.addFile(remoteFile, locationPath);
             }
-        } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
-            // User may manually remove partition under HDFS, in this case,
-            // Hive isn't aware that the removed partition is missing.
-            // This is to support that case without throwing an exception.
-            LOG.warn(String.format("File %s does not exist.", location));
-            if (!Boolean.valueOf(catalog.getProperties()
-                    .getOrDefault("hive.ignore_absent_partitions", "true"))) {
-                throw new UserException("Partition location does not exist: " + location);
+        } catch (FileSystemIOException e) {
+            if (e.getErrorCode().isPresent() && e.getErrorCode().get().equals(ErrCode.NOT_FOUND)) {
+                // User may manually remove partition under HDFS, in this case,
+                // Hive isn't aware that the removed partition is missing.
+                // This is to support that case without throwing an exception.
+                LOG.warn(String.format("File %s does not exist.", location));
+                if (!Boolean.valueOf(catalog.getProperties()
+                        .getOrDefault("hive.ignore_absent_partitions", "true"))) {
+                    throw new UserException("Partition location does not exist: " + location);
+                }
+            } else {
+                throw new RuntimeException(e);
             }
-        } else {
-            throw new RuntimeException(status.getErrMsg());
         }
         // Must copy the partitionValues to avoid concurrent modification of key and value
         result.setPartitionValues(Lists.newArrayList(partitionValues));
         return result;
     }
 
-    private FileCacheValue loadFiles(FileCacheKey key) {
+    private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -413,7 +423,7 @@ public class HiveMetaStoreCache {
             FileInputFormat.setInputPaths(jobConf, finalLocation.get());
             try {
                FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
-                        key.getPartitionValues(), key.bindBrokerName);
+                        key.getPartitionValues(), key.bindBrokerName, directoryLister, table);
                 // Replace default hive partition with a null_string.
                 for (int i = 0; i < result.getValuesSize(); i++) {
                    if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) {
@@ -465,19 +475,25 @@ public class HiveMetaStoreCache {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-                                                              String bindBrokerName) {
-        return getFilesByPartitions(partitions, true, true, bindBrokerName);
+                                                              String bindBrokerName,
+                                                              DirectoryLister directoryLister,
+                                                              TableIf table) {
+        return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-                                                                 String bindBrokerName) {
-        return getFilesByPartitions(partitions, false, true, bindBrokerName);
+                                                                 String bindBrokerName,
+                                                                 DirectoryLister directoryLister,
+                                                                 TableIf table) {
+        return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table);
     }
 
     public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
                                                      boolean withCache,
                                                      boolean concurrent,
-                                                     String bindBrokerName) {
+                                                     String bindBrokerName,
+            DirectoryLister directoryLister,
+            TableIf table) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
                 ? FileCacheKey.createDummyCacheKey(
@@ -493,13 +509,15 @@ public class HiveMetaStoreCache {
             } else {
                 if (concurrent) {
                     List<Future<FileCacheValue>> pList = keys.stream().map(
-                            key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
+                            key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table)))
+                            .collect(Collectors.toList());
                    fileLists = Lists.newArrayListWithExpectedSize(keys.size());
                     for (Future<FileCacheValue> p : pList) {
                         fileLists.add(p.get());
                     }
                 } else {
-                    fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
+                    fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table))
+                            .collect(Collectors.toList());
                 }
             }
         } catch (ExecutionException e) {
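
The hunks above also switch file listing from a materialized List<RemoteFile>
to a pull-based RemoteIterator, with IO failures surfacing as the checked
FileSystemIOException. A small consumption sketch, assuming (as the catch
block above suggests) that both hasNext() and next() may throw it:

    import org.apache.doris.fs.FileSystemIOException;
    import org.apache.doris.fs.RemoteIterator;
    import org.apache.doris.fs.remote.RemoteFile;

    import java.util.ArrayList;
    import java.util.List;

    public final class ListingHelper {
        // Drains a RemoteIterator into a list, propagating listing failures.
        public static List<RemoteFile> drain(RemoteIterator<RemoteFile> it)
                throws FileSystemIOException {
            List<RemoteFile> files = new ArrayList<>();
            while (it.hasNext()) {
                files.add(it.next());
            }
            return files;
        }
    }
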
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index b14dfbf02f4..66d5382c151 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
+import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
@@ -87,6 +88,8 @@ public class HiveScanNode extends FileQueryScanNode {
     @Setter
     protected SelectedPartitions selectedPartitions = null;
 
+    private DirectoryLister directoryLister;
+
     private boolean partitionInit = false;
     private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
     private List<HivePartition> prunedPartitions;
@@ -101,15 +104,18 @@ public class HiveScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
      */
-    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) {
-        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv);
+    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv,
+            DirectoryLister directoryLister) {
+        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv, directoryLister);
     }
 
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
-            StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) {
+            StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv,
+            DirectoryLister directoryLister) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv);
         hmsTable = (HMSExternalTable) desc.getTable();
         brokerName = hmsTable.getCatalog().bindBrokerName();
+        this.directoryLister = directoryLister;
     }
 
     @Override
@@ -279,7 +285,8 @@ public class HiveScanNode extends FileQueryScanNode {
             }
         } else {
             boolean withCache = Config.max_external_file_cache_num > 0;
-            fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName);
+            fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName,
+                    directoryLister, hmsTable);
         }
         if (tableSample != null) {
            List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index a6156924e27..b8f3d7bd198 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -37,6 +37,7 @@ import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
+import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
@@ -119,8 +120,8 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
             Optional<TableScanParams> scanParams, 
Optional<IncrementalRelation> incrementalRelation,
-            SessionVariable sv) {
-        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv, sv);
+            SessionVariable sv, DirectoryLister directoryLister) {
+        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv, sv, directoryLister);
         isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
             if (isCowTable) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java
new file mode 100644
index 00000000000..e97bc2c684f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+public interface DirectoryLister {
+    RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, 
TableIf table, String location)
+            throws FileSystemIOException;
+}
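
The interface is deliberately tiny, which makes listers easy to compose. As a hedged illustration (CountingDirectoryLister is hypothetical and not part of this patch), a decorator that counts delegated listings looks like this:

    package org.apache.doris.fs;

    import org.apache.doris.catalog.TableIf;
    import org.apache.doris.fs.remote.RemoteFile;

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical decorator, not part of this patch: counts how often the
    // delegate is asked to list a directory.
    public class CountingDirectoryLister implements DirectoryLister {
        private final DirectoryLister delegate;
        private final AtomicLong listings = new AtomicLong();

        public CountingDirectoryLister(DirectoryLister delegate) {
            this.delegate = delegate;
        }

        @Override
        public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
                throws FileSystemIOException {
            listings.incrementAndGet(); // one hit per delegated listing
            return delegate.listFiles(fs, recursive, table, location);
        }

        public long getListingCount() {
            return listings.get();
        }
    }
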
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java
new file mode 100644
index 00000000000..dcd7eeb16b3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileSystemDirectoryLister implements DirectoryLister {
+    @Override
+    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+            throws FileSystemIOException {
+        List<RemoteFile> result = new ArrayList<>();
+        Status status = fs.listFiles(location, recursive, result);
+        if (!status.ok()) {
+            throw new FileSystemIOException(status.getErrCode(), 
status.getErrMsg());
+        }
+        return new RemoteFileRemoteIterator(result);
+    }
+}
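
Because both RemoteIterator methods throw the checked FileSystemIOException, callers usually drain the iterator explicitly. A usage sketch, where fs, table and location are placeholders for a concrete FileSystem, TableIf and directory path:

    DirectoryLister lister = new FileSystemDirectoryLister();
    List<RemoteFile> files = new ArrayList<>();
    try {
        RemoteIterator<RemoteFile> it = lister.listFiles(fs, true, table, location);
        while (it.hasNext()) {
            files.add(it.next());
        }
    } catch (FileSystemIOException e) {
        // the Status error code and message from fs.listFiles are preserved here
        throw new RuntimeException("Failed to list " + location, e);
    }
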
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java
new file mode 100644
index 00000000000..c9e45d0352b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status.ErrCode;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+public class FileSystemIOException extends IOException {
+
+    @Nullable
+    private final ErrCode errCode;
+
+    public FileSystemIOException(ErrCode errCode, String message) {
+        super(message);
+        this.errCode = errCode;
+    }
+
+    public FileSystemIOException(ErrCode errCode, String message, Throwable 
cause) {
+        super(message, cause);
+        this.errCode = errCode;
+    }
+
+    public FileSystemIOException(String message) {
+        super(message);
+        this.errCode = null;
+    }
+
+    public FileSystemIOException(String message, Throwable cause) {
+        super(message, cause);
+        this.errCode = null;
+    }
+
+    public Optional<ErrCode> getErrorCode() {
+        return Optional.ofNullable(errCode);
+    }
+
+    @Override
+    public String getMessage() {
+        if (errCode != null) {
+            return String.format("[%s]: %s",
+                    errCode,
+                    super.getMessage());
+        } else {
+            return super.getMessage();
+        }
+    }
+}
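
The optional error code lets callers react to specific filesystem failures without parsing messages. A sketch, assuming the backup Status class defines ErrCode.NOT_FOUND (an assumption; check the actual enum values):

    try {
        RemoteIterator<RemoteFile> it = lister.listFiles(fs, true, table, location);
        while (it.hasNext()) {
            files.add(it.next());
        }
    } catch (FileSystemIOException e) {
        // NOT_FOUND is an assumption about the ErrCode enum; adjust to the real constant.
        if (e.getErrorCode().map(code -> code == ErrCode.NOT_FOUND).orElse(false)) {
            // a directory that vanished between planning and listing can be treated as empty
        } else {
            throw e;
        }
    }
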
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java
new file mode 100644
index 00000000000..6ac8eb3b0c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+public class RemoteFileRemoteIterator
+        implements RemoteIterator<RemoteFile> {
+    private final List<RemoteFile> remoteFileList;
+    private int currentIndex = 0;
+
+    public RemoteFileRemoteIterator(List<RemoteFile> remoteFileList) {
+        this.remoteFileList = Objects.requireNonNull(remoteFileList, "remoteFileList is null");
+    }
+
+    @Override
+    public boolean hasNext() throws FileSystemIOException {
+        return currentIndex < remoteFileList.size();
+    }
+
+    @Override
+    public RemoteFile next() throws FileSystemIOException {
+        if (!hasNext()) {
+            throw new NoSuchElementException("No more elements in 
RemoteFileRemoteIterator");
+        }
+        return remoteFileList.get(currentIndex++);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java
new file mode 100644
index 00000000000..b9d212a15a5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+public interface RemoteIterator<T> {
+    boolean hasNext() throws FileSystemIOException;
+
+    T next() throws FileSystemIOException;
+}
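
RemoteIterator deliberately differs from java.util.Iterator in that both methods may throw the checked FileSystemIOException, so it cannot back a for-each loop directly. A hypothetical adapter (not part of this patch) that wraps the checked exception:

    import java.io.UncheckedIOException;
    import java.util.Iterator;

    // Hypothetical sketch: expose a RemoteIterator as a java.util.Iterator by
    // wrapping the checked FileSystemIOException (an IOException subclass).
    final class UncheckedRemoteIterator<T> implements Iterator<T> {
        private final RemoteIterator<T> delegate;

        UncheckedRemoteIterator(RemoteIterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            try {
                return delegate.hasNext();
            } catch (FileSystemIOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public T next() {
            try {
                return delegate.next();
            } catch (FileSystemIOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
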
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java
new file mode 100644
index 00000000000..4332a5fed35
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.util.Iterator;
+import java.util.Objects;
+
+class SimpleRemoteIterator implements RemoteIterator<RemoteFile> {
+    private final Iterator<RemoteFile> iterator;
+
+    public SimpleRemoteIterator(Iterator<RemoteFile> iterator) {
+        this.iterator = Objects.requireNonNull(iterator, "iterator is null");
+    }
+
+    @Override
+    public boolean hasNext() throws FileSystemIOException {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public RemoteFile next() throws FileSystemIOException {
+        return iterator.next();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
new file mode 100644
index 00000000000..6be3c03f824
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import java.util.Objects;
+
+public class TransactionDirectoryListingCacheKey {
+
+    private final long transactionId;
+    private final String path;
+
+    public TransactionDirectoryListingCacheKey(long transactionId, String 
path) {
+        this.transactionId = transactionId;
+        this.path = Objects.requireNonNull(path, "path is null");
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TransactionDirectoryListingCacheKey that = 
(TransactionDirectoryListingCacheKey) o;
+        return transactionId == that.transactionId && path.equals(that.path);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transactionId, path);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new 
StringBuilder("TransactionDirectoryListingCacheKey{");
+        sb.append("transactionId=").append(transactionId);
+        sb.append(", path='").append(path).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
+}
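
Since the transaction id participates in equals/hashCode, two queries listing the same path never share a cache entry, which is what gives the cache its per-query (transaction) scope. For illustration (the s3 path is a placeholder):

    TransactionDirectoryListingCacheKey k1 = new TransactionDirectoryListingCacheKey(1L, "s3://bucket/tbl/part=1");
    TransactionDirectoryListingCacheKey k2 = new TransactionDirectoryListingCacheKey(1L, "s3://bucket/tbl/part=1");
    TransactionDirectoryListingCacheKey k3 = new TransactionDirectoryListingCacheKey(2L, "s3://bucket/tbl/part=1");

    k1.equals(k2); // true: same transaction and same path share one cache entry
    k1.equals(k3); // false: another transaction never sees this entry
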
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
new file mode 100644
index 00000000000..37acec6864f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.commons.collections.ListUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+/**
+ * Caches directory content (including listings that were started concurrently).
+ * {@link TransactionScopeCachingDirectoryLister} assumes that all listings
+ * are performed by the same user within a single transaction, therefore any
+ * failure can be shared between concurrent listings.
+ */
+public class TransactionScopeCachingDirectoryLister implements DirectoryLister 
{
+    private final long transactionId;
+
+    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache;
+    private final DirectoryLister delegate;
+
+    @VisibleForTesting
+    public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> getCache() {
+        return cache;
+    }
+
+    public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, 
long transactionId,
+            Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> 
cache) {
+        this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        this.transactionId = transactionId;
+        this.cache = Objects.requireNonNull(cache, "cache is null");
+    }
+
+    @Override
+    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean 
recursive, TableIf table, String location)
+            throws FileSystemIOException {
+        return listInternal(fs, recursive, table, new 
TransactionDirectoryListingCacheKey(transactionId, location));
+    }
+
+    private RemoteIterator<RemoteFile> listInternal(FileSystem fs, boolean 
recursive, TableIf table,
+            TransactionDirectoryListingCacheKey cacheKey) throws 
FileSystemIOException {
+        FetchingValueHolder cachedValueHolder;
+        try {
+            cachedValueHolder = cache.get(cacheKey,
+                    () -> new 
FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, 
cacheKey)));
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            Throwable throwable = e.getCause();
+            Throwables.throwIfInstanceOf(throwable, 
FileSystemIOException.class);
+            Throwables.throwIfUnchecked(throwable);
+            throw new RuntimeException("Failed to list directory: " + 
cacheKey.getPath(), throwable);
+        }
+
+        if (cachedValueHolder.isFullyCached()) {
+            return new 
SimpleRemoteIterator(cachedValueHolder.getCachedFiles());
+        }
+
+        return cachingRemoteIterator(cachedValueHolder, cacheKey);
+    }
+
+    private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem 
fs, boolean recursive,
+            TableIf table, TransactionDirectoryListingCacheKey cacheKey)
+            throws FileSystemIOException {
+        return delegate.listFiles(fs, recursive, table, cacheKey.getPath());
+    }
+
+    private RemoteIterator<RemoteFile> 
cachingRemoteIterator(FetchingValueHolder cachedValueHolder,
+            TransactionDirectoryListingCacheKey cacheKey) {
+        return new RemoteIterator<RemoteFile>() {
+            private int fileIndex;
+
+            @Override
+            public boolean hasNext()
+                    throws FileSystemIOException {
+                try {
+                    boolean hasNext = 
cachedValueHolder.getCachedFile(fileIndex).isPresent();
+                    // Update cache weight of cachedValueHolder for a given 
path.
+                    // The cachedValueHolder acts as an invalidation guard.
+                    // If a cache invalidation happens while this iterator 
goes over the files from the specified path,
+                    // the eventually outdated file listing will not be added 
anymore to the cache.
+                    cache.asMap().replace(cacheKey, cachedValueHolder, 
cachedValueHolder);
+                    return hasNext;
+                } catch (Exception exception) {
+                    // invalidate cached value to force retry of directory 
listing
+                    cache.invalidate(cacheKey);
+                    throw exception;
+                }
+            }
+
+            @Override
+            public RemoteFile next()
+                    throws FileSystemIOException {
+                // force cache entry weight update in case next file is cached
+                Preconditions.checkState(hasNext());
+                return 
cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new);
+            }
+        };
+    }
+
+    @VisibleForTesting
+    boolean isCached(String location) {
+        return isCached(new TransactionDirectoryListingCacheKey(transactionId, 
location));
+    }
+
+    @VisibleForTesting
+    boolean isCached(TransactionDirectoryListingCacheKey cacheKey) {
+        FetchingValueHolder cached = cache.getIfPresent(cacheKey);
+        return cached != null && cached.isFullyCached();
+    }
+
+    static class FetchingValueHolder {
+
+        private final List<RemoteFile> cachedFiles = 
ListUtils.synchronizedList(new ArrayList<RemoteFile>());
+
+        @GuardedBy("this")
+        @Nullable
+        private RemoteIterator<RemoteFile> fileIterator;
+        @GuardedBy("this")
+        @Nullable
+        private Exception exception;
+
+        public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) {
+            this.fileIterator = Objects.requireNonNull(fileIterator, 
"fileIterator is null");
+        }
+
+        public synchronized boolean isFullyCached() {
+            return fileIterator == null && exception == null;
+        }
+
+        public long getCacheFileCount() {
+            return cachedFiles.size();
+        }
+
+        public Iterator<RemoteFile> getCachedFiles() {
+            Preconditions.checkState(isFullyCached());
+            return cachedFiles.iterator();
+        }
+
+        public Optional<RemoteFile> getCachedFile(int index)
+                throws FileSystemIOException {
+            int filesSize = cachedFiles.size();
+            Preconditions.checkArgument(index >= 0 && index <= filesSize,
+                    "File index (%s) out of bounds [0, %s]", index, filesSize);
+
+            // avoid fileIterator synchronization (and thus blocking) for 
already cached files
+            if (index < filesSize) {
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            return fetchNextCachedFile(index);
+        }
+
+        private synchronized Optional<RemoteFile> fetchNextCachedFile(int 
index)
+                throws FileSystemIOException {
+            if (exception != null) {
+                throw new FileSystemIOException("Exception while listing 
directory", exception);
+            }
+
+            if (index < cachedFiles.size()) {
+                // file was fetched concurrently
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            try {
+                if (fileIterator == null || !fileIterator.hasNext()) {
+                    // no more files
+                    fileIterator = null;
+                    return Optional.empty();
+                }
+
+                RemoteFile fileStatus = fileIterator.next();
+                cachedFiles.add(fileStatus);
+                return Optional.of(fileStatus);
+            } catch (Exception exception) {
+                fileIterator = null;
+                this.exception = exception;
+                throw exception;
+            }
+        }
+    }
+}
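
Taken together, the first listing of a location streams results into FetchingValueHolder while the caller consumes them; any concurrent or later listing of the same location within the transaction is answered from the cache. A hedged sketch of the wiring (same package org.apache.doris.fs, mirroring the factory's cache construction below; fs, table and location are placeholders):

    EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> builder =
            EvictableCacheBuilder.newBuilder()
                    .maximumWeight(1000)
                    .weigher((key, value) -> Math.toIntExact(value.getCacheFileCount()));
    Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache = builder.build();
    DirectoryLister cachingLister = new TransactionScopeCachingDirectoryLister(
            new FileSystemDirectoryLister(), /* transactionId */ 0L, cache);

    // The first listing hits the filesystem and fills the cache while streaming.
    RemoteIterator<RemoteFile> first = cachingLister.listFiles(fs, true, table, location);
    while (first.hasNext()) {
        first.next();
    }
    // A second listing of the same location is served from the cache, not the filesystem.
    RemoteIterator<RemoteFile> second = cachingLister.listFiles(fs, true, table, location);
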
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
new file mode 100644
index 00000000000..c3c9c347c3d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.common.EvictableCacheBuilder;
+import 
org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder;
+
+import com.google.common.cache.Cache;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TransactionScopeCachingDirectoryListerFactory {
+    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;
+
+    private final AtomicLong nextTransactionId = new AtomicLong();
+
+    public TransactionScopeCachingDirectoryListerFactory(long maxSize) {
+        if (maxSize > 0) {
+            EvictableCacheBuilder<TransactionDirectoryListingCacheKey, 
FetchingValueHolder> cacheBuilder =
+                    EvictableCacheBuilder.newBuilder()
+                    .maximumWeight(maxSize)
+                    .weigher((key, value) ->
+                        Math.toIntExact(value.getCacheFileCount()));
+            this.cache = Optional.of(cacheBuilder.build());
+        } else {
+            cache = Optional.empty();
+        }
+    }
+
+    public DirectoryLister get(DirectoryLister delegate) {
+        return cache
+                .map(cache -> (DirectoryLister) new 
TransactionScopeCachingDirectoryLister(delegate,
+                        nextTransactionId.getAndIncrement(), cache))
+                .orElse(delegate);
+    }
+}
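
The planner changes below use exactly this factory: each translation creates it once and lazily wraps a FileSystemDirectoryLister, so every query receives a fresh transaction id and an isolated listing cache. In short:

    DirectoryLister directoryLister = new TransactionScopeCachingDirectoryListerFactory(
            Config.max_external_table_split_file_meta_cache_num)
            .get(new FileSystemDirectoryLister());
    // With max_external_table_split_file_meta_cache_num <= 0 the factory returns
    // the delegate unchanged, i.e. per-query caching is disabled.
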
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 92bc509955d..a942c4e75b5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -46,6 +46,7 @@ import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.es.EsExternalTable;
@@ -64,6 +65,9 @@ import 
org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
 import org.apache.doris.datasource.odbc.source.OdbcScanNode;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.DistributionSpecAny;
@@ -238,6 +242,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     private final StatsErrorEstimator statsErrorEstimator;
     private final PlanTranslatorContext context;
 
+    private DirectoryLister directoryLister;
+
     public PhysicalPlanTranslator() {
         this(null, null);
     }
@@ -589,12 +595,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // TODO(cmy): determine the needCheckColumnPriv param
         ScanNode scanNode;
         if (table instanceof HMSExternalTable) {
+            if (directoryLister == null) {
+                this.directoryLister = new 
TransactionScopeCachingDirectoryListerFactory(
+                        
Config.max_external_table_split_file_meta_cache_num).get(new 
FileSystemDirectoryLister());
+            }
             switch (((HMSExternalTable) table).getDlaType()) {
                 case ICEBERG:
                     scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
                     break;
                 case HIVE:
-                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
+                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv, directoryLister);
                     HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
                     
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
                     if (fileScan.getTableSample().isPresent()) {
@@ -665,6 +675,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
 
     @Override
     public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, 
PlanTranslatorContext context) {
+        if (directoryLister == null) {
+            this.directoryLister = new 
TransactionScopeCachingDirectoryListerFactory(
+                    
Config.max_external_table_split_file_meta_cache_num).get(new 
FileSystemDirectoryLister());
+        }
         List<Slot> slots = fileScan.getOutput();
         ExternalTable table = fileScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, 
context);
@@ -677,7 +691,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         + " for Hudi table");
         PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
         ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false,
-                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), 
ConnectContext.get().getSessionVariable());
+                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), 
ConnectContext.get().getSessionVariable(),
+                directoryLister);
         if (fileScan.getTableSnapshot().isPresent()) {
             ((FileQueryScanNode) 
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index d37f6c729f7..b4558347d30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -58,6 +58,7 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
@@ -72,6 +73,9 @@ import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
 import org.apache.doris.datasource.odbc.source.OdbcScanNode;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
@@ -113,6 +117,8 @@ public class SingleNodePlanner {
     private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
     private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = 
Maps.newHashMap();
 
+    private DirectoryLister directoryLister;
+
     public SingleNodePlanner(PlannerContext ctx) {
         this.ctx = ctx;
     }
@@ -1960,6 +1966,10 @@ public class SingleNodePlanner {
                 scanNode = ((TableValuedFunctionRef) 
tblRef).getScanNode(ctx.getNextNodeId(), sv);
                 break;
             case HMS_EXTERNAL_TABLE:
+                if (directoryLister == null) {
+                    this.directoryLister = new 
TransactionScopeCachingDirectoryListerFactory(
+                            
Config.max_external_table_split_file_meta_cache_num).get(new 
FileSystemDirectoryLister());
+                }
                 TableIf table = tblRef.getDesc().getTable();
                 switch (((HMSExternalTable) table).getDlaType()) {
                     case HUDI:
@@ -1970,13 +1980,13 @@ public class SingleNodePlanner {
                                     + "please set enable_nereids_planner = 
true to enable new optimizer");
                         }
                         scanNode = new HudiScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true,
-                                Optional.empty(), Optional.empty(), sv);
+                                Optional.empty(), Optional.empty(), sv, 
directoryLister);
                         break;
                     case ICEBERG:
                         scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                         break;
                     case HIVE:
-                        scanNode = new HiveScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
+                        scanNode = new HiveScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv, directoryLister);
                         ((HiveScanNode) 
scanNode).setTableSample(tblRef.getTableSample());
                         break;
                     default:
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java
new file mode 100644
index 00000000000..3bfdc73b78e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java
@@ -0,0 +1,708 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import 
org.apache.doris.common.EvictableCacheBuilder.DisabledCacheImplementation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.gaul.modernizer_maven_annotations.SuppressModernizer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+public class TestEvictableCache {
+    private static class TestingTicker extends Ticker {
+        private volatile long time;
+
+        public TestingTicker() {
+        }
+
+        public long read() {
+            return this.time;
+        }
+
+        public synchronized void increment(long delta, TimeUnit unit) {
+            Preconditions.checkArgument(delta >= 0L, "delta is negative");
+            this.time += unit.toNanos(delta);
+        }
+    }
+
+    private static final int TEST_TIMEOUT_SECONDS = 10;
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testLoad()
+            throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10_000)
+                .build();
+        Assert.assertEquals("abc", cache.get(42, () -> "abc"));
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testEvictBySize()
+            throws Exception {
+        int maximumSize = 10;
+        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(maximumSize)
+                .build();
+
+        for (int i = 0; i < 10_000; i++) {
+            int value = i * 10;
+            Assert.assertEquals(value, (Object) cache.get(i, () -> value));
+        }
+        cache.cleanUp();
+        Assert.assertEquals(maximumSize, cache.size());
+        Assert.assertEquals(maximumSize, ((EvictableCache<?, ?>) 
cache).tokensCount());
+
+        // Ensure cache is effective, i.e. some entries preserved
+        int lastKey = 10_000 - 1;
+        Assert.assertEquals(lastKey * 10, (Object) cache.get(lastKey, () -> {
+            throw new UnsupportedOperationException();
+        }));
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testEvictByWeight() throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumWeight(20)
+                .weigher((Integer key, String value) -> value.length())
+                .build();
+
+        for (int i = 0; i < 10; i++) {
+            String value = String.join("", Collections.nCopies(i, "a"));
+            Assert.assertEquals(value, cache.get(i, () -> value));
+        }
+
+        cache.cleanUp();
+        // It's not deterministic which entries get evicted
+        int cacheSize = Math.toIntExact(cache.size());
+
+        Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) 
cache).tokensCount());
+        Assert.assertEquals(cacheSize, cache.asMap().keySet().size());
+
+        int keySum = cache.asMap().keySet().stream()
+                .mapToInt(i -> i)
+                .sum();
+        Assert.assertTrue("key sum should be <= 20", keySum <= 20);
+
+        Assert.assertEquals(cacheSize, cache.asMap().values().size());
+
+        int valuesLengthSum = cache.asMap().values().stream()
+                .mapToInt(String::length)
+                .sum();
+        Assert.assertTrue("values length sum should be <= 20", valuesLengthSum 
<= 20);
+
+        // Ensure cache is effective, i.e. some entries preserved
+        int lastKey = 9; // 10 - 1
+        String expected = String.join("", Collections.nCopies(lastKey, "a")); // Java 8 substitute for String.repeat
+        Assert.assertEquals(expected, cache.get(lastKey, () -> {
+            throw new UnsupportedOperationException();
+        }));
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testEvictByTime() throws Exception {
+        TestingTicker ticker = new TestingTicker();
+        int ttl = 100;
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .ticker(ticker)
+                .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
+                .build();
+
+        Assert.assertEquals("1 ala ma kota", cache.get(1, () -> "1 ala ma 
kota"));
+        ticker.increment(ttl, TimeUnit.MILLISECONDS);
+        Assert.assertEquals("2 ala ma kota", cache.get(2, () -> "2 ala ma 
kota"));
+        cache.cleanUp();
+
+        // First entry should be expired and its token removed
+        int cacheSize = Math.toIntExact(cache.size());
+        Assert.assertEquals(1, cacheSize);
+        Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) 
cache).tokensCount());
+        Assert.assertEquals(cacheSize, cache.asMap().keySet().size());
+        Assert.assertEquals(cacheSize, cache.asMap().values().size());
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testPreserveValueLoadedAfterTimeExpiration() throws Exception {
+        TestingTicker ticker = new TestingTicker();
+        int ttl = 100;
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .ticker(ticker)
+                .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
+                .build();
+        int key = 11;
+
+        Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "11 ala ma 
kota"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "something 
else"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        ticker.increment(ttl, TimeUnit.MILLISECONDS);
+        Assert.assertEquals("new value", cache.get(key, () -> "new value"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        Assert.assertEquals("new value", cache.get(key, () -> "something yet 
different"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        Assert.assertEquals(1, cache.size());
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+        Assert.assertEquals(1, cache.asMap().keySet().size());
+        Assert.assertEquals(1, cache.asMap().values().size());
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testReplace() throws Exception {
+        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10)
+                .build();
+
+        int key = 10;
+        int initialValue = 20;
+        int replacedValue = 21;
+
+        cache.get(key, () -> initialValue);
+
+        Assert.assertTrue("Should successfully replace value", 
cache.asMap().replace(key, initialValue, replacedValue));
+        Assert.assertEquals("Cache should contain replaced value", 
replacedValue, cache.getIfPresent(key).intValue());
+
+        Assert.assertFalse("Should not replace when current value is 
different", cache.asMap().replace(key, initialValue, replacedValue));
+        Assert.assertEquals("Cache should maintain replaced value", 
replacedValue, cache.getIfPresent(key).intValue());
+
+        Assert.assertFalse("Should not replace non-existent key", 
cache.asMap().replace(100000, replacedValue, 22));
+        Assert.assertEquals("Cache should only contain original key", 
ImmutableSet.of(key), cache.asMap().keySet());
+        Assert.assertEquals("Original key should maintain its value", 
replacedValue, cache.getIfPresent(key).intValue());
+
+        int anotherKey = 13;
+        int anotherInitialValue = 14;
+        cache.get(anotherKey, () -> anotherInitialValue);
+        cache.invalidate(anotherKey);
+
+        Assert.assertFalse("Should not replace after invalidation", 
cache.asMap().replace(anotherKey, anotherInitialValue, 15));
+        Assert.assertEquals("Cache should only contain original key after 
invalidation", ImmutableSet.of(key), cache.asMap().keySet());
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testDisabledCache() throws Exception {
+        Exception exception = Assert.assertThrows(IllegalStateException.class, 
() ->
+                EvictableCacheBuilder.newBuilder()
+                        .maximumSize(0)
+                        .build());
+
+        Assert.assertEquals("Even when cache is disabled, the loads are 
synchronized and both load results and failures are shared between threads. "
+                        + "This is rarely desired, thus builder caller is 
expected to either opt-in into this behavior with 
shareResultsAndFailuresEvenIfDisabled(), "
+                        + "or choose not to share results (and failures) 
between concurrent invocations with shareNothingWhenDisabled().",
+                exception.getMessage());
+
+        testDisabledCache(
+                EvictableCacheBuilder.newBuilder()
+                        .maximumSize(0)
+                        .shareNothingWhenDisabled()
+                        .build());
+
+        testDisabledCache(
+                EvictableCacheBuilder.newBuilder()
+                        .maximumSize(0)
+                        .shareResultsAndFailuresEvenIfDisabled()
+                        .build());
+    }
+
+    private void testDisabledCache(Cache<Integer, Integer> cache) throws 
Exception {
+        for (int i = 0; i < 10; i++) {
+            int value = i * 10;
+            Assert.assertEquals(value, cache.get(i, () -> value).intValue());
+        }
+
+        cache.cleanUp();
+        Assert.assertEquals(0, cache.size());
+        Assert.assertTrue(cache.asMap().keySet().isEmpty());
+        Assert.assertTrue(cache.asMap().values().isEmpty());
+    }
+
+    private static class CacheStatsAssertions {
+        public static CacheStatsAssertions assertCacheStats(Cache<?, ?> cache) 
{
+            Objects.requireNonNull(cache, "cache is null");
+            return assertCacheStats(cache::stats);
+        }
+
+        public static CacheStatsAssertions 
assertCacheStats(Supplier<CacheStats> statsSupplier) {
+            return new CacheStatsAssertions(statsSupplier);
+        }
+
+        private final Supplier<CacheStats> stats;
+
+        private long loads;
+        private long hits;
+        private long misses;
+
+        private CacheStatsAssertions(Supplier<CacheStats> stats) {
+            this.stats = Objects.requireNonNull(stats, "stats is null");
+        }
+
+        public CacheStatsAssertions loads(long value) {
+            this.loads = value;
+            return this;
+        }
+
+        public CacheStatsAssertions hits(long value) {
+            this.hits = value;
+            return this;
+        }
+
+        public CacheStatsAssertions misses(long value) {
+            this.misses = value;
+            return this;
+        }
+
+        public void afterRunning(Runnable runnable) {
+            try {
+                calling(() -> {
+                    runnable.run();
+                    return null;
+                });
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public <T> T calling(Callable<T> callable)
+                throws Exception {
+            CacheStats beforeStats = stats.get();
+            T value = callable.call();
+            CacheStats afterStats = stats.get();
+
+            long loadDelta = afterStats.loadCount() - beforeStats.loadCount();
+            long missesDelta = afterStats.missCount() - 
beforeStats.missCount();
+            long hitsDelta = afterStats.hitCount() - beforeStats.hitCount();
+
+            Assert.assertEquals(loads, loadDelta);
+            Assert.assertEquals(hits, hitsDelta);
+            Assert.assertEquals(misses, missesDelta);
+
+            return value;
+        }
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testLoadStats()
+            throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10_000)
+                .recordStats()
+                .build();
+
+        Assert.assertEquals(new CacheStats(0, 0, 0, 0, 0, 0), cache.stats());
+
+        String value = CacheStatsAssertions.assertCacheStats(cache)
+                .misses(1)
+                .loads(1)
+                .calling(() -> cache.get(42, () -> "abc"));
+        Assert.assertEquals("abc", value);
+
+        value = CacheStatsAssertions.assertCacheStats(cache)
+                .hits(1)
+                .calling(() -> cache.get(42, () -> "xyz"));
+        Assert.assertEquals("abc", value);
+
+        // with equal, but not the same key
+        value = CacheStatsAssertions.assertCacheStats(cache)
+                .hits(1)
+                .calling(() -> cache.get(newInteger(42), () -> "xyz"));
+        Assert.assertEquals("abc", value);
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testLoadFailure()
+            throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(0)
+                .expireAfterWrite(0, TimeUnit.DAYS)
+                .shareResultsAndFailuresEvenIfDisabled()
+                .build();
+        int key = 10;
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            Exchanger<Thread> exchanger = new Exchanger<>();
+            CountDownLatch secondUnblocked = new CountDownLatch(1);
+
+            List<Future<String>> futures = new ArrayList<>();
+            for (int i = 0; i < 2; i++) {
+                boolean first = i == 0;
+                futures.add(executor.submit(() -> {
+                    if (!first) {
+                        // Wait for the first one to start the call
+                        exchanger.exchange(Thread.currentThread(), 10, 
TimeUnit.SECONDS);
+                        // Prove that we are back in RUNNABLE state.
+                        secondUnblocked.countDown();
+                    }
+                    return cache.get(key, () -> {
+                        if (first) {
+                            Thread secondThread = exchanger.exchange(null, 10, 
TimeUnit.SECONDS);
+                            Assert.assertTrue(secondUnblocked.await(10, 
TimeUnit.SECONDS));
+                            // Wait for the second one to hang inside the 
cache.get call.
+                            long start = System.nanoTime();
+                            while (!Thread.currentThread().isInterrupted()) {
+                                try {
+                                    
Assert.assertNotEquals(Thread.State.RUNNABLE, secondThread.getState());
+                                    break;
+                                } catch (Exception | AssertionError e) {
+                                    if (System.nanoTime() - start > 
TimeUnit.SECONDS.toNanos(30)) {
+                                        throw e;
+                                    }
+                                }
+                                try {
+                                    Thread.sleep(50);
+                                } catch (InterruptedException e) {
+                                    Thread.currentThread().interrupt();
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                            throw new RuntimeException("first attempt is 
poised to fail");
+                        }
+                        return "success";
+                    });
+                }));
+            }
+
+            List<String> results = new ArrayList<>();
+            for (Future<String> future : futures) {
+                try {
+                    results.add(future.get());
+                } catch (ExecutionException e) {
+                    results.add(e.getCause().toString());
+                }
+            }
+
+            // Note: if this starts to fail, that suggests that Guava 
implementation changed and NoopCache may be redundant now.
+            String expectedError = 
"com.google.common.util.concurrent.UncheckedExecutionException: "
+                    + "java.lang.RuntimeException: first attempt is poised to 
fail";
+            Assert.assertEquals(2, results.size());
+            Assert.assertEquals(expectedError, results.get(0));
+            Assert.assertEquals(expectedError, results.get(1));
+        } finally {
+            executor.shutdownNow();
+            Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+        }
+    }
+
+    @SuppressModernizer
+    private static Integer newInteger(int value) {
+        Integer integer = value;
+        @SuppressWarnings({"UnnecessaryBoxing", "BoxedPrimitiveConstructor", 
"CachedNumberConstructorCall", "removal"})
+        Integer newInteger = new Integer(value);
+        Assert.assertNotSame(integer, newInteger);
+        return newInteger;
+    }
+
+    /**
+     * Test that the loader is invoked only once for concurrent invocations of {@link LoadingCache#get(Object, Callable)}
+     * with equal keys. This is a behavior of Guava Cache as well. While this is not necessarily desirable behavior (see
+     * <a href="https://github.com/trinodb/trino/issues/11067">https://github.com/trinodb/trino/issues/11067</a>),
+     * the test exists primarily to document the current state and support discussion, should the current state change.
+     */
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testConcurrentGetWithCallableShareLoad()
+            throws Exception {
+        AtomicInteger loads = new AtomicInteger();
+        AtomicInteger concurrentInvocations = new AtomicInteger();
+
+        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10_000)
+                .build();
+
+        int threads = 2;
+        int invocationsPerThread = 100;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        try {
+            CyclicBarrier barrier = new CyclicBarrier(threads);
+            List<Future<?>> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                futures.add(executor.submit(() -> {
+                    for (int invocation = 0; invocation < 
invocationsPerThread; invocation++) {
+                        int key = invocation;
+                        barrier.await(10, TimeUnit.SECONDS);
+                        int value = cache.get(key, () -> {
+                            loads.incrementAndGet();
+                            int invocations = concurrentInvocations.incrementAndGet();
+                            Preconditions.checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() is invoked for the same key");
+                            Thread.sleep(1);
+                            concurrentInvocations.decrementAndGet();
+                            return -key;
+                        });
+                        Assert.assertEquals(-invocation, value);
+                    }
+                    return null;
+                }));
+            }
+
+            for (Future<?> future : futures) {
+                future.get(10, TimeUnit.SECONDS);
+            }
+            Assert.assertTrue(
+                    String.format(
+                            "loads (%d) should be between %d and %d",
+                            loads.intValue(),
+                            invocationsPerThread,
+                            threads * invocationsPerThread - 1),
+                    loads.intValue() >= invocationsPerThread && loads.intValue() <= threads * invocationsPerThread - 1);
+        } finally {
+            executor.shutdownNow();
+            Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+        }
+    }
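+
+    // Editorial sketch, not part of the original test: a minimal illustration of the
+    // load-sharing contract exercised above, assuming only the EvictableCacheBuilder API
+    // added in this commit. Concurrent get() calls with an equal key run the loader once
+    // and share its result, so expensive work such as a directory listing is not duplicated.
+    private static String loadSharingSketch(Cache<String, String> cache, String key)
+            throws ExecutionException {
+        // A second thread calling this concurrently with the same key blocks on the same
+        // load and reuses its result instead of invoking the callable again.
+        return cache.get(key, () -> "loaded:" + key);
+    }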
+
+    enum Invalidation {
+        INVALIDATE_KEY,
+        INVALIDATE_PREDEFINED_KEYS,
+        INVALIDATE_SELECTED_KEYS,
+        INVALIDATE_ALL,
+        /**/;
+    }
+
+    /**
+     * Covers https://github.com/google/guava/issues/1881
+     */
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testInvalidateOngoingLoad()
+            throws Exception {
+        for (Invalidation invalidation : Invalidation.values()) {
+            Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                    .maximumSize(10_000)
+                    .build();
+            Integer key = 42;
+
+            CountDownLatch loadOngoing = new CountDownLatch(1);
+            CountDownLatch invalidated = new CountDownLatch(1);
+            CountDownLatch getReturned = new CountDownLatch(1);
+            ExecutorService executor = Executors.newFixedThreadPool(2);
+            try {
+                // thread A
+                Future<String> threadA = executor.submit(() -> {
+                    String value = cache.get(key, () -> {
+                        loadOngoing.countDown(); // 1
+                        Assert.assertTrue(invalidated.await(10, TimeUnit.SECONDS)); // 2
+                        return "stale value";
+                    });
+                    getReturned.countDown(); // 3
+                    return value;
+                });
+
+                // thread B
+                Future<String> threadB = executor.submit(() -> {
+                    Assert.assertTrue(loadOngoing.await(10, TimeUnit.SECONDS)); // 1
+
+                    switch (invalidation) {
+                        case INVALIDATE_KEY:
+                            cache.invalidate(key);
+                            break;
+                        case INVALIDATE_PREDEFINED_KEYS:
+                            cache.invalidateAll(ImmutableList.of(key));
+                            break;
+                        case INVALIDATE_SELECTED_KEYS:
+                            Set<Integer> keys = cache.asMap().keySet().stream()
+                                    .filter(foundKey -> (int) foundKey == key)
+                                    .collect(ImmutableSet.toImmutableSet());
+                            cache.invalidateAll(keys);
+                            break;
+                        case INVALIDATE_ALL:
+                            cache.invalidateAll();
+                            break;
+                        default:
+                            throw new IllegalArgumentException();
+                    }
+
+                    invalidated.countDown(); // 2
+                    // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed.
+                    Assert.assertTrue(getReturned.await(10, TimeUnit.SECONDS)); // 3
+
+                    return cache.get(key, () -> "fresh value");
+                });
+
+                Assert.assertEquals("stale value", threadA.get());
+                Assert.assertEquals("fresh value", threadB.get());
+            } finally {
+                executor.shutdownNow();
+                Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+            }
+        }
+    }
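+
+    // Editorial sketch, not part of the original test: the contract asserted above, restated.
+    // With a plain Guava cache (google/guava#1881), a value loaded before invalidate() can
+    // still be installed and served afterwards; EvictableCache must hand later readers a
+    // fresh load instead.
+    private static void invalidateContractSketch(Cache<Integer, String> cache)
+            throws ExecutionException {
+        cache.invalidate(42); // may race with an in-flight load of key 42
+        // After invalidation, a read-through must not observe the pre-invalidation load.
+        Preconditions.checkState("fresh value".equals(cache.get(42, () -> "fresh value")));
+    }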
+
+    /**
+     * Covers https://github.com/google/guava/issues/1881
+     */
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testInvalidateAndLoadConcurrently()
+            throws Exception {
+        for (Invalidation invalidation : Invalidation.values()) {
+            int[] primes = {2, 3, 5, 7};
+            AtomicLong remoteState = new AtomicLong(1);
+
+            Cache<Integer, Long> cache = EvictableCacheBuilder.newBuilder()
+                    .maximumSize(10_000)
+                    .build();
+            Integer key = 42;
+            int threads = 4;
+
+            CyclicBarrier barrier = new CyclicBarrier(threads);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
+            try {
+                List<Future<Void>> futures = IntStream.range(0, threads)
+                        .mapToObj(threadNumber -> executor.submit(() -> {
+                            // prime the cache
+                            Assert.assertEquals(1L, (long) cache.get(key, remoteState::get));
+                            int prime = primes[threadNumber];
+
+                            barrier.await(10, TimeUnit.SECONDS);
+
+                            // modify underlying state
+                            remoteState.updateAndGet(current -> current * prime);
+
+                            // invalidate
+                            switch (invalidation) {
+                                case INVALIDATE_KEY:
+                                    cache.invalidate(key);
+                                    break;
+                                case INVALIDATE_PREDEFINED_KEYS:
+                                    cache.invalidateAll(ImmutableList.of(key));
+                                    break;
+                                case INVALIDATE_SELECTED_KEYS:
+                                    Set<Integer> keys = cache.asMap().keySet().stream()
+                                            .filter(foundKey -> (int) foundKey == key)
+                                            .collect(ImmutableSet.toImmutableSet());
+                                    cache.invalidateAll(keys);
+                                    break;
+                                case INVALIDATE_ALL:
+                                    cache.invalidateAll();
+                                    break;
+                                default:
+                                    throw new IllegalArgumentException();
+                            }
+
+                            // read through cache
+                            long current = cache.get(key, remoteState::get);
+                            if (current % prime != 0) {
+                                throw new AssertionError(String.format("The 
value read through cache (%s) in thread (%s) is not divisible by (%s)", 
current, threadNumber, prime));
+                            }
+
+                            return (Void) null;
+                        }))
+                        .collect(ImmutableList.toImmutableList());
+
+                for (Future<?> future : futures) {
+                    try {
+                        future.get(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                        throw new RuntimeException("Failed to get future value", e);
+                    }
+                }
+
+                Assert.assertEquals(2 * 3 * 5 * 7, remoteState.get());
+                Assert.assertEquals(remoteState.get(), (long) cache.get(key, remoteState::get));
+            } finally {
+                executor.shutdownNow();
+                Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+            }
+        }
+    }
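+
+    // Editorial note: the invariant above is deliberately weak. A thread that multiplied
+    // remoteState by its prime and then invalidated may read a value that also contains other
+    // threads' primes, but never one from before its own invalidation, hence the divisibility check.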
+
+    @Test
+    public void testPutOnEmptyCacheImplementation() {
+        for (DisabledCacheImplementation disabledCacheImplementation : DisabledCacheImplementation.values()) {
+            Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder()
+                    .maximumSize(0)
+                    .disabledCacheImplementation(disabledCacheImplementation)
+                    .build();
+            Map<Object, Object> cacheMap = cache.asMap();
+
+            int key = 0;
+            int value = 1;
+            Assert.assertNull(cacheMap.put(key, value));
+            Assert.assertNull(cacheMap.put(key, value));
+            Assert.assertNull(cacheMap.putIfAbsent(key, value));
+            Assert.assertNull(cacheMap.putIfAbsent(key, value));
+        }
+    }
+
+    @Test
+    public void testPutOnNonEmptyCacheImplementation() {
+        Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10)
+                .build();
+        Map<Object, Object> cacheMap = cache.asMap();
+
+        int key = 0;
+        int value = 1;
+
+        Exception putException = Assert.assertThrows("put operation should 
throw UnsupportedOperationException",
+                UnsupportedOperationException.class,
+                () -> cacheMap.put(key, value));
+        Assert.assertEquals(
+                "The operation is not supported, as in inherently races with 
cache invalidation. Use get(key, callable) instead.",
+                putException.getMessage());
+
+        Exception putIfAbsentException = Assert.assertThrows("putIfAbsent 
operation should throw UnsupportedOperationException",
+                UnsupportedOperationException.class,
+                () -> cacheMap.putIfAbsent(key, value));
+        Assert.assertEquals(
+                "The operation is not supported, as in inherently races with 
cache invalidation",
+                putIfAbsentException.getMessage());
+    }
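+
+    // Editorial sketch, not part of the original test: since put()/putIfAbsent() are rejected
+    // on a non-empty cache implementation (they race with invalidation), the supported write
+    // path ties the write to the load that produced it via get(key, callable).
+    private static Object readThroughInsteadOfPutSketch(Cache<Object, Object> cache)
+            throws ExecutionException {
+        // The only safe way to populate the cache: the value is installed atomically with its load.
+        return cache.get("key", () -> "value");
+    }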
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
new file mode 100644
index 00000000000..d6c6ed4b93b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+// some tests may invalidate the whole cache, thereby affecting other concurrent tests
+@Execution(ExecutionMode.SAME_THREAD)
+public class TransactionScopeCachingDirectoryListerTest {
+    @Test
+    public void testConcurrentDirectoryListing(@Mocked TableIf table)
+            throws FileSystemIOException {
+        RemoteFile firstFile = new RemoteFile("file:/x/x", true, 1, 1);
+        RemoteFile secondFile = new RemoteFile("file:/x/y", true, 1, 1);
+        RemoteFile thirdFile = new RemoteFile("file:/y/z", true, 1, 1);
+
+        String path1 = "file:/x";
+        String path2 = "file:/y";
+
+        CountingDirectoryLister countingLister = new CountingDirectoryLister(
+                ImmutableMap.of(
+                        path1, ImmutableList.of(firstFile, secondFile),
+                        path2, ImmutableList.of(thirdFile)));
+
+        TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister)
+                new TransactionScopeCachingDirectoryListerFactory(2).get(countingLister);
+
+        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+
+        Assert.assertEquals(1, countingLister.getListCount());
+
+        // listing path2 again shouldn't increase listing count
+        Assert.assertTrue(cachingLister.isCached(path2));
+        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+        Assert.assertEquals(1, countingLister.getListCount());
+
+        // start listing path1 concurrently
+        RemoteIterator<RemoteFile> path1FilesA = cachingLister.listFiles(null, true, table, path1);
+        RemoteIterator<RemoteFile> path1FilesB = cachingLister.listFiles(null, true, table, path1);
+        Assert.assertEquals(2, countingLister.getListCount());
+
+        // list path1 files using both iterators concurrently
+        Assert.assertEquals(firstFile, path1FilesA.next());
+        Assert.assertEquals(firstFile, path1FilesB.next());
+        Assert.assertEquals(secondFile, path1FilesB.next());
+        Assert.assertEquals(secondFile, path1FilesA.next());
+        Assert.assertFalse(path1FilesA.hasNext());
+        Assert.assertFalse(path1FilesB.hasNext());
+        Assert.assertEquals(2, countingLister.getListCount());
+
+        Assert.assertFalse(cachingLister.isCached(path2));
+        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+        Assert.assertEquals(3, countingLister.getListCount());
+    }
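+
+    // Editorial sketch, not part of the original test: the intended usage of the factory.
+    // Each query obtains its own caching lister, so repeated listings of a path are served
+    // from the transaction-scoped cache within one query but never shared across queries.
+    // The constructor argument (2 here) bounds the number of cached file entries per query,
+    // which is why listing path1's two files above evicts path2 from the cache.
+    private static DirectoryLister newPerQueryListerSketch(DirectoryLister baseLister) {
+        return new TransactionScopeCachingDirectoryListerFactory(2).get(baseLister);
+    }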
+
+    @Test
+    public void testConcurrentDirectoryListingException(@Mocked TableIf table)
+            throws FileSystemIOException {
+        RemoteFile file = new RemoteFile("file:/x/x", true, 1, 1);
+
+        String path = "file:/x";
+
+        CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
+        DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister);
+
+        // start listing path concurrently
+        countingLister.setThrowException(true);
+        RemoteIterator<RemoteFile> filesA = cachingLister.listFiles(null, true, table, path);
+        RemoteIterator<RemoteFile> filesB = cachingLister.listFiles(null, true, table, path);
+        Assert.assertEquals(1, countingLister.getListCount());
+
+        // listing should throw an exception
+        Assert.assertThrows(FileSystemIOException.class, () -> filesA.hasNext());
+
+        // listing again should succeed
+        countingLister.setThrowException(false);
+        assertFiles(cachingLister.listFiles(null, true, table, path), ImmutableList.of(file));
+        Assert.assertEquals(2, countingLister.getListCount());
+
+        // listing using the second concurrently initialized DirectoryLister should fail
+        Assert.assertThrows(FileSystemIOException.class, () -> filesB.hasNext());
+    }
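+
+    // Editorial note: the test above pins down the failure semantics. A failed listing is not
+    // installed in the cache (the retry re-lists and succeeds, bumping the count to 2), while an
+    // iterator created against the failed load keeps surfacing the original FileSystemIOException.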
+
+    private void assertFiles(RemoteIterator<RemoteFile> iterator, List<RemoteFile> expectedFiles)
+            throws FileSystemIOException {
+        ImmutableList.Builder<RemoteFile> actualFiles = ImmutableList.builder();
+        while (iterator.hasNext()) {
+            actualFiles.add(iterator.next());
+        }
+        Assert.assertEquals(expectedFiles, actualFiles.build());
+    }
+
+    private static class CountingDirectoryLister
+            implements DirectoryLister {
+        private final Map<String, List<RemoteFile>> fileStatuses;
+        private int listCount;
+        private boolean throwException;
+
+        public CountingDirectoryLister(Map<String, List<RemoteFile>> fileStatuses) {
+            this.fileStatuses = Objects.requireNonNull(fileStatuses, "fileStatuses is null");
+        }
+
+        @Override
+        public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+                throws FileSystemIOException {
+            // No specific recursive files-only listing implementation
+            listCount++;
+            return throwingRemoteIterator(Objects.requireNonNull(fileStatuses.get(location)), throwException);
+        }
+
+        public void setThrowException(boolean throwException) {
+            this.throwException = throwException;
+        }
+
+        public int getListCount() {
+            return listCount;
+        }
+    }
+
+    static RemoteIterator<RemoteFile> throwingRemoteIterator(List<RemoteFile> files, boolean throwException) {
+        return new RemoteIterator<RemoteFile>() {
+            private final Iterator<RemoteFile> iterator = ImmutableList.copyOf(files).iterator();
+
+            @Override
+            public boolean hasNext()
+                    throws FileSystemIOException {
+                if (throwException) {
+                    throw new FileSystemIOException("File system io 
exception.");
+                }
+                return iterator.hasNext();
+            }
+
+            @Override
+            public RemoteFile next() {
+                return iterator.next();
+            }
+        };
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
