This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f66eb96f7 [fix](fe) Keep cached file systems alive while in use (#63677)
a1f66eb96f7 is described below

commit a1f66eb96f78a3ae46c03f8f32fdfce58c17ffd3
Author: Socrates <[email protected]>
AuthorDate: Mon Jun 1 10:19:31 2026 +0800

    [fix](fe) Keep cached file systems alive while in use (#63677)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    Hive file listing and Hive ACID state loading borrow `FileSystem`
    instances from `FileSystemCache` and keep using them while checking
    splittability, listing remote files, or loading ACID state.
    
    The previous cache implementation returned the raw cached `FileSystem`.
    When a cache entry was evicted or expired, the Caffeine removal listener
    closed that same instance immediately. If cache cleanup happened while
    another thread was still using the returned instance, the active Hive
    operation could observe a closed filesystem.
    
    This PR fixes the lifecycle in `FileSystemCache` instead of bypassing
    the cache at Hive call sites. `getFileSystem` now returns a
    `FileSystemLease` backed by a holder that counts its active leases.
    Cache eviction marks the holder as evicted, and the underlying
    filesystem is closed only after the last active lease is released. If
    the filesystem cache is disabled (maximum size 0), each call returns a
    direct lease that owns a newly created filesystem and closes it when
    released. The Hive file-listing and ACID code paths now use
    try-with-resources to hold the lease for the entire time the
    filesystem is in use.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [x] Unit Test
            - `./run-fe-ut.sh --run org.apache.doris.fs.FileSystemCacheTest,org.apache.doris.datasource.hive.HiveMetaStoreCacheTest`
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 .../datasource/hive/HiveExternalMetaCache.java     |  32 ++--
 .../java/org/apache/doris/fs/FileSystemCache.java  | 167 +++++++++++++++++++--
 .../org/apache/doris/fs/FileSystemCacheTest.java   | 139 +++++++++++++++++
 3 files changed, 308 insertions(+), 30 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java
index f3464deaea1..e1fbc46e8aa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveExternalMetaCache.java
@@ -394,13 +394,13 @@ public class HiveExternalMetaCache extends 
AbstractExternalMetaCache {
 
         FileSystemCache.FileSystemCacheKey fileSystemCacheKey = new 
FileSystemCache.FileSystemCacheKey(
                 path.getFsIdentifier(), path.getStorageProperties());
-        FileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
-                .getFileSystem(fileSystemCacheKey);
-        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, 
path.getNormalizedLocation()));
 
         boolean isRecursiveDirectories = Boolean.valueOf(
                 
catalog.getProperties().getOrDefault("hive.recursive_directories", "true"));
-        try {
+        try (FileSystemCache.FileSystemLease fileSystemLease = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
+                .getFileSystem(fileSystemCacheKey)) {
+            FileSystem fs = fileSystemLease.fileSystem();
+            result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, 
path.getNormalizedLocation()));
             RemoteIterator<FileEntry> iterator = directoryLister.listFiles(fs, 
isRecursiveDirectories,
                     table, path.getNormalizedLocation());
             boolean isLzoInputFormat = HiveUtil.isLzoInputFormat(inputFormat);
@@ -824,22 +824,24 @@ public class HiveExternalMetaCache extends 
AbstractExternalMetaCache {
                 HMSExternalCatalog catalog = 
hmsCatalog(partition.getNameMapping().getCtlId());
                 LocationPath locationPath = 
LocationPath.of(partition.getPath(),
                         
catalog.getCatalogProperty().getStoragePropertiesMap());
-                FileSystem fileSystem = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
-                        .getFileSystem(new FileSystemCache.FileSystemCacheKey(
-                                locationPath.getNormalizedLocation(),
-                                locationPath.getStorageProperties()));
+                FileSystemCache.FileSystemCacheKey fileSystemCacheKey = new 
FileSystemCache.FileSystemCacheKey(
+                        locationPath.getNormalizedLocation(), 
locationPath.getStorageProperties());
                 AuthenticationConfig authenticationConfig = 
AuthenticationConfig
                         
.getKerberosConfig(locationPath.getStorageProperties().getBackendConfigProperties());
                 HadoopAuthenticator hadoopAuthenticator =
                         
HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
 
-                fileCacheValues.add(
-                        hadoopAuthenticator.doAs(() -> AcidUtil.getAcidState(
-                                fileSystem,
-                                partition,
-                                txnValidIds,
-                                
catalog.getCatalogProperty().getStoragePropertiesMap(),
-                                isFullAcid)));
+                try (FileSystemCache.FileSystemLease fileSystemLease =
+                        
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getFileSystem(fileSystemCacheKey))
 {
+                    FileSystem fileSystem = fileSystemLease.fileSystem();
+                    fileCacheValues.add(
+                            hadoopAuthenticator.doAs(() -> 
AcidUtil.getAcidState(
+                                    fileSystem,
+                                    partition,
+                                    txnValidIds,
+                                    
catalog.getCatalogProperty().getStoragePropertiesMap(),
+                                    isFullAcid)));
+                }
             }
         } catch (Exception e) {
             throw new CacheException("Failed to get input splits %s", e, 
txnValidIds.toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index 538d36afcb7..5fc9b109c01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -23,39 +23,54 @@ import 
org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.filesystem.FileSystem;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 public class FileSystemCache {
 
     private static final Logger LOG = 
LogManager.getLogger(FileSystemCache.class);
 
-    private final LoadingCache<FileSystemCacheKey, FileSystem> fileSystemCache;
+    private final LoadingCache<FileSystemCacheKey, FileSystemHolder> 
fileSystemCache;
+    private final Function<FileSystemCacheKey, FileSystem> loader;
 
     public FileSystemCache() {
+        this(
+                Config.max_remote_file_system_cache_num,
+                
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+                FileSystemCache::loadFileSystem);
+    }
+
+    @VisibleForTesting
+    FileSystemCache(long maxSize, OptionalLong expireAfterAccessSec, 
Function<FileSystemCacheKey, FileSystem> loader) {
+        this.loader = Objects.requireNonNull(loader, "loader");
+        if (maxSize == 0) {
+            fileSystemCache = null;
+            return;
+        }
         // no need to set refreshAfterWrite, because the FileSystem is created 
once and never changed
         CacheFactory fsCacheFactory = new CacheFactory(
-                
OptionalLong.of(Config.external_cache_expire_time_seconds_after_access),
+                expireAfterAccessSec,
                 OptionalLong.empty(),
-                Config.max_remote_file_system_cache_num,
+                maxSize,
                 false,
                 null);
-        fileSystemCache = 
fsCacheFactory.buildCacheWithSyncRemovalListener(this::loadFileSystem, (key, 
fs, cause) -> {
-            if (fs != null) {
-                try {
-                    fs.close();
-                } catch (IOException e) {
-                    LOG.warn("Failed to close evicted FileSystem for key: {}", 
key, e);
-                }
-            }
-        });
+        fileSystemCache = fsCacheFactory.buildCacheWithSyncRemovalListener(
+                key -> new FileSystemHolder(key, loader.apply(key)), (key, 
holder, cause) -> {
+                    if (holder != null) {
+                        holder.markEvicted();
+                    }
+                });
     }
 
-    private FileSystem loadFileSystem(FileSystemCacheKey key) {
+    private static FileSystem loadFileSystem(FileSystemCacheKey key) {
         try {
             return FileSystemFactory.getFileSystem(key.properties);
         } catch (IOException e) {
@@ -63,8 +78,130 @@ public class FileSystemCache {
         }
     }
 
-    public FileSystem getFileSystem(FileSystemCacheKey key) {
-        return fileSystemCache.get(key);
+    public FileSystemLease getFileSystem(FileSystemCacheKey key) {
+        if (fileSystemCache == null) {
+            return new DirectFileSystemLease(key, loader.apply(key));
+        }
+        while (true) {
+            FileSystemHolder holder = fileSystemCache.get(key);
+            FileSystemLease lease = holder.acquire();
+            if (lease != null) {
+                return lease;
+            }
+            fileSystemCache.asMap().remove(key, holder);
+        }
+    }
+
+    @VisibleForTesting
+    void cleanUp() {
+        if (fileSystemCache != null) {
+            fileSystemCache.cleanUp();
+        }
+    }
+
+    private static final class FileSystemHolder {
+        private final FileSystemCacheKey key;
+        private final FileSystem fileSystem;
+        private int referenceCount = 0;
+        private boolean evicted = false;
+        private boolean closed = false;
+
+        private FileSystemHolder(FileSystemCacheKey key, FileSystem 
fileSystem) {
+            this.key = Objects.requireNonNull(key, "key");
+            this.fileSystem = Objects.requireNonNull(fileSystem, "fileSystem");
+        }
+
+        private synchronized FileSystemLease acquire() {
+            if (evicted || closed) {
+                return null;
+            }
+            referenceCount++;
+            return new CachedFileSystemLease(this);
+        }
+
+        private synchronized void release() {
+            Preconditions.checkState(referenceCount > 0, "FileSystem lease has 
been released more than once");
+            referenceCount--;
+            closeIfIdle();
+        }
+
+        private synchronized void markEvicted() {
+            evicted = true;
+            closeIfIdle();
+        }
+
+        private void closeIfIdle() {
+            if (!evicted || referenceCount != 0 || closed) {
+                return;
+            }
+            closed = true;
+            try {
+                fileSystem.close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close evicted FileSystem for key: {}", 
key, e);
+            }
+        }
+
+        private FileSystem fileSystem() {
+            return fileSystem;
+        }
+    }
+
+    public interface FileSystemLease extends AutoCloseable {
+        FileSystem fileSystem();
+
+        @Override
+        void close();
+    }
+
+    private static final class CachedFileSystemLease implements 
FileSystemLease {
+        private final FileSystemHolder holder;
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+
+        private CachedFileSystemLease(FileSystemHolder holder) {
+            this.holder = holder;
+        }
+
+        @Override
+        public FileSystem fileSystem() {
+            return holder.fileSystem();
+        }
+
+        @Override
+        public void close() {
+            if (!closed.compareAndSet(false, true)) {
+                return;
+            }
+            holder.release();
+        }
+    }
+
+    private static final class DirectFileSystemLease implements 
FileSystemLease {
+        private final FileSystemCacheKey key;
+        private final FileSystem fileSystem;
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+
+        private DirectFileSystemLease(FileSystemCacheKey key, FileSystem 
fileSystem) {
+            this.key = key;
+            this.fileSystem = fileSystem;
+        }
+
+        @Override
+        public FileSystem fileSystem() {
+            return fileSystem;
+        }
+
+        @Override
+        public void close() {
+            if (!closed.compareAndSet(false, true)) {
+                return;
+            }
+            try {
+                fileSystem.close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close uncached FileSystem for key: {}", 
key, e);
+            }
+        }
     }
 
     public static class FileSystemCacheKey {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemCacheTest.java
new file mode 100644
index 00000000000..ed8b6a690d1
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/fs/FileSystemCacheTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.OptionalLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FileSystemCacheTest {
+
+    @Test
+    public void testEvictedFileSystemClosesAfterLastLeaseIsReleased() {
+        CountingFileSystem first = new CountingFileSystem();
+        CountingFileSystem second = new CountingFileSystem();
+        AtomicInteger loadCount = new AtomicInteger();
+        FileSystemCache cache = new FileSystemCache(1L, OptionalLong.empty(),
+                key -> loadCount.getAndIncrement() == 0 ? first : second);
+
+        FileSystemCache.FileSystemCacheKey firstKey = key("hdfs://ns1");
+        FileSystemCache.FileSystemLease firstLease = 
cache.getFileSystem(firstKey);
+        Assert.assertSame(first, firstLease.fileSystem());
+
+        FileSystemCache.FileSystemLease secondLease = 
cache.getFileSystem(key("hdfs://ns2"));
+        cache.cleanUp();
+
+        Assert.assertEquals(0, first.getCloseCount());
+        Assert.assertEquals(0, second.getCloseCount());
+
+        firstLease.close();
+        Assert.assertEquals(1, first.getCloseCount());
+        Assert.assertEquals(0, second.getCloseCount());
+
+        secondLease.close();
+        Assert.assertEquals(0, second.getCloseCount());
+    }
+
+    @Test
+    public void testUncachedFileSystemClosesWhenLeaseIsReleased() {
+        CountingFileSystem fileSystem = new CountingFileSystem();
+        FileSystemCache cache = new FileSystemCache(0L, OptionalLong.empty(), 
key -> fileSystem);
+
+        FileSystemCache.FileSystemLease lease = 
cache.getFileSystem(key("hdfs://ns1"));
+        Assert.assertSame(fileSystem, lease.fileSystem());
+        Assert.assertEquals(0, fileSystem.getCloseCount());
+
+        lease.close();
+        Assert.assertEquals(1, fileSystem.getCloseCount());
+    }
+
+    @Test
+    public void testLeaseCloseIsConcurrentIdempotent() throws 
InterruptedException {
+        CountingFileSystem fileSystem = new CountingFileSystem();
+        FileSystemCache cache = new FileSystemCache(1L, OptionalLong.empty(), 
key -> fileSystem);
+
+        FileSystemCache.FileSystemLease lease = 
cache.getFileSystem(key("hdfs://ns1"));
+        FileSystemCache.FileSystemLease evictingLease = 
cache.getFileSystem(key("hdfs://ns2"));
+        cache.cleanUp();
+
+        closeConcurrently(lease);
+        Assert.assertEquals(1, fileSystem.getCloseCount());
+
+        evictingLease.close();
+    }
+
+    @Test
+    public void testUncachedLeaseCloseIsConcurrentIdempotent() throws 
InterruptedException {
+        CountingFileSystem fileSystem = new CountingFileSystem();
+        FileSystemCache cache = new FileSystemCache(0L, OptionalLong.empty(), 
key -> fileSystem);
+
+        closeConcurrently(cache.getFileSystem(key("hdfs://ns1")));
+
+        Assert.assertEquals(1, fileSystem.getCloseCount());
+    }
+
+    private static void closeConcurrently(FileSystemCache.FileSystemLease 
lease) throws InterruptedException {
+        CountDownLatch ready = new CountDownLatch(2);
+        CountDownLatch start = new CountDownLatch(1);
+        Thread first = closeThread(lease, ready, start);
+        Thread second = closeThread(lease, ready, start);
+        first.start();
+        second.start();
+        ready.await();
+        start.countDown();
+        first.join();
+        second.join();
+    }
+
+    private static Thread closeThread(FileSystemCache.FileSystemLease lease,
+            CountDownLatch ready, CountDownLatch start) {
+        return new Thread(() -> {
+            ready.countDown();
+            try {
+                start.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            lease.close();
+        });
+    }
+
+    private static FileSystemCache.FileSystemCacheKey key(String fsIdent) {
+        return new FileSystemCache.FileSystemCacheKey(fsIdent, 
StorageProperties.createPrimary(
+                Collections.singletonMap(StorageProperties.FS_HDFS_SUPPORT, 
"true")));
+    }
+
+    private static class CountingFileSystem extends MemoryFileSystem {
+        private final AtomicInteger closeCount = new AtomicInteger();
+
+        @Override
+        public void close() {
+            closeCount.incrementAndGet();
+        }
+
+        int getCloseCount() {
+            return closeCount.get();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to