This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new abc535a9e7e branch-2.1: [fix](hudi) Fix Memory Leak in BitCaskDiskMap Due to Circular Reference #48955 (#49115) abc535a9e7e is described below commit abc535a9e7eb93edee104128a2a8fbbfbfb7ba1e Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Mar 21 22:44:35 2025 +0800 branch-2.1: [fix](hudi) Fix Memory Leak in BitCaskDiskMap Due to Circular Reference #48955 (#49115) Cherry-picked from #48955 Co-authored-by: Socrates <suyit...@selectdb.com> --- fe/check/checkstyle/suppressions.xml | 3 + .../hudi/common/util/collection/DiskMap.java | 169 +++++++++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index 436b598b594..bcd034531c6 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -65,4 +65,7 @@ under the License. <suppress files="[\\/]com[\\/]amazonaws[\\/]glue[\\/]catalog[\\/]" checks="[a-zA-Z0-9]*"/> <suppress files="[\\/]com[\\/]aliyun[\\/]datalake[\\/]metastore[\\/]hive2[\\/]" checks="[a-zA-Z0-9]*"/> <suppress files="FSDataInputStreamWrapper\.java" checks="[a-zA-Z0-9]*"/> + + <!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java --> + <suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/> </suppressions> diff --git a/fe/fe-core/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java b/fe/fe-core/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java new file mode 100644 index 00000000000..3069cda9591 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import org.apache.hudi.common.util.FileIOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Stream; + +/* Copied From + * https://github.com/apache/hudi/blob/release-0.15.0/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java + * Doris Modification. + * Use Static cleaner class to avoid circular references in shutdown hooks + */ + +/** + * This interface provides the map interface for storing records in disk after + * they + * spill over from memory. Used by {@link ExternalSpillableMap}. 
+ * + * @param <T> The generic type of the keys + * @param <R> The generic type of the values + */ +public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> { + + private static final Logger LOG = LoggerFactory.getLogger(DiskMap.class); + private static final String SUBFOLDER_PREFIX = "hudi"; + private final File diskMapPathFile; + private transient Thread shutdownThread = null; + + // Base path for the write file + protected final String diskMapPath; + + public DiskMap(String basePath, String prefix) throws IOException { + this.diskMapPath = String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString()); + diskMapPathFile = new File(diskMapPath); + FileIOUtils.deleteDirectory(diskMapPathFile); + FileIOUtils.mkdir(diskMapPathFile); + // Make sure the folder is deleted when JVM exits + diskMapPathFile.deleteOnExit(); + addShutDownHook(); + } + + /** + * Register this map's directory path with the static DiskMapCleaner, whose + * single JVM-wide shutdown hook deletes every path + * that is still registered when the JVM exits. + */ + private void addShutDownHook() { + // Register this disk map path with the static cleaner instead of using an + // instance-specific hook + DiskMapCleaner.registerForCleanup(diskMapPath); + } + + /** + * @return a stream of the values stored in the disk. + */ + abstract Stream<R> valueStream(); + + /** + * Number of bytes spilled to disk. + */ + abstract long sizeOfFileOnDiskInBytes(); + + /** + * Close and cleanup the Map: deletes its directory and deregisters the path from the static cleaner. + */ + public void close() { + cleanup(false); + } + + /** + * Cleanup all resources, files and folders as when + * triggered by the shutdown hook (no deregistration). NOTE(review): no caller of this overload is visible in this class. + */ + private void cleanup() { + cleanup(true); + } + + /** + * Cleanup all resources, files and folders. Deletes the directory at diskMapPath and, unless triggered from the shutdown hook, deregisters that path from the static cleaner.
+ */ + private void cleanup(boolean isTriggeredFromShutdownHook) { + // Reuse the static cleaner method to clean the directory + DiskMapCleaner.cleanupDirectory(diskMapPath); + + // Deregister from the static cleaner; skipped for the shutdown hook, which clears the whole set itself + if (!isTriggeredFromShutdownHook) { + DiskMapCleaner.deregisterFromCleanup(diskMapPath); + } + } + + /** + * Static cleaner class to avoid circular references in shutdown hooks: it tracks only directory path strings, so the shutdown hook never references a DiskMap instance. + */ + private static class DiskMapCleaner { + private static final Logger CLEANER_LOG = LoggerFactory.getLogger(DiskMapCleaner.class); + private static final Set<String> PATHS_TO_CLEAN = Collections.synchronizedSet(new HashSet<>()); + private static final Thread SHUTDOWN_HOOK; + + static { + // Register a single JVM-wide shutdown hook that handles all paths; the set's monitor is held so forEach and clear run as one atomic step + SHUTDOWN_HOOK = new Thread(() -> { + synchronized (PATHS_TO_CLEAN) { + PATHS_TO_CLEAN.forEach(DiskMapCleaner::cleanupDirectory); + PATHS_TO_CLEAN.clear(); + } + }); + Runtime.getRuntime().addShutdownHook(SHUTDOWN_HOOK); + } + + /** + * Register a path to be cleaned up when JVM exits + * + * @param directoryPath Path to register for cleanup + */ + public static void registerForCleanup(String directoryPath) { + PATHS_TO_CLEAN.add(directoryPath); + } + + /** + * Deregister a path from cleanup when it's manually cleaned + * + * @param directoryPath Path to deregister from cleanup + */ + public static void deregisterFromCleanup(String directoryPath) { + PATHS_TO_CLEAN.remove(directoryPath); + } + + /** + * Static cleanup method that doesn't hold references to DiskMap instances; an IOException during deletion is logged as a warning and not rethrown + * + * @param directoryPath Path to the directory that needs to be cleaned up + */ + public static void cleanupDirectory(String directoryPath) { + try { + FileIOUtils.deleteDirectory(new File(directoryPath)); + } catch (IOException exception) { + CLEANER_LOG.warn("Error while deleting the disk map directory=" + directoryPath, exception); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail:
commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org