This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7347cbc6352 [Fix](hdfs-fs)The cache expiration should explicitly release the held fs #38610 (#40504)
7347cbc6352 is described below

commit 7347cbc63523300a2573b1fbd54b81382ec1e2c4
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Sat Sep 7 23:45:24 2024 +0800

    [Fix](hdfs-fs)The cache expiration should explicitly release the held fs #38610 (#40504)
---
 .../doris/fs/remote/RemoteFSPhantomManager.java    | 117 +++++++++++++++++++++
 .../apache/doris/fs/remote/RemoteFileSystem.java   |  21 +++-
 .../remote/RemoteFileSystemPhantomReference.java   |  44 ++++++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   |  26 +++--
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  50 +++++----
 5 files changed, 231 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
new file mode 100644
index 00000000000..282361c4cb6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs.remote;
+
+import org.apache.doris.common.CustomThreadFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages phantom references to {@link RemoteFileSystem} objects so that the Hadoop {@link FileSystem}
+ * each one holds is closed after the owning RemoteFileSystem has been garbage collected.
+ * <p>
+ * Each registered RemoteFileSystem is tracked by a {@link RemoteFileSystemPhantomReference} on a shared
+ * {@link ReferenceQueue}. A background task polls that queue once a minute; for every enqueued reference
+ * it removes the matching entry from {@code referenceMap} and closes the FileSystem, so the resource is
+ * released even if {@link RemoteFileSystem#close()} was never called.
+ * <p>
+ * The cleanup task is scheduled at most once, guarded by {@code isStarted}. A FileSystem is closed only
+ * on the first poll after the GC has enqueued its reference, which may be well after it became unused.
+ * <p>
+ * NOTE(review): callers obtain the FileSystem via {@code FileSystem.get}, which returns a JVM-wide cached
+ * instance unless caching is disabled for the scheme; closing it here also closes it for every other
+ * holder of that cached instance. Confirm caching is disabled for the schemes registered here.
+ */
+public class RemoteFSPhantomManager {
+
+    private static final Logger LOG = LogManager.getLogger(RemoteFSPhantomManager.class);
+
+    // Scheduled executor for periodic resource cleanup; created once by start()
+    private static ScheduledExecutorService cleanupExecutor;
+
+    // Queue on which the GC enqueues the phantom references of collected RemoteFileSystem objects
+    private static final ReferenceQueue<RemoteFileSystem> referenceQueue = new ReferenceQueue<>();
+
+    // Phantom ref -> FileSystem to close. Holding the refs here keeps them reachable so they get enqueued.
+    private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
+            = new ConcurrentHashMap<>();
+
+    // Set by the first successful start(); guarantees the cleanup task is scheduled only once
+    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    /**
+     * Registers a phantom reference for a RemoteFileSystem so that its current {@code dfsFileSystem}
+     * is closed after the RemoteFileSystem is garbage collected. Starts the cleanup thread if needed.
+     *
+     * @param remoteFileSystem the object to track; ignored if its {@code dfsFileSystem} is still null
+     */
+    public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
+        if (!isStarted.get()) {
+            start(); // idempotent: only the caller that wins the compareAndSet schedules the task
+        }
+        FileSystem fs = remoteFileSystem.dfsFileSystem;
+        if (fs != null) { // ConcurrentHashMap rejects null values, and there is nothing to close anyway
+            referenceMap.put(new RemoteFileSystemPhantomReference(remoteFileSystem, referenceQueue), fs);
+        }
+    }
+
+    /**
+     * Starts the cleanup thread, which once a minute closes the FileSystems of collected RemoteFileSystems.
+     * The compareAndSet on {@code isStarted} makes this idempotent: only the first caller schedules the task.
+     */
+    public static void start() {
+        if (isStarted.compareAndSet(false, true)) {
+            synchronized (RemoteFSPhantomManager.class) {
+                LOG.info("Starting cleanup thread for RemoteFileSystem objects");
+                if (cleanupExecutor == null) {
+                    CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup");
+                    cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory);
+                    cleanupExecutor.scheduleAtFixedRate(() -> {
+                        Reference<? extends RemoteFileSystem> ref;
+                        while ((ref = referenceQueue.poll()) != null) {
+                            RemoteFileSystemPhantomReference phantomRef = (RemoteFileSystemPhantomReference) ref;
+                            FileSystem fs = referenceMap.remove(phantomRef);
+                            if (fs != null) {
+                                try {
+                                    fs.close();
+                                    LOG.info("Closed file system: {}", fs.getUri());
+                                } catch (IOException | RuntimeException e) {
+                                    // An exception escaping this task would suppress all of its later runs.
+                                    LOG.warn("Failed to close file system", e);
+                                }
+                            }
+                        }
+                    }, 0, 1, TimeUnit.MINUTES);
+                }
+            }
+        }
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index ffe63f20ac7..2149cdb4e1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -26,13 +26,18 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
-public abstract class RemoteFileSystem extends PersistentFileSystem {
+public abstract class RemoteFileSystem extends PersistentFileSystem implements Closeable {
     // this field will be visited by multi-threads, better use volatile qualifier
     protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
+    private final ReentrantLock fsLock = new ReentrantLock();
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
 
     public RemoteFileSystem(String name, StorageBackend.StorageType type) {
         super(name, type);
@@ -65,4 +70,18 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
         }
         return new RemoteFiles(locations);
     }
+
+    /** Closes the wrapped Hadoop FileSystem at most once; later calls are no-ops. */
+    @Override
+    public void close() throws IOException {
+        fsLock.lock();
+        try {
+            // Only the first caller sees false; the flag is per instance, so other file systems stay usable.
+            if (!closed.getAndSet(true) && dfsFileSystem != null) {
+                dfsFileSystem.close();
+            }
+        } finally {
+            fsLock.unlock();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java
new file mode 100644
index 00000000000..89506c7b212
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs.remote;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+
+public class RemoteFileSystemPhantomReference extends PhantomReference<RemoteFileSystem> {
+
+    private final FileSystem fs;
+
+    /**
+     * Creates a phantom reference to {@code referent}, registered with {@code q}, and keeps a strong
+     * reference to the referent's current {@code dfsFileSystem} (the field is not read by this class).
+     *
+     * <p>With a {@code null} queue the reference is never enqueued, so nothing polling a queue will
+     * ever observe it.
+     *
+     * @param referent the RemoteFileSystem to track; its {@code dfsFileSystem} is captured as-is
+     * @param q        the queue the GC may enqueue this reference on once {@code referent} is phantom
+     *                 reachable, or {@code null} if registration is not required
+     */
+    public RemoteFileSystemPhantomReference(RemoteFileSystem referent, ReferenceQueue<? super RemoteFileSystem> q) {
+        super(referent, q);
+        this.fs = referent.dfsFileSystem;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index f91c50d7099..7d4b9d797ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -57,14 +57,26 @@ public class S3FileSystem extends ObjFileSystem {
 
     @Override
     protected FileSystem nativeFileSystem(String remotePath) throws 
UserException {
+        //todo Extracting a common method to achieve logic reuse
+        if (closed.get()) {
+            throw new UserException("FileSystem is closed.");
+        }
         if (dfsFileSystem == null) {
-            Configuration conf = new Configuration();
-            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
-            
PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
-            try {
-                dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), 
conf);
-            } catch (Exception e) {
-                throw new UserException("Failed to get S3 FileSystem for " + 
e.getMessage(), e);
+            synchronized (this) {
+                if (closed.get()) {
+                    throw new UserException("FileSystem is closed.");
+                }
+                if (dfsFileSystem == null) {
+                    Configuration conf = new Configuration();
+                    System.setProperty("com.amazonaws.services.s3.enableV4", 
"true");
+                    
PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
+                    try {
+                        dfsFileSystem = FileSystem.get(new 
Path(remotePath).toUri(), conf);
+                    } catch (Exception e) {
+                        throw new UserException("Failed to get S3 FileSystem 
for " + e.getMessage(), e);
+                    }
+                    RemoteFSPhantomManager.registerPhantomReference(this);
+                }
             }
         }
         return dfsFileSystem;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 25ecafda468..ee7fddf7ac6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.util.URI;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
 import org.apache.doris.fs.operations.OpParams;
+import org.apache.doris.fs.remote.RemoteFSPhantomManager;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
@@ -73,30 +74,41 @@ public class DFSFileSystem extends RemoteFileSystem {
 
     @Override
     protected FileSystem nativeFileSystem(String remotePath) throws UserException {
-        if (dfsFileSystem != null) {
-            return dfsFileSystem;
+        if (closed.get()) {
+            throw new UserException("FileSystem is closed.");
         }
 
-        Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> propEntry : properties.entrySet()) {
-            conf.set(propEntry.getKey(), propEntry.getValue());
-        }
+        if (dfsFileSystem == null) {
+            synchronized (this) {
+                if (closed.get()) {
+                    throw new UserException("FileSystem is closed.");
+                }
+                if (dfsFileSystem == null) {
 
-        UserGroupInformation ugi = login(conf);
-        try {
-            dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
-                try {
-                    return FileSystem.get(new Path(remotePath).toUri(), conf);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
+                    Configuration conf = new HdfsConfiguration();
+                    for (Map.Entry<String, String> propEntry : properties.entrySet()) {
+                        conf.set(propEntry.getKey(), propEntry.getValue());
+                    }
+                    UserGroupInformation ugi = login(conf);
+                    FileSystem fs;
+                    try {
+                        fs = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
+                            try {
+                                return FileSystem.get(new Path(remotePath).toUri(), conf);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                    } catch (SecurityException e) {
+                        throw new UserException(e);
+                    }
+                    // Set operations before the volatile dfsFileSystem write; the unlocked check above may return early.
+                    operations = new HDFSFileOperations(Preconditions.checkNotNull(fs));
+                    dfsFileSystem = fs;
+                    RemoteFSPhantomManager.registerPhantomReference(this);
                 }
-            });
-        } catch (SecurityException e) {
-            throw new UserException(e);
+            }
         }
-
-        Preconditions.checkNotNull(dfsFileSystem);
-        operations = new HDFSFileOperations(dfsFileSystem);
         return dfsFileSystem;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to