This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new abc535a9e7e branch-2.1: [fix](hudi) Fix Memory Leak in BitCaskDiskMap 
Due to Circular Reference #48955 (#49115)
abc535a9e7e is described below

commit abc535a9e7eb93edee104128a2a8fbbfbfb7ba1e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Mar 21 22:44:35 2025 +0800

    branch-2.1: [fix](hudi) Fix Memory Leak in BitCaskDiskMap Due to Circular 
Reference #48955 (#49115)
    
    Cherry-picked from #48955
    
    Co-authored-by: Socrates <suyit...@selectdb.com>
---
 fe/check/checkstyle/suppressions.xml               |   3 +
 .../hudi/common/util/collection/DiskMap.java       | 169 +++++++++++++++++++++
 2 files changed, 172 insertions(+)

diff --git a/fe/check/checkstyle/suppressions.xml 
b/fe/check/checkstyle/suppressions.xml
index 436b598b594..bcd034531c6 100644
--- a/fe/check/checkstyle/suppressions.xml
+++ b/fe/check/checkstyle/suppressions.xml
@@ -65,4 +65,7 @@ under the License.
     <suppress files="[\\/]com[\\/]amazonaws[\\/]glue[\\/]catalog[\\/]" 
checks="[a-zA-Z0-9]*"/>
     <suppress 
files="[\\/]com[\\/]aliyun[\\/]datalake[\\/]metastore[\\/]hive2[\\/]" 
checks="[a-zA-Z0-9]*"/>
     <suppress files="FSDataInputStreamWrapper\.java" checks="[a-zA-Z0-9]*"/>
+
+    <!-- Skip all checks for DiskMap.java, which is copied from Hudi's hudi/common/util/collection/DiskMap.java -->
+    <suppress 
files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java"
 checks="[a-zA-Z0-9]*"/>
 </suppressions>
diff --git 
a/fe/fe-core/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java 
b/fe/fe-core/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
new file mode 100644
index 00000000000..3069cda9591
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.common.util.FileIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/* Copied From
+ * 
https://github.com/apache/hudi/blob/release-0.15.0/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
+ * Doris Modification.
+ * Use Static cleaner class to avoid circular references in shutdown hooks
+ */
+
+/**
+ * This interface provides the map interface for storing records in disk after
+ * they
+ * spill over from memory. Used by {@link ExternalSpillableMap}.
+ *
+ * @param <T> The generic type of the keys
+ * @param <R> The generic type of the values
+ */
+public abstract class DiskMap<T extends Serializable, R extends Serializable> 
implements Map<T, R>, Iterable<R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class);
+  private static final String SUBFOLDER_PREFIX = "hudi";
+  private final File diskMapPathFile;
+  private transient Thread shutdownThread = null;
+
+  // Base path for the write file
+  protected final String diskMapPath;
+
+  public DiskMap(String basePath, String prefix) throws IOException {
+    this.diskMapPath = String.format("%s/%s-%s-%s", basePath, 
SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString());
+    diskMapPathFile = new File(diskMapPath);
+    FileIOUtils.deleteDirectory(diskMapPathFile);
+    FileIOUtils.mkdir(diskMapPathFile);
+    // Make sure the folder is deleted when JVM exits
+    diskMapPathFile.deleteOnExit();
+    addShutDownHook();
+  }
+
+  /**
+   * Register shutdown hook to force flush contents of the data written to
+   * FileOutputStream from OS page cache
+   * (typically 4 KB) to disk.
+   */
+  private void addShutDownHook() {
+    // Register this disk map path with the static cleaner instead of using an
+    // instance-specific hook
+    DiskMapCleaner.registerForCleanup(diskMapPath);
+  }
+
+  /**
+   * @returns a stream of the values stored in the disk.
+   */
+  abstract Stream<R> valueStream();
+
+  /**
+   * Number of bytes spilled to disk.
+   */
+  abstract long sizeOfFileOnDiskInBytes();
+
+  /**
+   * Close and cleanup the Map.
+   */
+  public void close() {
+    cleanup(false);
+  }
+
+  /**
+   * Cleanup all resources, files and folders
+   * triggered by shutdownhook.
+   */
+  private void cleanup() {
+    cleanup(true);
+  }
+
+  /**
+   * Cleanup all resources, files and folders.
+   */
+  private void cleanup(boolean isTriggeredFromShutdownHook) {
+    // Reuse the static cleaner method to clean the directory
+    DiskMapCleaner.cleanupDirectory(diskMapPath);
+
+    // Deregister from the static cleaner
+    if (!isTriggeredFromShutdownHook) {
+      DiskMapCleaner.deregisterFromCleanup(diskMapPath);
+    }
+  }
+
+  /**
+   * Static cleaner class to avoid circular references in shutdown hooks
+   */
+  private static class DiskMapCleaner {
+    private static final Logger CLEANER_LOG = 
LoggerFactory.getLogger(DiskMapCleaner.class);
+    private static final Set<String> PATHS_TO_CLEAN = 
Collections.synchronizedSet(new HashSet<>());
+    private static final Thread SHUTDOWN_HOOK;
+
+    static {
+      // Register a single JVM-wide shutdown hook that handles all paths
+      SHUTDOWN_HOOK = new Thread(() -> {
+        synchronized (PATHS_TO_CLEAN) {
+          PATHS_TO_CLEAN.forEach(DiskMapCleaner::cleanupDirectory);
+          PATHS_TO_CLEAN.clear();
+        }
+      });
+      Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK);
+    }
+
+    /**
+     * Register a path to be cleaned up when JVM exits
+     * 
+     * @param directoryPath Path to register for cleanup
+     */
+    public static void registerForCleanup(String directoryPath) {
+      PATHS_TO_CLEAN.add(directoryPath);
+    }
+
+    /**
+     * Deregister a path from cleanup when it's manually cleaned
+     * 
+     * @param directoryPath Path to deregister from cleanup
+     */
+    public static void deregisterFromCleanup(String directoryPath) {
+      PATHS_TO_CLEAN.remove(directoryPath);
+    }
+
+    /**
+     * Static cleanup method that doesn't hold references to DiskMap instances
+     * 
+     * @param directoryPath Path to the directory that needs to be cleaned up
+     */
+    public static void cleanupDirectory(String directoryPath) {
+      try {
+        FileIOUtils.deleteDirectory(new File(directoryPath));
+      } catch (IOException exception) {
+        CLEANER_LOG.warn("Error while deleting the disk map directory=" + 
directoryPath, exception);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to