This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ff07a391a5 [Opt](multi-catalog) Improve performance by introducing a cache of directory file listings when getting splits for each query. (#43913)
1ff07a391a5 is described below

commit 1ff07a391a540882b40f7a3f5ab693dbee69ffbf
Author: Qi Chen <che...@selectdb.com>
AuthorDate: Wed Feb 12 22:31:45 2025 +0800

    [Opt](multi-catalog) Improve performance by introducing a cache of directory file listings when getting splits for each query. (#43913)

    ### What problem does this PR solve?

    Following Trino, implement a query-level cache, shared by the Hive
    tables in a query, for obtaining the file split list of each
    partition. Files read within one query should have the same
    visibility, so the split lists of partitions of the same table should
    be consistent across the query scope. This makes the cache safe, and
    it is enabled by default.
    In Trino the mechanism is transaction-scoped: a transaction sees a
    consistent view of a table, hence the class name
    `TransactionScopeCachingDirectoryLister`. Doris retains this name so
    the mechanism can later be extended to a transaction concept.
    In addition, the Caffeine cache currently used by Doris has an
    eviction policy with an admission window (Window TinyLFU), so existing
    cache entries in the window area may be evicted immediately after
    their weight is updated. Therefore `EvictableCache`, which is based on
    Guava and evicts using a segmented LRU policy, was introduced.
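
    For reference, a minimal sketch of the intended per-query wiring
    (illustrative only, not part of this patch: the factory's `get`
    method and size parameter are assumed to mirror Trino's
    `TransactionScopeCachingDirectoryListerFactory`; `fs`, `table`, and
    `location` are as in `HiveMetaStoreCache.getFileCache` below):

    ```java
    // One caching lister is created per query; repeated listings of the
    // same partition location are then served from the query-scoped
    // cache instead of going back to the filesystem.
    DirectoryLister lister = new TransactionScopeCachingDirectoryListerFactory(
            Config.max_external_table_split_file_meta_cache_num)
            .get(new FileSystemDirectoryLister());

    // listFiles may throw FileSystemIOException, e.g. for a missing partition.
    RemoteIterator<RemoteFile> files = lister.listFiles(fs, false, table, location);
    while (files.hasNext()) {
        RemoteFile file = files.next(); // feed each file into split generation
    }
    ```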
---
 .../main/java/org/apache/doris/common/Config.java  |   4 +
 .../java/org/apache/doris/common/EmptyCache.java   | 247 +++++++
 .../org/apache/doris/common/EvictableCache.java    | 466 ++++++++++++++
 .../apache/doris/common/EvictableCacheBuilder.java | 286 +++++++++
 .../doris/datasource/hive/HMSExternalTable.java    |   4 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  69 +-
 .../doris/datasource/hive/source/HiveScanNode.java |  15 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   5 +-
 .../java/org/apache/doris/fs/DirectoryLister.java  |  29 +
 .../apache/doris/fs/FileSystemDirectoryLister.java |  37 ++
 .../org/apache/doris/fs/FileSystemIOException.java |  65 ++
 .../apache/doris/fs/RemoteFileRemoteIterator.java  |  47 ++
 .../java/org/apache/doris/fs/RemoteIterator.java   |  27 +
 .../org/apache/doris/fs/SimpleRemoteIterator.java  |  45 ++
 .../fs/TransactionDirectoryListingCacheKey.java    |  64 ++
 .../fs/TransactionScopeCachingDirectoryLister.java | 216 +++++++
 ...nsactionScopeCachingDirectoryListerFactory.java |  59 ++
 .../glue/translator/PhysicalPlanTranslator.java    |  19 +-
 .../apache/doris/planner/SingleNodePlanner.java    |  14 +-
 .../apache/doris/common/TestEvictableCache.java    | 708 +++++++++++++++++++++
 ...TransactionScopeCachingDirectoryListerTest.java | 174 +++++
 21 files changed, 2563 insertions(+), 37 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 030833ef4df..60f59460a0a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2214,6 +2214,10 @@ public class Config extends ConfigBase {
         "Max cache number of external table row count"})
     public static long max_external_table_row_count_cache_num = 100000;
 
+    @ConfField(description = {"每个查询的外表文件元数据缓存的最大文件数量。",
+            "Max cache file number of external table split file meta cache at 
query level."})
+    public static long max_external_table_split_file_meta_cache_num = 100000;
+
     /**
      * Max cache loader thread-pool size.
      * Max thread pool size for loading external meta cache
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
new file mode 100644
index 00000000000..5942eb2b1f3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+class EmptyCache<K, V>
+        extends AbstractLoadingCache<K, V> {
+    private final CacheLoader<? super K, V> loader;
+    private final StatsCounter statsCounter;
+
+    EmptyCache(CacheLoader<? super K, V> loader, boolean recordStats) {
+        this.loader = Objects.requireNonNull(loader, "loader is null");
+        this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter();
+    }
+
+    @Override
+    public V getIfPresent(Object key) {
+        statsCounter.recordMisses(1);
+        return null;
+    }
+
+    @Override
+    public V get(K key)
+            throws ExecutionException {
+        return get(key, () -> loader.load(key));
+    }
+
+    @Override
+    public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
+            throws ExecutionException {
+        try {
+            Set<K> keySet = ImmutableSet.copyOf(keys);
+            statsCounter.recordMisses(keySet.size());
+            @SuppressWarnings("unchecked") // safe since all keys extend K
+            ImmutableMap<K, V> result = (ImmutableMap<K, V>) loader.loadAll(keySet);
+            for (K key : keySet) {
+                if (!result.containsKey(key)) {
+                    throw new InvalidCacheLoadException("loadAll failed to return a value for " + key);
+                }
+            }
+            statsCounter.recordLoadSuccess(1);
+            return result;
+        } catch (RuntimeException e) {
+            statsCounter.recordLoadException(1);
+            throw new UncheckedExecutionException(e);
+        } catch (Exception e) {
+            statsCounter.recordLoadException(1);
+            throw new ExecutionException(e);
+        }
+    }
+
+    @Override
+    public V get(K key, Callable<? extends V> valueLoader)
+            throws ExecutionException {
+        statsCounter.recordMisses(1);
+        try {
+            V value = valueLoader.call();
+            statsCounter.recordLoadSuccess(1);
+            return value;
+        } catch (RuntimeException e) {
+            statsCounter.recordLoadException(1);
+            throw new UncheckedExecutionException(e);
+        } catch (Exception e) {
+            statsCounter.recordLoadException(1);
+            throw new ExecutionException(e);
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        // Cache, even if configured to evict everything immediately, should allow writes.
+    }
+
+    @Override
+    public void refresh(K key) {}
+
+    @Override
+    public void invalidate(Object key) {}
+
+    @Override
+    public void invalidateAll(Iterable<?> keys) {}
+
+    @Override
+    public void invalidateAll() {
+
+    }
+
+    @Override
+    public long size() {
+        return 0;
+    }
+
+    @Override
+    public CacheStats stats() {
+        return statsCounter.snapshot();
+    }
+
+    @Override
+    public ConcurrentMap<K, V> asMap() {
+        return new ConcurrentMap<K, V>() {
+            @Override
+            public V putIfAbsent(K key, V value) {
+                // Cache, even if configured to evict everything immediately, should allow writes.
+                // putIfAbsent returns the previous value
+                return null;
+            }
+
+            @Override
+            public boolean remove(Object key, Object value) {
+                return false;
+            }
+
+            @Override
+            public boolean replace(K key, V oldValue, V newValue) {
+                return false;
+            }
+
+            @Override
+            public V replace(K key, V value) {
+                return null;
+            }
+
+            @Override
+            public int size() {
+                return 0;
+            }
+
+            @Override
+            public boolean isEmpty() {
+                return true;
+            }
+
+            @Override
+            public boolean containsKey(Object key) {
+                return false;
+            }
+
+            @Override
+            public boolean containsValue(Object value) {
+                return false;
+            }
+
+            @Override
+            @Nullable
+            public V get(Object key) {
+                return null;
+            }
+
+            @Override
+            @Nullable
+            public V put(K key, V value) {
+                // Cache, even if configured to evict everything immediately, should allow writes.
+                return null;
+            }
+
+            @Override
+            @Nullable
+            public V remove(Object key) {
+                return null;
+            }
+
+            @Override
+            public void putAll(Map<? extends K, ? extends V> m) {
+                // Cache, even if configured to evict everything immediately, should allow writes.
+            }
+
+            @Override
+            public void clear() {
+
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return ImmutableSet.of();
+            }
+
+            @Override
+            public Collection<V> values() {
+                return ImmutableSet.of();
+            }
+
+            @Override
+            public Set<Entry<K, V>> entrySet() {
+                return ImmutableSet.of();
+            }
+        };
+    }
+
+    private static class NoopStatsCounter
+            implements StatsCounter {
+        private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot();
+
+        @Override
+        public void recordHits(int count) {}
+
+        @Override
+        public void recordMisses(int count) {}
+
+        @Override
+        public void recordLoadSuccess(long loadTime) {}
+
+        @Override
+        public void recordLoadException(long loadTime) {}
+
+        @Override
+        public void recordEviction() {}
+
+        @Override
+        public CacheStats snapshot() {
+            return EMPTY_STATS;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java
new file mode 100644
index 00000000000..a2cb05d82c5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Cache} and {@link LoadingCache} implementation similar to ones
+ * produced by {@link CacheBuilder#build()}, but one that does not
+ * exhibit <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
+ * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)}
+ * is guaranteed to return fresh state after {@link #invalidate(Object)},
+ * {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
+ *
+ * @see EvictableCacheBuilder
+ */
+// @ElementTypesAreNonnullByDefault
+class EvictableCache<K, V>
+        extends AbstractLoadingCache<K, V>
+        implements LoadingCache<K, V> {
+    // Invariant: for every (K, token) entry in the tokens map, there is a live
+    // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens'
+    // entry to be removed.
+    private final ConcurrentHashMap<K, Token<K>> tokens = new ConcurrentHashMap<>();
+    // The dataCache can have entries with no corresponding tokens in the tokens map.
+    // For example, this can happen when invalidation concurs with load.
+    // The dataCache must be bounded.
+    private final LoadingCache<Token<K>, V> dataCache;
+
+    private final AtomicInteger invalidations = new AtomicInteger();
+
+    EvictableCache(CacheBuilder<? super Token<K>, ? super V> cacheBuilder, CacheLoader<? super K, V> cacheLoader) {
+        dataCache = buildUnsafeCache(
+                cacheBuilder
+                        .<Token<K>, V>removalListener(removal -> {
+                            Token<K> token = removal.getKey();
+                            Verify.verify(token != null, "token is null");
+                            if (removal.getCause() != RemovalCause.REPLACED) {
+                                tokens.remove(token.getKey(), token);
+                            }
+                        }),
+                new TokenCacheLoader<>(cacheLoader));
+    }
+
+    // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
+    // advising to use this class as a safety-adding wrapper.
+    private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
+            CacheLoader<? super K, V> cacheLoader) {
+        return cacheBuilder.build(cacheLoader);
+    }
+
+    @Override
+    public V getIfPresent(Object key) {
+        @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+        Token<K> token = tokens.get(key);
+        if (token == null) {
+            return null;
+        }
+        return dataCache.getIfPresent(token);
+    }
+
+    @Override
+    public V get(K key, Callable<? extends V> valueLoader)
+            throws ExecutionException {
+        Token<K> newToken = new Token<>(key);
+        int invalidations = this.invalidations.get();
+        Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
+        try {
+            V value = dataCache.get(token, valueLoader);
+            if (invalidations == this.invalidations.get()) {
+                // Revive token if it got expired before reloading
+                if (tokens.putIfAbsent(key, token) == null) {
+                    // Revived
+                    if (!dataCache.asMap().containsKey(token)) {
+                        // We revived, but the token does not correspond to a live entry anymore.
+                        // It would stay in tokens forever, so let's remove it.
+                        tokens.remove(key, token);
+                    }
+                }
+            }
+            return value;
+        } catch (Throwable e) {
+            if (newToken == token) {
+                // Failed to load and it was our new token persisted in tokens map.
+                // No cache entry exists for the token (unless concurrent load happened),
+                // so we need to remove it.
+                tokens.remove(key, newToken);
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public V get(K key)
+            throws ExecutionException {
+        Token<K> newToken = new Token<>(key);
+        int invalidations = this.invalidations.get();
+        Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
+        try {
+            V value = dataCache.get(token);
+            if (invalidations == this.invalidations.get()) {
+                // Revive token if it got expired before reloading
+                if (tokens.putIfAbsent(key, token) == null) {
+                    // Revived
+                    if (!dataCache.asMap().containsKey(token)) {
+                        // We revived, but the token does not correspond to a live entry anymore.
+                        // It would stay in tokens forever, so let's remove it.
+                        tokens.remove(key, token);
+                    }
+                }
+            }
+            return value;
+        } catch (Throwable e) {
+            if (newToken == token) {
+                // Failed to load and it was our new token persisted in tokens map.
+                // No cache entry exists for the token (unless concurrent load happened),
+                // so we need to remove it.
+                tokens.remove(key, newToken);
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
+            throws ExecutionException {
+        List<Token<K>> newTokens = new ArrayList<>();
+        List<Token<K>> temporaryTokens = new ArrayList<>();
+        try {
+            Map<K, V> result = new LinkedHashMap<>();
+            for (K key : keys) {
+                if (result.containsKey(key)) {
+                    continue;
+                }
+                // This is not bulk, but is a fast local operation
+                Token<K> newToken = new Token<>(key);
+                Token<K> oldToken = tokens.putIfAbsent(key, newToken);
+                if (oldToken != null) {
+                    // Token exists but the data may not exist (e.g. due to concurrent eviction)
+                    V value = dataCache.getIfPresent(oldToken);
+                    if (value != null) {
+                        result.put(key, value);
+                        continue;
+                    }
+                    // Old token exists but value wasn't found. This can happen when there is concurrent
+                    // eviction/invalidation or when the value is still being loaded.
+                    // The new token is not registered in tokens, so won't be used by subsequent invocations.
+                    temporaryTokens.add(newToken);
+                }
+                newTokens.add(newToken);
+            }
+
+            Map<Token<K>, V> values = dataCache.getAll(newTokens);
+            for (Map.Entry<Token<K>, V> entry : values.entrySet()) {
+                Token<K> newToken = entry.getKey();
+                result.put(newToken.getKey(), entry.getValue());
+            }
+            return ImmutableMap.copyOf(result);
+        } catch (Throwable e) {
+            for (Token<K> token : newTokens) {
+                // Failed to load and it was our new token (potentially) persisted in tokens map.
+                // No cache entry exists for the token (unless concurrent load happened),
+                // so we need to remove it.
+                tokens.remove(token.getKey(), token);
+            }
+            throw e;
+        } finally {
+            dataCache.invalidateAll(temporaryTokens);
+        }
+    }
+
+    @Override
+    public void refresh(K key) {
+        // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token.
+        // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created.
+        // In such case we would leak the newly created token.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long size() {
+        return dataCache.size();
+    }
+
+    @Override
+    public void cleanUp() {
+        dataCache.cleanUp();
+    }
+
+    @VisibleForTesting
+    int tokensCount() {
+        return tokens.size();
+    }
+
+    @Override
+    public void invalidate(Object key) {
+        invalidations.incrementAndGet();
+        @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+        Token<K> token = tokens.remove(key);
+        if (token != null) {
+            dataCache.invalidate(token);
+        }
+    }
+
+    @Override
+    public void invalidateAll() {
+        invalidations.incrementAndGet();
+        dataCache.invalidateAll();
+        tokens.clear();
+    }
+
+    // Not thread safe, test only.
+    @VisibleForTesting
+    void clearDataCacheOnly() {
+        Map<K, Token<K>> tokensCopy = new HashMap<>(tokens);
+        dataCache.asMap().clear();
+        Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction");
+        tokens.putAll(tokensCopy);
+    }
+
+    @Override
+    public CacheStats stats() {
+        return dataCache.stats();
+    }
+
+    @Override
+    public ConcurrentMap<K, V> asMap() {
+        return new ConcurrentMap<K, V>() {
+            private final ConcurrentMap<Token<K>, V> dataCacheMap = dataCache.asMap();
+
+            @Override
+            public V putIfAbsent(K key, V value) {
+                throw new UnsupportedOperationException("The operation is not supported,"
+                        + " as it inherently races with cache invalidation");
+            }
+
+            @Override
+            public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+                // default implementation of ConcurrentMap#compute uses the unsupported putIfAbsent in some cases
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently"
+                        + " races with cache invalidation");
+            }
+
+            @Override
+            public boolean remove(Object key, Object value) {
+                @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
+                Token<K> token = tokens.get(key);
+                if (token != null) {
+                    return dataCacheMap.remove(token, value);
+                }
+                return false;
+            }
+
+            @Override
+            public boolean replace(K key, V oldValue, V newValue) {
+                Token<K> token = tokens.get(key);
+                if (token != null) {
+                    return dataCacheMap.replace(token, oldValue, newValue);
+                }
+                return false;
+            }
+
+            @Override
+            public V replace(K key, V value) {
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently races"
+                        + " with cache invalidation");
+            }
+
+            @Override
+            public int size() {
+                return dataCache.asMap().size();
+            }
+
+            @Override
+            public boolean isEmpty() {
+                return dataCache.asMap().isEmpty();
+            }
+
+            @Override
+            public boolean containsKey(Object key) {
+                return tokens.containsKey(key);
+            }
+
+            @Override
+            public boolean containsValue(Object value) {
+                return values().contains(value);
+            }
+
+            @Override
+            @Nullable
+            public V get(Object key) {
+                return getIfPresent(key);
+            }
+
+            @Override
+            public V put(K key, V value) {
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently"
+                        + " races with cache invalidation. Use get(key, callable) instead.");
+            }
+
+            @Override
+            @Nullable
+            public V remove(Object key) {
+                Token<K> token = tokens.remove(key);
+                if (token != null) {
+                    return dataCacheMap.remove(token);
+                }
+                return null;
+            }
+
+            @Override
+            public void putAll(Map<? extends K, ? extends V> m) {
+                throw new UnsupportedOperationException("The operation is not supported, as it inherently"
+                        + " races with cache invalidation. Use get(key, callable) instead.");
+            }
+
+            @Override
+            public void clear() {
+                dataCacheMap.clear();
+                tokens.clear();
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return tokens.keySet();
+            }
+
+            @Override
+            public Collection<V> values() {
+                return dataCacheMap.values();
+            }
+
+            @Override
+            public Set<Map.Entry<K, V>> entrySet() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    // instance-based equality
+    static final class Token<K> {
+        private final K key;
+
+        Token(K key) {
+            this.key = Objects.requireNonNull(key, "key is null");
+        }
+
+        K getKey() {
+            return key;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key);
+        }
+    }
+
+    private static class TokenCacheLoader<K, V>
+            extends CacheLoader<Token<K>, V> {
+        private final CacheLoader<? super K, V> delegate;
+
+        public TokenCacheLoader(CacheLoader<? super K, V> delegate) {
+            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        }
+
+        @Override
+        public V load(Token<K> token)
+                throws Exception {
+            return delegate.load(token.getKey());
+        }
+
+        @Override
+        public ListenableFuture<V> reload(Token<K> token, V oldValue)
+                throws Exception {
+            return delegate.reload(token.getKey(), oldValue);
+        }
+
+        @Override
+        public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens)
+                throws Exception {
+            List<Token<K>> tokenList = ImmutableList.copyOf(tokens);
+            List<K> keys = new ArrayList<>();
+            for (Token<K> token : tokenList) {
+                keys.add(token.getKey());
+            }
+            Map<? super K, V> values;
+            try {
+                values = delegate.loadAll(keys);
+            } catch (UnsupportedLoadingOperationException e) {
+                // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll
+                // to fall back from bulk loading (without load sharing) to loading individual
+                // values (with load sharing). EvictableCache implementation does not currently
+                // support the fallback mechanism, so the individual values would be loaded
+                // without load sharing. This would be an unintentional and non-obvious behavioral
+                // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled.
+                throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache"
+                        + " when CacheLoader.loadAll is not implemented", e);
+            }
+
+            ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder();
+            for (int i = 0; i < tokenList.size(); i++) {
+                Token<K> token = tokenList.get(i);
+                K key = keys.get(i);
+                V value = values.get(key);
+                // CacheLoader.loadAll is not guaranteed to return values for all the keys
+                if (value != null) {
+                    result.put(token, value);
+                }
+            }
+            return result.buildOrThrow();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .addValue(delegate)
+                    .toString();
+        }
+    }
+}
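
For reference, a hedged usage sketch of the class above (illustrative only, not part of this patch; EvictableCacheBuilder is introduced in the next file): after invalidate(key), get(key) is guaranteed to trigger a fresh load, because each load is keyed by a per-entry Token that invalidation discards.

    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    import org.apache.doris.common.EvictableCacheBuilder;

    import java.util.concurrent.ExecutionException;

    public class EvictableCacheExample {
        public static void main(String[] args) throws ExecutionException {
            LoadingCache<String, Long> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(10_000) // unbounded caches are rejected by build()
                    .build(CacheLoader.from(key -> System.nanoTime()));

            long v1 = cache.get("k"); // first call loads and caches the value
            cache.invalidate("k");    // drops the token; the stale entry cannot be revived
            long v2 = cache.get("k"); // guaranteed to run a fresh load
            assert v1 != v2;
        }
    }
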
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java
new file mode 100644
index 00000000000..8da3f8c8d56
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCacheBuilder.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.EvictableCache.Token;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.errorprone.annotations.CheckReturnValue;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder},
+ * but creating cache implementations that do not exhibit
+ * <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
+ * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)}
+ * is guaranteed to return fresh state after {@link Cache#invalidate(Object)},
+ * {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called.
+ */
+public final class EvictableCacheBuilder<K, V> {
+    @CheckReturnValue
+    public static EvictableCacheBuilder<Object, Object> newBuilder() {
+        return new EvictableCacheBuilder<>();
+    }
+
+    private Optional<Ticker> ticker = Optional.empty();
+    private Optional<Duration> expireAfterWrite = Optional.empty();
+    private Optional<Duration> refreshAfterWrite = Optional.empty();
+    private Optional<Long> maximumSize = Optional.empty();
+    private Optional<Long> maximumWeight = Optional.empty();
+    private Optional<Integer> concurrencyLevel = Optional.empty();
+    private Optional<Weigher<? super Token<K>, ? super V>> weigher = Optional.empty();
+    private boolean recordStats;
+    private Optional<DisabledCacheImplementation> disabledCacheImplementation = Optional.empty();
+
+    private EvictableCacheBuilder() {}
+
+    /**
+     * Pass-through for {@link CacheBuilder#ticker(Ticker)}.
+     */
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> ticker(Ticker ticker) {
+        this.ticker = Optional.of(ticker);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
+        return expireAfterWrite(toDuration(duration, unit));
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> expireAfterWrite(Duration duration) {
+        Preconditions.checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set");
+        this.expireAfterWrite = Optional.of(duration);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
+        return refreshAfterWrite(toDuration(duration, unit));
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> refreshAfterWrite(Duration duration) {
+        Preconditions.checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set");
+        this.refreshAfterWrite = Optional.of(duration);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> maximumSize(long maximumSize) {
+        Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set");
+        Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set");
+        this.maximumSize = Optional.of(maximumSize);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> maximumWeight(long maximumWeight) {
+        Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set");
+        Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set");
+        this.maximumWeight = Optional.of(maximumWeight);
+        return this;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) {
+        Preconditions.checkState(!this.concurrencyLevel.isPresent(), "concurrencyLevel already set");
+        this.concurrencyLevel = Optional.of(concurrencyLevel);
+        return this;
+    }
+
+    public <K1 extends K, V1 extends V> EvictableCacheBuilder<K1, V1> weigher(Weigher<? super K1, ? super V1> weigher) {
+        Preconditions.checkState(!this.weigher.isPresent(), "weigher already set");
+        @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher
+        EvictableCacheBuilder<K1, V1> cast = (EvictableCacheBuilder<K1, V1>) this;
+        cast.weigher = Optional.of(new TokenWeigher<>(weigher));
+        return cast;
+    }
+
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> recordStats() {
+        recordStats = true;
+        return this;
+    }
+
+    /**
+     * Choose a behavior for the case when caching is disabled that may allow data and failure
+     * sharing between concurrent callers.
+     */
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> shareResultsAndFailuresEvenIfDisabled() {
+        return disabledCacheImplementation(DisabledCacheImplementation.GUAVA);
+    }
+
+    /**
+     * Choose a behavior for the case when caching is disabled that prevents data and
+     * failure sharing between concurrent callers.
+     * Note: disabled cache won't report any statistics.
+     */
+    @CanIgnoreReturnValue
+    public EvictableCacheBuilder<K, V> shareNothingWhenDisabled() {
+        return disabledCacheImplementation(DisabledCacheImplementation.NOOP);
+    }
+
+    @VisibleForTesting
+    EvictableCacheBuilder<K, V> disabledCacheImplementation(DisabledCacheImplementation cacheImplementation) {
+        Preconditions.checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set");
+        disabledCacheImplementation = Optional.of(cacheImplementation);
+        return this;
+    }
+
+    @CheckReturnValue
+    public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
+        return build(unimplementedCacheLoader());
+    }
+
+    @CheckReturnValue
+    public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? super K1, V1> loader) {
+        if (cacheDisabled()) {
+            // Silently providing a behavior different from Guava's could be surprising, so require explicit choice.
+            DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow(
+                    () -> new IllegalStateException(
+                    "Even when cache is disabled, the loads are synchronized and both load results and failures"
+                            + " are shared between threads. " + "This is rarely desired, thus builder caller is"
+                            + " expected to either opt-in into this behavior with"
+                            + " shareResultsAndFailuresEvenIfDisabled(), or choose not to share results (and failures)"
+                            + " between concurrent invocations with shareNothingWhenDisabled()."));
+
+            switch (disabledCacheImplementation) {
+                case NOOP:
+                    return new EmptyCache<>(loader, recordStats);
+                case GUAVA: {
+                    // Disabled cache is always empty, so doesn't exhibit invalidation problems.
+                    // Avoid overhead of EvictableCache wrapper.
+                    CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
+                            .maximumSize(0)
+                            .expireAfterWrite(0, TimeUnit.SECONDS);
+                    if (recordStats) {
+                        cacheBuilder.recordStats();
+                    }
+                    return buildUnsafeCache(cacheBuilder, loader);
+                }
+                default:
+                    throw new IllegalStateException("Unexpected value: " + disabledCacheImplementation);
+            }
+        }
+
+        if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) {
+            // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may
+            // lead to an entry remaining in the cache, without associated token. This would lead to
+            // a memory leak in an unbounded cache.
+            throw new IllegalStateException("Unbounded cache is not supported");
+        }
+
+        // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls.
+        CacheBuilder<Object, ? super V> cacheBuilder = CacheBuilder.newBuilder();
+        ticker.ifPresent(cacheBuilder::ticker);
+        expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite);
+        refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite);
+        maximumSize.ifPresent(cacheBuilder::maximumSize);
+        maximumWeight.ifPresent(cacheBuilder::maximumWeight);
+        weigher.ifPresent(cacheBuilder::weigher);
+        concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel);
+        if (recordStats) {
+            cacheBuilder.recordStats();
+        }
+        return new EvictableCache<>(cacheBuilder, loader);
+    }
+
+    private boolean cacheDisabled() {
+        return (maximumSize.isPresent() && maximumSize.get() == 0)
+                || (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero());
+    }
+
+    // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
+    // advising to use this class as a safety-adding wrapper.
+    private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
+            CacheLoader<? super K, V> cacheLoader) {
+        return cacheBuilder.build(cacheLoader);
+    }
+
+    private static <K, V> CacheLoader<K, V> unimplementedCacheLoader() {
+        return CacheLoader.from(ignored -> {
+            throw new UnsupportedOperationException();
+        });
+    }
+
+    private static final class TokenWeigher<K, V>
+            implements Weigher<Token<K>, V> {
+        private final Weigher<? super K, ? super V> delegate;
+
+        private TokenWeigher(Weigher<? super K, ? super V> delegate) {
+            this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        }
+
+        @Override
+        public int weigh(Token<K> key, V value) {
+            return delegate.weigh(key.getKey(), value);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TokenWeigher<?, ?> that = (TokenWeigher<?, ?>) o;
+            return Objects.equals(delegate, that.delegate);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(delegate);
+        }
+
+        @Override
+        public String toString() {
+            return "TokenWeigher{" + "delegate=" + delegate + '}';
+        }
+    }
+
+    private static Duration toDuration(long duration, TimeUnit unit) {
+        // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated
+        return Duration.ofNanos(unit.toNanos(duration));
+    }
+
+    @VisibleForTesting
+    enum DisabledCacheImplementation {
+        NOOP,
+        GUAVA,
+    }
+}
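
For reference, a hedged sketch of the disabled-cache contract enforced in build() above (illustrative only, not part of this patch): a zero-sized cache must explicitly choose how concurrent loads behave, otherwise build() throws IllegalStateException.

    import com.google.common.cache.Cache;

    import org.apache.doris.common.EvictableCacheBuilder;

    import java.util.concurrent.ExecutionException;

    public class DisabledCacheExample {
        public static void main(String[] args) throws ExecutionException {
            Cache<String, String> disabled = EvictableCacheBuilder.newBuilder()
                    .maximumSize(0)             // caching effectively disabled
                    .shareNothingWhenDisabled() // no cross-thread load sharing
                    .build();

            // Every get() invokes the loader, since nothing is ever retained.
            System.out.println(disabled.get("k", () -> "computed"));
        }
    }
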
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index dccefc8b743..5321039bedc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -39,6 +39,7 @@ import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.fs.FileSystemDirectoryLister;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
 import org.apache.doris.mtmv.MTMVRefreshContext;
@@ -1059,7 +1060,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
                 LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString());
             }
         }
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
+        return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName,
+                new FileSystemDirectoryLister(), null);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index e3bbd681b9e..9bf63df6970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -18,13 +18,13 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.PartitionValue;
-import org.apache.doris.backup.Status;
 import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
@@ -39,7 +39,11 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.fs.FileSystemCache;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.FileSystemIOException;
+import org.apache.doris.fs.RemoteIterator;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -185,7 +189,7 @@ public class HiveMetaStoreCache {
 
             @Override
             public FileCacheValue load(FileCacheKey key) {
-                return loadFiles(key);
+                return loadFiles(key, new FileSystemDirectoryLister(), null);
             }
         };
 
@@ -338,7 +342,9 @@ public class HiveMetaStoreCache {
     private FileCacheValue getFileCache(String location, String inputFormat,
             JobConf jobConf,
             List<String> partitionValues,
-            String bindBrokerName) throws UserException {
+            String bindBrokerName,
+            DirectoryLister directoryLister,
+            TableIf table) throws UserException {
         FileCacheValue result = new FileCacheValue();
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
@@ -353,34 +359,37 @@ public class HiveMetaStoreCache {
         //      /user/hive/warehouse/region_tmp_union_all2/2
         // So we need to recursively list data location.
         // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
-        List<RemoteFile> remoteFiles = new ArrayList<>();
         boolean isRecursiveDirectories = Boolean.valueOf(
                 catalog.getProperties().getOrDefault("hive.recursive_directories", "false"));
-        Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles);
-        if (status.ok()) {
-            for (RemoteFile remoteFile : remoteFiles) {
+        try {
+            RemoteIterator<RemoteFile> iterator = directoryLister.listFiles(fs, isRecursiveDirectories,
+                    table, location);
+            while (iterator.hasNext()) {
+                RemoteFile remoteFile = iterator.next();
                 String srcPath = remoteFile.getPath().toString();
                 LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
                 result.addFile(remoteFile, locationPath);
             }
-        } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
-            // User may manually remove partition under HDFS, in this case,
-            // Hive doesn't aware that the removed partition is missing.
-            // Here is to support this case without throw an exception.
-            LOG.warn(String.format("File %s not exist.", location));
-            if (!Boolean.valueOf(catalog.getProperties()
-                    .getOrDefault("hive.ignore_absent_partitions", "true"))) {
-                throw new UserException("Partition location does not exist: " + location);
+        } catch (FileSystemIOException e) {
+            if (e.getErrorCode().isPresent() && e.getErrorCode().get().equals(ErrCode.NOT_FOUND)) {
+                // User may manually remove a partition under HDFS; in this case,
+                // Hive is not aware that the removed partition is missing.
+                // This is to support that case without throwing an exception.
+                LOG.warn(String.format("File %s does not exist.", location));
+                if (!Boolean.valueOf(catalog.getProperties()
+                        .getOrDefault("hive.ignore_absent_partitions", "true"))) {
+                    throw new UserException("Partition location does not exist: " + location);
+                }
+            } else {
+                throw new RuntimeException(e);
             }
-        } else {
-            throw new RuntimeException(status.getErrMsg());
         }
         // Must copy the partitionValues to avoid concurrent modification of key and value
         result.setPartitionValues(Lists.newArrayList(partitionValues));
         return result;
     }
 
-    private FileCacheValue loadFiles(FileCacheKey key) {
+    private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -404,7 +413,7 @@ public class HiveMetaStoreCache {
             FileInputFormat.setInputPaths(jobConf, finalLocation.get());
             try {
                 FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
-                        key.getPartitionValues(), key.bindBrokerName);
+                        key.getPartitionValues(), key.bindBrokerName, directoryLister, table);
                 // Replace default hive partition with a null_string.
                 for (int i = 0; i < result.getValuesSize(); i++) {
                     if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) {
@@ -456,19 +465,25 @@ public class HiveMetaStoreCache {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-                                                              String bindBrokerName) {
-        return getFilesByPartitions(partitions, true, true, bindBrokerName);
+                                                              String bindBrokerName,
+                                                              DirectoryLister directoryLister,
+                                                              TableIf table) {
+        return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-                                                                 String bindBrokerName) {
-        return getFilesByPartitions(partitions, false, true, bindBrokerName);
+                                                                 String bindBrokerName,
+                                                                 DirectoryLister directoryLister,
+                                                                 TableIf table) {
+        return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table);
     }
 
     public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
                                                      boolean withCache,
                                                      boolean concurrent,
-                                                     String bindBrokerName) {
+                                                     String bindBrokerName,
+            DirectoryLister directoryLister,
+            TableIf table) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
                 ? FileCacheKey.createDummyCacheKey(
@@ -484,13 +499,15 @@ public class HiveMetaStoreCache {
             } else {
                 if (concurrent) {
                     List<Future<FileCacheValue>> pList = keys.stream().map(
-                            key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
+                            key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table)))
+                            .collect(Collectors.toList());
                     fileLists = Lists.newArrayListWithExpectedSize(keys.size());
                     for (Future<FileCacheValue> p : pList) {
                         fileLists.add(p.get());
                     }
                 } else {
-                    fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
+                    fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table))
+                            .collect(Collectors.toList());
                 }
             }
         } catch (ExecutionException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 08cf6582447..890f6147f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveProperties;
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
+import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
@@ -86,6 +87,8 @@ public class HiveScanNode extends FileQueryScanNode {
     @Setter
     protected SelectedPartitions selectedPartitions = null;
 
+    private DirectoryLister directoryLister;
+
     private boolean partitionInit = false;
     private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
     private List<HivePartition> prunedPartitions;
@@ -100,15 +103,18 @@ public class HiveScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
      */
-    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) {
-        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv);
+    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv,
+            DirectoryLister directoryLister) {
+        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv, directoryLister);
     }
 
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
-            StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) {
+            StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv,
+            DirectoryLister directoryLister) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv);
         hmsTable = (HMSExternalTable) desc.getTable();
         brokerName = hmsTable.getCatalog().bindBrokerName();
+        this.directoryLister = directoryLister;
     }
 
     @Override
@@ -276,7 +282,8 @@ public class HiveScanNode extends FileQueryScanNode {
             }
         } else {
             boolean withCache = Config.max_external_file_cache_num > 0;
-            fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName);
+            fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName,
+                    directoryLister, hmsTable);
         }
         if (tableSample != null) {
             List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index a6156924e27..b8f3d7bd198 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -37,6 +37,7 @@ import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
+import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
@@ -119,8 +120,8 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
             Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation,
-            SessionVariable sv) {
-        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv);
+            SessionVariable sv, DirectoryLister directoryLister) {
+        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv, directoryLister);
         isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
             if (isCowTable) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java
new file mode 100644
index 00000000000..e97bc2c684f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+public interface DirectoryLister {
+    RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+            throws FileSystemIOException;
+}
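
[Editor's note] For orientation, a minimal, hypothetical sketch of how a caller can drain this interface; the fs and table arguments are assumed to be in scope, and collectFiles is not part of this commit:

    // Hypothetical helper: drain a listing into a List, letting the checked
    // FileSystemIOException propagate to the caller.
    static List<RemoteFile> collectFiles(DirectoryLister lister, FileSystem fs,
            TableIf table, String location) throws FileSystemIOException {
        List<RemoteFile> files = new ArrayList<>();
        RemoteIterator<RemoteFile> it = lister.listFiles(fs, false, table, location);
        while (it.hasNext()) {
            files.add(it.next());
        }
        return files;
    }
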
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java
new file mode 100644
index 00000000000..dcd7eeb16b3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileSystemDirectoryLister implements DirectoryLister {
+    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+            throws FileSystemIOException {
+        List<RemoteFile> result = new ArrayList<>();
+        Status status = fs.listFiles(location, recursive, result);
+        if (!status.ok()) {
+            throw new FileSystemIOException(status.getErrCode(), status.getErrMsg());
+        }
+        return new RemoteFileRemoteIterator(result);
+    }
+}
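
[Editor's note] A hedged usage sketch of the uncached lister; fs and table are assumed to exist in scope, and the location string is a placeholder:

    DirectoryLister lister = new FileSystemDirectoryLister();
    // listFiles throws FileSystemIOException if the underlying Status is not ok.
    RemoteIterator<RemoteFile> it = lister.listFiles(fs, /* recursive */ true, table, "s3://bucket/warehouse/t");
    while (it.hasNext()) {
        RemoteFile file = it.next();
        // process file ...
    }
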
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java
new file mode 100644
index 00000000000..c9e45d0352b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status.ErrCode;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+public class FileSystemIOException extends IOException {
+
+    @Nullable
+    private ErrCode errCode;
+
+    public FileSystemIOException(ErrCode errCode, String message) {
+        super(message);
+        this.errCode = errCode;
+    }
+
+    public FileSystemIOException(ErrCode errCode, String message, Throwable cause) {
+        super(message, cause);
+        this.errCode = errCode;
+    }
+
+    public FileSystemIOException(String message) {
+        super(message);
+        this.errCode = null;
+    }
+
+    public FileSystemIOException(String message, Throwable cause) {
+        super(message, cause);
+        this.errCode = null;
+    }
+
+    public Optional<ErrCode> getErrorCode() {
+        return Optional.ofNullable(errCode);
+    }
+
+    @Override
+    public String getMessage() {
+        if (errCode != null) {
+            return String.format("[%s]: %s",
+                    errCode,
+                    super.getMessage());
+        } else {
+            return super.getMessage();
+        }
+    }
+}
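
[Editor's note] Illustrative only (NOT_FOUND is assumed to be one of the Status.ErrCode constants): getMessage() prefixes the error code when one is present and falls back to the plain message otherwise:

    FileSystemIOException withCode = new FileSystemIOException(ErrCode.NOT_FOUND, "no such dir");
    withCode.getMessage();    // "[NOT_FOUND]: no such dir"
    withCode.getErrorCode();  // Optional.of(ErrCode.NOT_FOUND)

    FileSystemIOException plain = new FileSystemIOException("no such dir");
    plain.getMessage();       // "no such dir"
    plain.getErrorCode();     // Optional.empty()
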
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java
new file mode 100644
index 00000000000..6ac8eb3b0c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+public class RemoteFileRemoteIterator
+        implements RemoteIterator<RemoteFile> {
+    private final List<RemoteFile> remoteFileList;
+    private int currentIndex = 0;
+
+    public RemoteFileRemoteIterator(List<RemoteFile> remoteFileList) {
+        this.remoteFileList = Objects.requireNonNull(remoteFileList, "remoteFileList is null");
+    }
+
+    @Override
+    public boolean hasNext() throws FileSystemIOException {
+        return currentIndex < remoteFileList.size();
+    }
+
+    @Override
+    public RemoteFile next() throws FileSystemIOException {
+        if (!hasNext()) {
+            throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator");
+        }
+        return remoteFileList.get(currentIndex++);
+    }
+}
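
[Editor's note] RemoteIterator mirrors java.util.Iterator but lets hasNext()/next() throw the checked FileSystemIOException, which a plain Iterator cannot. A small, hypothetical sketch over an in-memory list:

    List<RemoteFile> files = new ArrayList<>();
    RemoteIterator<RemoteFile> it = new RemoteFileRemoteIterator(files);
    while (it.hasNext()) {
        it.next();
    }
    // Calling next() past the end throws NoSuchElementException.
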
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java
new file mode 100644
index 00000000000..b9d212a15a5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+public interface RemoteIterator<T> {
+    boolean hasNext() throws FileSystemIOException;
+
+    T next() throws FileSystemIOException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java
new file mode 100644
index 00000000000..4332a5fed35
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.fs;
+
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.util.Iterator;
+import java.util.Objects;
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
+// and modified by Doris
+
+class SimpleRemoteIterator implements RemoteIterator<RemoteFile> {
+    private final Iterator<RemoteFile> iterator;
+
+    public SimpleRemoteIterator(Iterator<RemoteFile> iterator) {
+        this.iterator = Objects.requireNonNull(iterator, "iterator is null");
+    }
+
+    @Override
+    public boolean hasNext() throws FileSystemIOException {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public RemoteFile next() throws FileSystemIOException {
+        return iterator.next();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
new file mode 100644
index 00000000000..6be3c03f824
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import java.util.Objects;
+
+public class TransactionDirectoryListingCacheKey {
+
+    private final long transactionId;
+    private final String path;
+
+    public TransactionDirectoryListingCacheKey(long transactionId, String path) {
+        this.transactionId = transactionId;
+        this.path = Objects.requireNonNull(path, "path is null");
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o;
+        return transactionId == that.transactionId && path.equals(that.path);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transactionId, path);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("TransactionDirectoryListingCacheKey{");
+        sb.append("transactionId=").append(transactionId);
+        sb.append(", path='").append(path).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
+}
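
[Editor's note] Equality is scoped by transaction id, so the same path listed under two different transactions occupies two distinct cache entries. An illustrative snippet (paths are placeholders):

    TransactionDirectoryListingCacheKey a = new TransactionDirectoryListingCacheKey(1L, "hdfs://nn/warehouse/t/p=1");
    TransactionDirectoryListingCacheKey b = new TransactionDirectoryListingCacheKey(2L, "hdfs://nn/warehouse/t/p=1");
    a.equals(b);    // false: same path, different transaction
    a.equals(new TransactionDirectoryListingCacheKey(1L, "hdfs://nn/warehouse/t/p=1"));    // true
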
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
new file mode 100644
index 00000000000..37acec6864f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.commons.collections.ListUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+/**
+ * Caches directory content (including listings that were started concurrently).
+ * {@link TransactionScopeCachingDirectoryLister} assumes that all listings
+ * are performed by the same user within a single transaction, therefore any failure can
+ * be shared between concurrent listings.
+ */
+public class TransactionScopeCachingDirectoryLister implements DirectoryLister {
+    private final long transactionId;
+
+    @VisibleForTesting
+    public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> getCache() {
+        return cache;
+    }
+
+    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache;
+    private final DirectoryLister delegate;
+
+    public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId,
+            Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache) {
+        this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+        this.transactionId = transactionId;
+        this.cache = Objects.requireNonNull(cache, "cache is null");
+    }
+
+    @Override
+    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+            throws FileSystemIOException {
+        return listInternal(fs, recursive, table, new TransactionDirectoryListingCacheKey(transactionId, location));
+    }
+
+    private RemoteIterator<RemoteFile> listInternal(FileSystem fs, boolean recursive, TableIf table,
+            TransactionDirectoryListingCacheKey cacheKey) throws FileSystemIOException {
+        FetchingValueHolder cachedValueHolder;
+        try {
+            cachedValueHolder = cache.get(cacheKey,
+                    () -> new FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, cacheKey)));
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            Throwable throwable = e.getCause();
+            Throwables.throwIfInstanceOf(throwable, FileSystemIOException.class);
+            Throwables.throwIfUnchecked(throwable);
+            throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable);
+        }
+
+        if (cachedValueHolder.isFullyCached()) {
+            return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles());
+        }
+
+        return cachingRemoteIterator(cachedValueHolder, cacheKey);
+    }
+
+    private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem fs, boolean recursive,
+            TableIf table, TransactionDirectoryListingCacheKey cacheKey)
+            throws FileSystemIOException {
+        return delegate.listFiles(fs, recursive, table, cacheKey.getPath());
+    }
+
+
+    private RemoteIterator<RemoteFile> cachingRemoteIterator(FetchingValueHolder cachedValueHolder,
+            TransactionDirectoryListingCacheKey cacheKey) {
+        return new RemoteIterator<RemoteFile>() {
+            private int fileIndex;
+
+            @Override
+            public boolean hasNext()
+                    throws FileSystemIOException {
+                try {
+                    boolean hasNext = cachedValueHolder.getCachedFile(fileIndex).isPresent();
+                    // Update cache weight of cachedValueHolder for a given path.
+                    // The cachedValueHolder acts as an invalidation guard.
+                    // If a cache invalidation happens while this iterator goes over the files from the specified path,
+                    // the eventually outdated file listing will no longer be added to the cache.
+                    cache.asMap().replace(cacheKey, cachedValueHolder, cachedValueHolder);
+                    return hasNext;
+                } catch (Exception exception) {
+                    // invalidate cached value to force retry of directory listing
+                    cache.invalidate(cacheKey);
+                    throw exception;
+                }
+            }
+
+            @Override
+            public RemoteFile next()
+                    throws FileSystemIOException {
+                // force cache entry weight update in case next file is cached
+                Preconditions.checkState(hasNext());
+                return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new);
+            }
+        };
+    }
+
+    @VisibleForTesting
+    boolean isCached(String location) {
+        return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
+    }
+
+    @VisibleForTesting
+    boolean isCached(TransactionDirectoryListingCacheKey cacheKey) {
+        FetchingValueHolder cached = cache.getIfPresent(cacheKey);
+        return cached != null && cached.isFullyCached();
+    }
+
+    static class FetchingValueHolder {
+
+        private final List<RemoteFile> cachedFiles = ListUtils.synchronizedList(new ArrayList<RemoteFile>());
+
+        @GuardedBy("this")
+        @Nullable
+        private RemoteIterator<RemoteFile> fileIterator;
+        @GuardedBy("this")
+        @Nullable
+        private Exception exception;
+
+        public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) {
+            this.fileIterator = Objects.requireNonNull(fileIterator, "fileIterator is null");
+        }
+
+        public synchronized boolean isFullyCached() {
+            return fileIterator == null && exception == null;
+        }
+
+        public long getCacheFileCount() {
+            return cachedFiles.size();
+        }
+
+        public Iterator<RemoteFile> getCachedFiles() {
+            Preconditions.checkState(isFullyCached());
+            return cachedFiles.iterator();
+        }
+
+        public Optional<RemoteFile> getCachedFile(int index)
+                throws FileSystemIOException {
+            int filesSize = cachedFiles.size();
+            Preconditions.checkArgument(index >= 0 && index <= filesSize,
+                    "File index (%s) out of bounds [0, %s]", index, filesSize);
+
+            // avoid fileIterator synchronization (and thus blocking) for already cached files
+            if (index < filesSize) {
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            return fetchNextCachedFile(index);
+        }
+
+        private synchronized Optional<RemoteFile> fetchNextCachedFile(int index)
+                throws FileSystemIOException {
+            if (exception != null) {
+                throw new FileSystemIOException("Exception while listing directory", exception);
+            }
+
+            if (index < cachedFiles.size()) {
+                // file was fetched concurrently
+                return Optional.of(cachedFiles.get(index));
+            }
+
+            try {
+                if (fileIterator == null || !fileIterator.hasNext()) {
+                    // no more files
+                    fileIterator = null;
+                    return Optional.empty();
+                }
+
+                RemoteFile fileStatus = fileIterator.next();
+                cachedFiles.add(fileStatus);
+                return Optional.of(fileStatus);
+            } catch (Exception exception) {
+                fileIterator = null;
+                this.exception = exception;
+                throw exception;
+            }
+        }
+    }
+}
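
[Editor's note] A hedged wiring sketch (not part of this commit; the capacity and transaction id are placeholders), mirroring how the factory below assembles the same pieces:

    EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> builder =
            EvictableCacheBuilder.newBuilder()
                    .maximumWeight(1000)
                    .weigher((key, value) -> Math.toIntExact(value.getCacheFileCount()));
    DirectoryLister lister = new TransactionScopeCachingDirectoryLister(
            new FileSystemDirectoryLister(), /* transactionId */ 0L, builder.build());
    // The first listFiles() call for a location populates the cache incrementally;
    // subsequent calls within the same scope replay the cached listing.
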
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
new file mode 100644
index 00000000000..c3c9c347c3d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.common.EvictableCacheBuilder;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder;
+
+import com.google.common.cache.Cache;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TransactionScopeCachingDirectoryListerFactory {
+    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+    // private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;
+
+    private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;
+
+    private final AtomicLong nextTransactionId = new AtomicLong();
+
+    public TransactionScopeCachingDirectoryListerFactory(long maxSize) {
+        if (maxSize > 0) {
+            EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> cacheBuilder =
+                    EvictableCacheBuilder.newBuilder()
+                    .maximumWeight(maxSize)
+                    .weigher((key, value) ->
+                        Math.toIntExact(value.getCacheFileCount()));
+            this.cache = Optional.of(cacheBuilder.build());
+        } else {
+            cache = Optional.empty();
+        }
+    }
+
+    public DirectoryLister get(DirectoryLister delegate) {
+        return cache
+                .map(cache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate,
+                        nextTransactionId.getAndIncrement(), cache))
+                .orElse(delegate);
+    }
+}
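
[Editor's note] Usage sketch, matching the planner wiring shown below (Config.max_external_table_split_file_meta_cache_num is the capacity knob added by this commit):

    DirectoryLister directoryLister = new TransactionScopeCachingDirectoryListerFactory(
            Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
    // When the configured capacity is <= 0, get() returns the delegate unchanged,
    // i.e. transaction-scope caching is disabled.
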
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b439dbdc12d..5b19387289c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -46,6 +46,7 @@ import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.es.EsExternalTable;
@@ -68,6 +69,9 @@ import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
 import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.DistributionSpecAny;
@@ -244,6 +248,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
     private final StatsErrorEstimator statsErrorEstimator;
     private final PlanTranslatorContext context;
 
+    private DirectoryLister directoryLister;
+
     public PhysicalPlanTranslator() {
         this(null, null);
     }
@@ -559,12 +565,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         // TODO(cmy): determine the needCheckColumnPriv param
         ScanNode scanNode;
         if (table instanceof HMSExternalTable) {
+            if (directoryLister == null) {
+                this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(
+                        Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
+            }
             switch (((HMSExternalTable) table).getDlaType()) {
                 case ICEBERG:
                     scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
                     break;
                 case HIVE:
-                    scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv);
+                    scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv, directoryLister);
                     HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
                     hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
                     if (fileScan.getTableSample().isPresent()) {
@@ -640,6 +650,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
 
     @Override
     public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTranslatorContext context) {
+        if (directoryLister == null) {
+            this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(
+                    Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
+        }
         List<Slot> slots = fileScan.getOutput();
         ExternalTable table = fileScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
@@ -652,7 +666,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                         + " for Hudi table");
         PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
         ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
-                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable());
+                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable(),
+                directoryLister);
         if (fileScan.getTableSnapshot().isPresent()) {
             ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0a5de932243..552d6c4c45d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -58,6 +58,7 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
@@ -74,6 +75,9 @@ import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
 import org.apache.doris.datasource.odbc.source.OdbcScanNode;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
@@ -115,6 +119,8 @@ public class SingleNodePlanner {
     private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
     private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap();
 
+    private DirectoryLister directoryLister;
+
     public SingleNodePlanner(PlannerContext ctx) {
         this.ctx = ctx;
     }
@@ -1959,6 +1965,10 @@ public class SingleNodePlanner {
                 scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv);
                 break;
             case HMS_EXTERNAL_TABLE:
+                if (directoryLister == null) {
+                    this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(
+                            Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
+                }
                 TableIf table = tblRef.getDesc().getTable();
                 switch (((HMSExternalTable) table).getDlaType()) {
                     case HUDI:
@@ -1969,13 +1979,13 @@ public class SingleNodePlanner {
                                     + "please set enable_nereids_planner = 
true to enable new optimizer");
                         }
                         scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
-                                Optional.empty(), Optional.empty(), sv);
+                                Optional.empty(), Optional.empty(), sv, directoryLister);
                         break;
                     case ICEBERG:
                         scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
                         break;
                     case HIVE:
-                        scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv);
+                        scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv, directoryLister);
                         ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
                         break;
                     default:
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java
new file mode 100644
index 00000000000..3bfdc73b78e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java
@@ -0,0 +1,708 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.EvictableCacheBuilder.DisabledCacheImplementation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.gaul.modernizer_maven_annotations.SuppressModernizer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+public class TestEvictableCache {
+    private static class TestingTicker extends Ticker {
+        private volatile long time;
+
+        public TestingTicker() {
+        }
+
+        public long read() {
+            return this.time;
+        }
+
+        public synchronized void increment(long delta, TimeUnit unit) {
+            Preconditions.checkArgument(delta >= 0L, "delta is negative");
+            this.time += unit.toNanos(delta);
+        }
+    }
+
+    private static final int TEST_TIMEOUT_SECONDS = 10;
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testLoad()
+            throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10_000)
+                .build();
+        Assert.assertEquals("abc", cache.get(42, () -> "abc"));
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testEvictBySize()
+            throws Exception {
+        int maximumSize = 10;
+        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(maximumSize)
+                .build();
+
+        for (int i = 0; i < 10_000; i++) {
+            int value = i * 10;
+            Assert.assertEquals(value, (Object) cache.get(i, () -> value));
+        }
+        cache.cleanUp();
+        Assert.assertEquals(maximumSize, cache.size());
+        Assert.assertEquals(maximumSize, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        // Ensure cache is effective, i.e. some entries preserved
+        int lastKey = 10_000 - 1;
+        Assert.assertEquals(lastKey * 10, (Object) cache.get(lastKey, () -> {
+            throw new UnsupportedOperationException();
+        }));
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testEvictByWeight() throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumWeight(20)
+                .weigher((Integer key, String value) -> value.length())
+                .build();
+
+        for (int i = 0; i < 10; i++) {
+            String value = String.join("", Collections.nCopies(i, "a"));
+            Assert.assertEquals(value, cache.get(i, () -> value));
+        }
+
+        cache.cleanUp();
+        // It's not deterministic which entries get evicted
+        int cacheSize = Math.toIntExact(cache.size());
+
+        Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount());
+        Assert.assertEquals(cacheSize, cache.asMap().keySet().size());
+
+        int keySum = cache.asMap().keySet().stream()
+                .mapToInt(i -> i)
+                .sum();
+        Assert.assertTrue("key sum should be <= 20", keySum <= 20);
+
+        Assert.assertEquals(cacheSize, cache.asMap().values().size());
+
+        int valuesLengthSum = cache.asMap().values().stream()
+                .mapToInt(String::length)
+                .sum();
+        Assert.assertTrue("values length sum should be <= 20", valuesLengthSum 
<= 20);
+
+        // Ensure cache is effective, i.e. some entries preserved
+        int lastKey = 9; // 10 - 1
+        String expected = String.join("", Collections.nCopies(lastKey, "a")); // Java 8 substitute for String.repeat
+        Assert.assertEquals(expected, cache.get(lastKey, () -> {
+            throw new UnsupportedOperationException();
+        }));
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testEvictByTime() throws Exception {
+        TestingTicker ticker = new TestingTicker();
+        int ttl = 100;
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .ticker(ticker)
+                .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
+                .build();
+
+        Assert.assertEquals("1 ala ma kota", cache.get(1, () -> "1 ala ma 
kota"));
+        ticker.increment(ttl, TimeUnit.MILLISECONDS);
+        Assert.assertEquals("2 ala ma kota", cache.get(2, () -> "2 ala ma 
kota"));
+        cache.cleanUp();
+
+        // First entry should be expired and its token removed
+        int cacheSize = Math.toIntExact(cache.size());
+        Assert.assertEquals(1, cacheSize);
+        Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount());
+        Assert.assertEquals(cacheSize, cache.asMap().keySet().size());
+        Assert.assertEquals(cacheSize, cache.asMap().values().size());
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testPreserveValueLoadedAfterTimeExpiration() throws Exception {
+        TestingTicker ticker = new TestingTicker();
+        int ttl = 100;
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .ticker(ticker)
+                .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
+                .build();
+        int key = 11;
+
+        Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "11 ala ma 
kota"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "something 
else"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        ticker.increment(ttl, TimeUnit.MILLISECONDS);
+        Assert.assertEquals("new value", cache.get(key, () -> "new value"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        Assert.assertEquals("new value", cache.get(key, () -> "something yet 
different"));
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+        Assert.assertEquals(1, cache.size());
+        Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+        Assert.assertEquals(1, cache.asMap().keySet().size());
+        Assert.assertEquals(1, cache.asMap().values().size());
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testReplace() throws Exception {
+        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10)
+                .build();
+
+        int key = 10;
+        int initialValue = 20;
+        int replacedValue = 21;
+
+        cache.get(key, () -> initialValue);
+
+        Assert.assertTrue("Should successfully replace value", 
cache.asMap().replace(key, initialValue, replacedValue));
+        Assert.assertEquals("Cache should contain replaced value", 
replacedValue, cache.getIfPresent(key).intValue());
+
+        Assert.assertFalse("Should not replace when current value is 
different", cache.asMap().replace(key, initialValue, replacedValue));
+        Assert.assertEquals("Cache should maintain replaced value", 
replacedValue, cache.getIfPresent(key).intValue());
+
+        Assert.assertFalse("Should not replace non-existent key", 
cache.asMap().replace(100000, replacedValue, 22));
+        Assert.assertEquals("Cache should only contain original key", 
ImmutableSet.of(key), cache.asMap().keySet());
+        Assert.assertEquals("Original key should maintain its value", 
replacedValue, cache.getIfPresent(key).intValue());
+
+        int anotherKey = 13;
+        int anotherInitialValue = 14;
+        cache.get(anotherKey, () -> anotherInitialValue);
+        cache.invalidate(anotherKey);
+
+        Assert.assertFalse("Should not replace after invalidation", 
cache.asMap().replace(anotherKey, anotherInitialValue, 15));
+        Assert.assertEquals("Cache should only contain original key after 
invalidation", ImmutableSet.of(key), cache.asMap().keySet());
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testDisabledCache() throws Exception {
+        Exception exception = Assert.assertThrows(IllegalStateException.class, () ->
+                EvictableCacheBuilder.newBuilder()
+                        .maximumSize(0)
+                        .build());
+
+        Assert.assertEquals("Even when cache is disabled, the loads are 
synchronized and both load results and failures are shared between threads. "
+                        + "This is rarely desired, thus builder caller is 
expected to either opt-in into this behavior with 
shareResultsAndFailuresEvenIfDisabled(), "
+                        + "or choose not to share results (and failures) 
between concurrent invocations with shareNothingWhenDisabled().",
+                exception.getMessage());
+
+        testDisabledCache(
+                EvictableCacheBuilder.newBuilder()
+                        .maximumSize(0)
+                        .shareNothingWhenDisabled()
+                        .build());
+
+        testDisabledCache(
+                EvictableCacheBuilder.newBuilder()
+                        .maximumSize(0)
+                        .shareResultsAndFailuresEvenIfDisabled()
+                        .build());
+    }
+
+    private void testDisabledCache(Cache<Integer, Integer> cache) throws Exception {
+        for (int i = 0; i < 10; i++) {
+            int value = i * 10;
+            Assert.assertEquals(value, cache.get(i, () -> value).intValue());
+        }
+
+        cache.cleanUp();
+        Assert.assertEquals(0, cache.size());
+        Assert.assertTrue(cache.asMap().keySet().isEmpty());
+        Assert.assertTrue(cache.asMap().values().isEmpty());
+    }
+
+    private static class CacheStatsAssertions {
+        public static CacheStatsAssertions assertCacheStats(Cache<?, ?> cache) {
+            Objects.requireNonNull(cache, "cache is null");
+            return assertCacheStats(cache::stats);
+        }
+
+        public static CacheStatsAssertions assertCacheStats(Supplier<CacheStats> statsSupplier) {
+            return new CacheStatsAssertions(statsSupplier);
+        }
+
+        private final Supplier<CacheStats> stats;
+
+        private long loads;
+        private long hits;
+        private long misses;
+
+        private CacheStatsAssertions(Supplier<CacheStats> stats) {
+            this.stats = Objects.requireNonNull(stats, "stats is null");
+        }
+
+        public CacheStatsAssertions loads(long value) {
+            this.loads = value;
+            return this;
+        }
+
+        public CacheStatsAssertions hits(long value) {
+            this.hits = value;
+            return this;
+        }
+
+        public CacheStatsAssertions misses(long value) {
+            this.misses = value;
+            return this;
+        }
+
+        public void afterRunning(Runnable runnable) {
+            try {
+                calling(() -> {
+                    runnable.run();
+                    return null;
+                });
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public <T> T calling(Callable<T> callable)
+                throws Exception {
+            CacheStats beforeStats = stats.get();
+            T value = callable.call();
+            CacheStats afterStats = stats.get();
+
+            long loadDelta = afterStats.loadCount() - beforeStats.loadCount();
+            long missesDelta = afterStats.missCount() - beforeStats.missCount();
+            long hitsDelta = afterStats.hitCount() - beforeStats.hitCount();
+
+            Assert.assertEquals(loads, loadDelta);
+            Assert.assertEquals(hits, hitsDelta);
+            Assert.assertEquals(misses, missesDelta);
+
+            return value;
+        }
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testLoadStats()
+            throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10_000)
+                .recordStats()
+                .build();
+
+        Assert.assertEquals(new CacheStats(0, 0, 0, 0, 0, 0), cache.stats());
+
+        String value = CacheStatsAssertions.assertCacheStats(cache)
+                .misses(1)
+                .loads(1)
+                .calling(() -> cache.get(42, () -> "abc"));
+        Assert.assertEquals("abc", value);
+
+        value = CacheStatsAssertions.assertCacheStats(cache)
+                .hits(1)
+                .calling(() -> cache.get(42, () -> "xyz"));
+        Assert.assertEquals("abc", value);
+
+        // with equal, but not the same key
+        value = CacheStatsAssertions.assertCacheStats(cache)
+                .hits(1)
+                .calling(() -> cache.get(newInteger(42), () -> "xyz"));
+        Assert.assertEquals("abc", value);
+    }
+
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testLoadFailure()
+            throws Exception {
+        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(0)
+                .expireAfterWrite(0, TimeUnit.DAYS)
+                .shareResultsAndFailuresEvenIfDisabled()
+                .build();
+        int key = 10;
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            Exchanger<Thread> exchanger = new Exchanger<>();
+            CountDownLatch secondUnblocked = new CountDownLatch(1);
+
+            List<Future<String>> futures = new ArrayList<>();
+            for (int i = 0; i < 2; i++) {
+                boolean first = i == 0;
+                futures.add(executor.submit(() -> {
+                    if (!first) {
+                        // Wait for the first one to start the call
+                        exchanger.exchange(Thread.currentThread(), 10, TimeUnit.SECONDS);
+                        // Prove that we are back in RUNNABLE state.
+                        secondUnblocked.countDown();
+                    }
+                    return cache.get(key, () -> {
+                        if (first) {
+                            Thread secondThread = exchanger.exchange(null, 10, TimeUnit.SECONDS);
+                            Assert.assertTrue(secondUnblocked.await(10, TimeUnit.SECONDS));
+                            // Wait for the second one to hang inside the cache.get call.
+                            long start = System.nanoTime();
+                            while (!Thread.currentThread().isInterrupted()) {
+                                try {
+                                    Assert.assertNotEquals(Thread.State.RUNNABLE, secondThread.getState());
+                                    break;
+                                } catch (Exception | AssertionError e) {
+                                    if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(30)) {
+                                        throw e;
+                                    }
+                                }
+                                try {
+                                    Thread.sleep(50);
+                                } catch (InterruptedException e) {
+                                    Thread.currentThread().interrupt();
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                            throw new RuntimeException("first attempt is poised to fail");
+                        }
+                        return "success";
+                    });
+                }));
+            }
+
+            List<String> results = new ArrayList<>();
+            for (Future<String> future : futures) {
+                try {
+                    results.add(future.get());
+                } catch (ExecutionException e) {
+                    results.add(e.getCause().toString());
+                }
+            }
+
+            // Note: if this starts to fail, that suggests that Guava implementation changed and NoopCache may be redundant now.
+            String expectedError = "com.google.common.util.concurrent.UncheckedExecutionException: "
+                    + "java.lang.RuntimeException: first attempt is poised to fail";
+            Assert.assertEquals(2, results.size());
+            Assert.assertEquals(expectedError, results.get(0));
+            Assert.assertEquals(expectedError, results.get(1));
+        } finally {
+            executor.shutdownNow();
+            Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+        }
+    }
+
+    @SuppressModernizer
+    private static Integer newInteger(int value) {
+        Integer integer = value;
+        @SuppressWarnings({"UnnecessaryBoxing", "BoxedPrimitiveConstructor", "CachedNumberConstructorCall", "removal"})
+        Integer newInteger = new Integer(value);
+        Assert.assertNotSame(integer, newInteger);
+        return newInteger;
+    }
+
+    /**
+     * Test that the loader is invoked only once for concurrent invocations of {@link LoadingCache#get(Object, Callable)} with equal keys.
+     * This is a behavior of Guava Cache as well. While this is not necessarily desirable behavior (see
+     * <a href="https://github.com/trinodb/trino/issues/11067">https://github.com/trinodb/trino/issues/11067</a>),
+     * the test exists primarily to document current state and support discussion, should the current state change.
+     */
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testConcurrentGetWithCallableShareLoad()
+            throws Exception {
+        AtomicInteger loads = new AtomicInteger();
+        AtomicInteger concurrentInvocations = new AtomicInteger();
+
+        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10_000)
+                .build();
+
+        int threads = 2;
+        int invocationsPerThread = 100;
+        ExecutorService executor = Executors.newFixedThreadPool(threads);
+        try {
+            CyclicBarrier barrier = new CyclicBarrier(threads);
+            List<Future<?>> futures = new ArrayList<>();
+            for (int i = 0; i < threads; i++) {
+                futures.add(executor.submit(() -> {
+                    for (int invocation = 0; invocation < invocationsPerThread; invocation++) {
+                        int key = invocation;
+                        barrier.await(10, TimeUnit.SECONDS);
+                        int value = cache.get(key, () -> {
+                            loads.incrementAndGet();
+                            int invocations = concurrentInvocations.incrementAndGet();
+                            Preconditions.checkState(invocations == 1,
+                                    "There should be no concurrent invocations; the cache should share the load when get() is invoked for the same key");
+                            Thread.sleep(1);
+                            concurrentInvocations.decrementAndGet();
+                            return -key;
+                        });
+                        Assert.assertEquals(-invocation, value);
+                    }
+                    return null;
+                }));
+            }
+
+            for (Future<?> future : futures) {
+                future.get(10, TimeUnit.SECONDS);
+            }
+            Assert.assertTrue(
+                    String.format(
+                            "loads (%d) should be between %d and %d",
+                            loads.intValue(),
+                            invocationsPerThread,
+                            threads * invocationsPerThread - 1),
+                    loads.intValue() >= invocationsPerThread && loads.intValue() <= threads * invocationsPerThread - 1);
+        } finally {
+            executor.shutdownNow();
+            Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+        }
+    }
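
A minimal sketch of the load sharing documented above, again assuming only the builder API used in these tests (pool size, key, and the sleep are illustrative):

    import com.google.common.cache.Cache;
    import org.apache.doris.common.EvictableCacheBuilder;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SharedLoadSketch {
        public static void main(String[] args) throws Exception {
            Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(1_000)
                    .build();
            AtomicInteger loads = new AtomicInteger();
            Callable<String> loader = () -> {
                loads.incrementAndGet();
                Thread.sleep(10); // widen the window in which callers can pile up
                return "value";
            };
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                Future<String> a = executor.submit(() -> cache.get(42, loader));
                Future<String> b = executor.submit(() -> cache.get(42, loader));
                a.get();
                b.get();
                // Usually prints 1: the second caller waits for the in-flight
                // load instead of invoking the loader a second time.
                System.out.println("loads = " + loads.get());
            } finally {
                executor.shutdownNow();
            }
        }
    }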
+
+    enum Invalidation {
+        INVALIDATE_KEY,
+        INVALIDATE_PREDEFINED_KEYS,
+        INVALIDATE_SELECTED_KEYS,
+        INVALIDATE_ALL,
+        /**/;
+    }
+
+    /**
+     * Covers https://github.com/google/guava/issues/1881
+     */
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testInvalidateOngoingLoad()
+            throws Exception {
+        for (Invalidation invalidation : Invalidation.values()) {
+            Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+                    .maximumSize(10_000)
+                    .build();
+            Integer key = 42;
+
+            CountDownLatch loadOngoing = new CountDownLatch(1);
+            CountDownLatch invalidated = new CountDownLatch(1);
+            CountDownLatch getReturned = new CountDownLatch(1);
+            ExecutorService executor = Executors.newFixedThreadPool(2);
+            try {
+                // thread A
+                Future<String> threadA = executor.submit(() -> {
+                    String value = cache.get(key, () -> {
+                        loadOngoing.countDown(); // 1
+                        Assert.assertTrue(invalidated.await(10, TimeUnit.SECONDS)); // 2
+                        return "stale value";
+                    });
+                    getReturned.countDown(); // 3
+                    return value;
+                });
+
+                // thread B
+                Future<String> threadB = executor.submit(() -> {
+                    Assert.assertTrue(loadOngoing.await(10, TimeUnit.SECONDS)); // 1
+
+                    switch (invalidation) {
+                        case INVALIDATE_KEY:
+                            cache.invalidate(key);
+                            break;
+                        case INVALIDATE_PREDEFINED_KEYS:
+                            cache.invalidateAll(ImmutableList.of(key));
+                            break;
+                        case INVALIDATE_SELECTED_KEYS:
+                            Set<Integer> keys = cache.asMap().keySet().stream()
+                                    .filter(foundKey -> (int) foundKey == key)
+                                    .collect(ImmutableSet.toImmutableSet());
+                            cache.invalidateAll(keys);
+                            break;
+                        case INVALIDATE_ALL:
+                            cache.invalidateAll();
+                            break;
+                        default:
+                            throw new IllegalArgumentException();
+                    }
+
+                    invalidated.countDown(); // 2
+                    // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed.
+                    Assert.assertTrue(getReturned.await(10, TimeUnit.SECONDS)); // 3
+
+                    return cache.get(key, () -> "fresh value");
+                });
+
+                Assert.assertEquals("stale value", threadA.get());
+                Assert.assertEquals("fresh value", threadB.get());
+            } finally {
+                executor.shutdownNow();
+                Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+            }
+        }
+    }
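
The property this test pins down is the read-through pattern: update the underlying state, invalidate the key, then reload; the reload must never observe a value produced by a load that started before the invalidation. A single-threaded sketch of that pattern, under the same API assumptions (key 42 is illustrative):

    import com.google.common.cache.Cache;
    import org.apache.doris.common.EvictableCacheBuilder;

    import java.util.concurrent.atomic.AtomicLong;

    public class InvalidateSketch {
        public static void main(String[] args) throws Exception {
            AtomicLong remoteState = new AtomicLong(1);
            Cache<Integer, Long> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(1_000)
                    .build();

            long before = cache.get(42, remoteState::get); // loads 1

            remoteState.set(2);   // the underlying data changes...
            cache.invalidate(42); // ...so drop the cached entry

            long after = cache.get(42, remoteState::get);  // reloads and sees 2
            System.out.println(before + " -> " + after);   // prints "1 -> 2"
        }
    }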
+
+    /**
+     * Covers https://github.com/google/guava/issues/1881
+     */
+    @Test
+    @Timeout(TEST_TIMEOUT_SECONDS)
+    public void testInvalidateAndLoadConcurrently()
+            throws Exception {
+        for (Invalidation invalidation : Invalidation.values()) {
+            int[] primes = {2, 3, 5, 7};
+            AtomicLong remoteState = new AtomicLong(1);
+
+            Cache<Integer, Long> cache = EvictableCacheBuilder.newBuilder()
+                    .maximumSize(10_000)
+                    .build();
+            Integer key = 42;
+            int threads = 4;
+
+            CyclicBarrier barrier = new CyclicBarrier(threads);
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
+            try {
+                List<Future<Void>> futures = IntStream.range(0, threads)
+                        .mapToObj(threadNumber -> executor.submit(() -> {
+                            // prime the cache
+                            Assert.assertEquals(1L, (long) cache.get(key, remoteState::get));
+                            int prime = primes[threadNumber];
+
+                            barrier.await(10, TimeUnit.SECONDS);
+
+                            // modify underlying state
+                            remoteState.updateAndGet(current -> current * prime);
+
+                            // invalidate
+                            switch (invalidation) {
+                                case INVALIDATE_KEY:
+                                    cache.invalidate(key);
+                                    break;
+                                case INVALIDATE_PREDEFINED_KEYS:
+                                    cache.invalidateAll(ImmutableList.of(key));
+                                    break;
+                                case INVALIDATE_SELECTED_KEYS:
+                                    Set<Integer> keys = cache.asMap().keySet().stream()
+                                            .filter(foundKey -> (int) foundKey == key)
+                                            .collect(ImmutableSet.toImmutableSet());
+                                    cache.invalidateAll(keys);
+                                    break;
+                                case INVALIDATE_ALL:
+                                    cache.invalidateAll();
+                                    break;
+                                default:
+                                    throw new IllegalArgumentException();
+                            }
+
+                            // read through cache
+                            long current = cache.get(key, remoteState::get);
+                            if (current % prime != 0) {
+                                throw new AssertionError(String.format(
+                                        "The value read through cache (%s) in thread (%s) is not divisible by (%s)",
+                                        current, threadNumber, prime));
+                            }
+
+                            return (Void) null;
+                        }))
+                        .collect(ImmutableList.toImmutableList());
+
+                for (Future<?> future : futures) {
+                    try {
+                        future.get(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                        throw new RuntimeException("Failed to get future value", e);
+                    }
+                }
+
+                Assert.assertEquals(2 * 3 * 5 * 7, remoteState.get());
+                Assert.assertEquals(remoteState.get(), (long) cache.get(key, remoteState::get));
+            } finally {
+                executor.shutdownNow();
+                Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+            }
+        }
+    }
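
For reference, each of the four threads multiplies the shared state by its own prime and invalidates before reading back, so the value it reads through the cache must be divisible by that prime, and once all threads finish the state is 2 * 3 * 5 * 7 = 210, which is exactly what the final assertions verify.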
+
+    @Test
+    public void testPutOnEmptyCacheImplementation() {
+        for (DisabledCacheImplementation disabledCacheImplementation : DisabledCacheImplementation.values()) {
+            Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder()
+                    .maximumSize(0)
+                    .disabledCacheImplementation(disabledCacheImplementation)
+                    .build();
+            Map<Object, Object> cacheMap = cache.asMap();
+
+            int key = 0;
+            int value = 1;
+            Assert.assertNull(cacheMap.put(key, value));
+            Assert.assertNull(cacheMap.put(key, value));
+            Assert.assertNull(cacheMap.putIfAbsent(key, value));
+            Assert.assertNull(cacheMap.putIfAbsent(key, value));
+        }
+    }
+
+    @Test
+    public void testPutOnNonEmptyCacheImplementation() {
+        Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder()
+                .maximumSize(10)
+                .build();
+        Map<Object, Object> cacheMap = cache.asMap();
+
+        int key = 0;
+        int value = 1;
+
+        Exception putException = Assert.assertThrows("put operation should throw UnsupportedOperationException",
+                UnsupportedOperationException.class,
+                () -> cacheMap.put(key, value));
+        Assert.assertEquals(
+                "The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead.",
+                putException.getMessage());
+
+        Exception putIfAbsentException = Assert.assertThrows("putIfAbsent operation should throw UnsupportedOperationException",
+                UnsupportedOperationException.class,
+                () -> cacheMap.putIfAbsent(key, value));
+        Assert.assertEquals(
+                "The operation is not supported, as in inherently races with cache invalidation",
+                putIfAbsentException.getMessage());
+    }
+}
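
The two tests above draw the intended usage boundary: on a disabled cache (maximumSize(0)) put and putIfAbsent degrade to no-ops, while on a real cache they are rejected because a plain put races with concurrent invalidation. The supported pattern is to load through the cache; a sketch, where computeValue is a hypothetical loader:

    import com.google.common.cache.Cache;

    import java.util.concurrent.ExecutionException;

    final class ReadThroughSketch {
        // cache.asMap().put(key, computeValue(key)) would throw
        // UnsupportedOperationException on a non-empty EvictableCache;
        // get(key, callable) loads atomically w.r.t. invalidation instead.
        static String readThrough(Cache<String, String> cache, String key) throws ExecutionException {
            return cache.get(key, () -> computeValue(key));
        }

        static String computeValue(String key) {
            return "value-for-" + key; // stand-in for the real computation
        }
    }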
diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
new file mode 100644
index 00000000000..d6c6ed4b93b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+// some tests may invalidate the whole cache, therefore affecting other concurrent tests
+@Execution(ExecutionMode.SAME_THREAD)
+public class TransactionScopeCachingDirectoryListerTest {
+    @Test
+    public void testConcurrentDirectoryListing(@Mocked TableIf table)
+            throws FileSystemIOException {
+        RemoteFile firstFile = new RemoteFile("file:/x/x", true, 1, 1);
+        RemoteFile secondFile = new RemoteFile("file:/x/y", true, 1, 1);
+        RemoteFile thirdFile = new RemoteFile("file:/y/z", true, 1, 1);
+
+        String path1 = "file:/x";
+        String path2 = "file:/y";
+
+        CountingDirectoryLister countingLister = new CountingDirectoryLister(
+                ImmutableMap.of(
+                        path1, ImmutableList.of(firstFile, secondFile),
+                        path2, ImmutableList.of(thirdFile)));
+
+        TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister)
+                new TransactionScopeCachingDirectoryListerFactory(2).get(countingLister);
+
+        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+
+        Assert.assertEquals(1, countingLister.getListCount());
+
+        // listing path2 again shouldn't increase listing count
+        Assert.assertTrue(cachingLister.isCached(path2));
+        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+        Assert.assertEquals(1, countingLister.getListCount());
+
+
+        // start listing path1 concurrently
+        RemoteIterator<RemoteFile> path1FilesA = cachingLister.listFiles(null, true, table, path1);
+        RemoteIterator<RemoteFile> path1FilesB = cachingLister.listFiles(null, true, table, path1);
+        Assert.assertEquals(2, countingLister.getListCount());
+
+        // list path1 files using both iterators concurrently
+        Assert.assertEquals(firstFile, path1FilesA.next());
+        Assert.assertEquals(firstFile, path1FilesB.next());
+        Assert.assertEquals(secondFile, path1FilesB.next());
+        Assert.assertEquals(secondFile, path1FilesA.next());
+        Assert.assertFalse(path1FilesA.hasNext());
+        Assert.assertFalse(path1FilesB.hasNext());
+        Assert.assertEquals(2, countingLister.getListCount());
+
+        Assert.assertFalse(cachingLister.isCached(path2));
+        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+        Assert.assertEquals(3, countingLister.getListCount());
+    }
+
+    @Test
+    public void testConcurrentDirectoryListingException(@Mocked TableIf table)
+            throws FileSystemIOException {
+        RemoteFile file = new RemoteFile("file:/x/x", true, 1, 1);
+
+        String path = "file:/x";
+
+        CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
+        DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister);
+
+        // start listing path concurrently
+        countingLister.setThrowException(true);
+        RemoteIterator<RemoteFile> filesA = cachingLister.listFiles(null, true, table, path);
+        RemoteIterator<RemoteFile> filesB = cachingLister.listFiles(null, true, table, path);
+        Assert.assertEquals(1, countingLister.getListCount());
+
+        // listing should throw an exception
+        Assert.assertThrows(FileSystemIOException.class, () -> filesA.hasNext());
+
+
+        // listing again should succeed
+        countingLister.setThrowException(false);
+        assertFiles(cachingLister.listFiles(null, true, table, path), ImmutableList.of(file));
+        Assert.assertEquals(2, countingLister.getListCount());
+
+        // listing using the second concurrently initialized DirectoryLister should fail
+        Assert.assertThrows(FileSystemIOException.class, () -> filesB.hasNext());
+    }
+
+    private void assertFiles(RemoteIterator<RemoteFile> iterator, List<RemoteFile> expectedFiles)
+            throws FileSystemIOException {
+        ImmutableList.Builder<RemoteFile> actualFiles = ImmutableList.builder();
+        while (iterator.hasNext()) {
+            actualFiles.add(iterator.next());
+        }
+        Assert.assertEquals(expectedFiles, actualFiles.build());
+    }
+
+    private static class CountingDirectoryLister
+            implements DirectoryLister {
+        private final Map<String, List<RemoteFile>> fileStatuses;
+        private int listCount;
+        private boolean throwException;
+
+        public CountingDirectoryLister(Map<String, List<RemoteFile>> fileStatuses) {
+            this.fileStatuses = Objects.requireNonNull(fileStatuses, "fileStatuses is null");
+        }
+
+        @Override
+        public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+                throws FileSystemIOException {
+            // No specific recursive files-only listing implementation
+            listCount++;
+            return throwingRemoteIterator(Objects.requireNonNull(fileStatuses.get(location)), throwException);
+        }
+
+        public void setThrowException(boolean throwException) {
+            this.throwException = throwException;
+        }
+
+        public int getListCount() {
+            return listCount;
+        }
+    }
+
+    static RemoteIterator<RemoteFile> throwingRemoteIterator(List<RemoteFile> files, boolean throwException) {
+        return new RemoteIterator<RemoteFile>() {
+            private final Iterator<RemoteFile> iterator = ImmutableList.copyOf(files).iterator();
+
+            @Override
+            public boolean hasNext()
+                    throws FileSystemIOException {
+                if (throwException) {
+                    throw new FileSystemIOException("File system io exception.");
+                }
+                return iterator.hasNext();
+            }
+
+            @Override
+            public RemoteFile next() {
+                return iterator.next();
+            }
+        };
+    }
+}
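
Taken together, these tests describe the intended per-query wiring: create one caching lister per query and route every partition listing through it, so each location is listed at most once per query (subject to the configured maximum). A sketch under that assumption; the no-arg FileSystemDirectoryLister constructor, the cache size, and the fs/table/location parameters are placeholders for what the scan nodes already hold:

    package org.apache.doris.fs; // same package as the test, so FileSystem resolves as it does there

    import org.apache.doris.catalog.TableIf;
    import org.apache.doris.fs.remote.RemoteFile;

    final class PerQueryListingSketch {
        // One caching lister per query; 1000 is an illustrative cache size.
        static DirectoryLister newPerQueryLister() {
            return new TransactionScopeCachingDirectoryListerFactory(1000)
                    .get(new FileSystemDirectoryLister());
        }

        // Every partition listing of the query goes through the same lister,
        // so a location requested twice is served from the query-scoped cache.
        static void listPartition(DirectoryLister lister, FileSystem fs, TableIf table, String location)
                throws FileSystemIOException {
            RemoteIterator<RemoteFile> files = lister.listFiles(fs, true, table, location);
            while (files.hasNext()) {
                RemoteFile file = files.next();
                // build one split per RemoteFile here
            }
        }
    }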


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
