This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e0590ab8c [Feature](iceberg) Add manifest-level cache for Iceberg tables to reduce I/O and parsing overhead (#59056)
e3e0590ab8c is described below

commit e3e0590ab8c2942cfee62c6686859d2e2f77c842
Author: Socrates <[email protected]>
AuthorDate: Fri Dec 26 17:48:36 2025 +0800

    [Feature](iceberg) Add manifest-level cache for Iceberg tables to reduce I/O and parsing overhead (#59056)
    
    ### What problem does this PR solve?
    
    ## Motivation
    
    During Iceberg query planning, the FE needs to read and parse the metadata
    chain: ManifestList → Manifest → DataFile/DeleteFile. When hot partitions
    are queried frequently or small batch queries are executed repeatedly, the
    same manifest files are read and parsed over and over, causing significant
    I/O and CPU overhead.
    
    ## Solution
    
    This PR introduces a manifest-level cache (`IcebergManifestCache`) in FE
    to cache the parsed DataFile/DeleteFile lists per manifest file. The
    cache is implemented using Caffeine with weight-based LRU eviction and
    TTL support.
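
    A minimal sketch of the Caffeine shape this cache uses (weight-based eviction plus
    TTL after last access). The `ManifestCacheSketch`/`EstimatedFiles` names are
    illustrative stand-ins; the real classes are `IcebergManifestCache` and
    `ManifestCacheValue` in the diff below:

    ```java
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.Weigher;

    import java.time.Duration;
    import java.util.List;

    public final class ManifestCacheSketch {
        /** Stand-in for ManifestCacheValue: parsed file list plus a precomputed weight estimate. */
        public static final class EstimatedFiles {
            final List<String> filePaths;
            final long weightBytes;

            EstimatedFiles(List<String> filePaths, long weightBytes) {
                this.filePaths = filePaths;
                this.weightBytes = weightBytes;
            }
        }

        public static Cache<String, EstimatedFiles> build(long capacityMb, long ttlSec) {
            // Per-entry weight is the estimated heap size of the parsed files, capped at int range
            Weigher<String, EstimatedFiles> weigher =
                    (path, value) -> (int) Math.min(value.weightBytes, Integer.MAX_VALUE);
            return Caffeine.newBuilder()
                    // iceberg.manifest.cache.capacity-mb: total estimated bytes the cache may hold
                    .maximumWeight(capacityMb * 1024L * 1024L)
                    .weigher(weigher)
                    // iceberg.manifest.cache.ttl-second: entries expire after last access
                    .expireAfterAccess(Duration.ofSeconds(ttlSec))
                    .build();
        }
    }
    ```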
    
    ### Key Components
    
    - **IcebergManifestCache**: Core cache implementation using Caffeine
      - Weight-based LRU eviction controlled by `iceberg.manifest.cache.capacity-mb`
      - TTL expiration via `iceberg.manifest.cache.ttl-second`
      - Single-flight loading to prevent duplicate parsing of the same manifest

    - **ManifestCacheKey**: Cache key consisting of:
      - Manifest file path (manifests are immutable, so the path alone identifies an entry)

    - **ManifestCacheValue**: Cached payload containing:
      - List of `DataFile` or `DeleteFile`
      - Estimated memory weight for eviction

    - **IcebergManifestCacheLoader**: Helper class to load and populate the cache using `ManifestFiles.read()` (see the usage sketch after this list)
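
    Roughly how these pieces fit together from a caller's point of view; this is a
    condensed sketch based on the public signatures added in this commit (the real call
    site is `IcebergScanNode`, shown further down in this diff):

    ```java
    import org.apache.doris.datasource.ExternalCatalog;
    import org.apache.doris.datasource.iceberg.IcebergUtils;
    import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
    import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
    import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.Table;

    import java.util.List;

    final class ManifestCacheUsageSketch {
        static List<DataFile> cachedDataFiles(ExternalCatalog catalog, Table table, ManifestFile manifest) {
            // One cache instance per catalog, held by IcebergMetadataCache
            IcebergManifestCache cache = IcebergUtils.getManifestCache(catalog);
            // Returns the cached entry, or reads the manifest via ManifestFiles.read() on a miss;
            // concurrent misses for the same manifest collapse into a single load (single-flight)
            ManifestCacheValue value = IcebergManifestCacheLoader.loadDataFilesWithCache(cache, manifest, table);
            return value.getDataFiles();
        }
    }
    ```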
    
    ### Cache Invalidation Strategy
    
    - Manifest files are immutable, so a path-based key never serves stale content;
    rewritten metadata produces new manifest paths and therefore new cache keys
    - TTL prevents stale data from lingering when the underlying storage doesn't
    support precise mtime/etag validation
    - Different snapshots reference different manifest paths/keys, ensuring
    snapshot-level isolation
    
    ### Iceberg Catalog Properties
    
    | Config | Default | Description |
    |--------|---------|-------------|
    | `iceberg.manifest.cache.enable` | `true` | Enable/disable the manifest cache |
    | `iceberg.manifest.cache.capacity-mb` | `1024` | Maximum cache capacity in MB |
    | `iceberg.manifest.cache.ttl-second` | `172800` (48 hours) | Cache entry expiration after last access |
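
    How these properties reach the cache, roughly mirroring `IcebergMetadataCache.init()`
    in the diff below (the `resolveManifestCacheConfig` helper name here is illustrative):

    ```java
    import org.apache.commons.lang3.math.NumberUtils;

    import java.util.Map;

    final class ManifestCachePropsSketch {
        /** Returns {capacityMb, ttlSec}; unset or unparsable values fall back to the defaults. */
        static long[] resolveManifestCacheConfig(Map<String, String> catalogProps) {
            long capacityMb = NumberUtils.toLong(
                    catalogProps.get("iceberg.manifest.cache.capacity-mb"), 1024L);
            long ttlSec = NumberUtils.toLong(
                    catalogProps.get("iceberg.manifest.cache.ttl-second"), 48 * 60 * 60L);
            // The two values are then passed to new IcebergManifestCache(capacityMb, ttlSec)
            return new long[] {Math.max(capacityMb, 0L), ttlSec};
        }
    }
    ```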
    
    ### Integration Point
    
    The cache is integrated in `IcebergScanNode.planFileScanTaskWithManifestCache()`, which:
    1. Loads delete manifests via the cache and builds a `DeleteFileIndex`
    2. Loads data manifests via the cache and creates a `FileScanTask` for each data file
    3. Falls back to the original scan if cache loading fails (a condensed sketch follows)
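
    A condensed, simplified sketch of that flow; it omits the manifest/metrics/residual
    pruning the real method performs, and the `planWithManifestCache` name is only
    illustrative:

    ```java
    import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
    import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;

    import org.apache.iceberg.BaseFileScanTask;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.DeleteFileIndex;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.PartitionSpecParser;
    import org.apache.iceberg.SchemaParser;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.ResidualEvaluator;

    import java.util.ArrayList;
    import java.util.List;

    final class ManifestCachePlanSketch {
        static List<FileScanTask> planWithManifestCache(Table table, Snapshot snapshot, IcebergManifestCache cache) {
            // Phase 1: delete manifests -> DeleteFileIndex (loaded through the cache)
            List<DeleteFile> deleteFiles = new ArrayList<>();
            for (ManifestFile manifest : snapshot.deleteManifests(table.io())) {
                deleteFiles.addAll(IcebergManifestCacheLoader
                        .loadDeleteFilesWithCache(cache, manifest, table).getDeleteFiles());
            }
            DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
                    .specsById(table.specs()).build();

            // Phase 2: data manifests -> one FileScanTask per data file, with matching deletes attached
            List<FileScanTask> tasks = new ArrayList<>();
            for (ManifestFile manifest : snapshot.dataManifests(table.io())) {
                PartitionSpec spec = table.specs().get(manifest.partitionSpecId());
                ResidualEvaluator residuals = ResidualEvaluator.of(spec, Expressions.alwaysTrue(), true);
                for (DataFile dataFile : IcebergManifestCacheLoader
                        .loadDataFilesWithCache(cache, manifest, table).getDataFiles()) {
                    tasks.add(new BaseFileScanTask(dataFile,
                            deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile),
                            SchemaParser.toJson(table.schema()),
                            PartitionSpecParser.toJson(spec),
                            residuals));
                }
            }
            // Phase 3 (fallback to scan.planFiles() on failure) is handled by the caller, planFileScanTask()
            return tasks;
        }
    }
    ```
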
---
 fe/check/checkstyle/suppressions.xml               |   3 +
 .../datasource/iceberg/IcebergExternalCatalog.java |  36 +
 .../datasource/iceberg/IcebergMetadataCache.java   |  16 +
 .../doris/datasource/iceberg/IcebergUtils.java     |  16 +
 .../iceberg/cache/ContentFileEstimator.java        | 194 +++++
 .../iceberg/cache/IcebergManifestCache.java        |  91 +++
 .../iceberg/cache/IcebergManifestCacheLoader.java  | 118 +++
 .../datasource/iceberg/cache/ManifestCacheKey.java |  58 ++
 .../iceberg/cache/ManifestCacheValue.java          |  65 ++
 .../datasource/iceberg/source/IcebergScanNode.java | 196 ++++-
 .../metastore/AbstractIcebergProperties.java       |  62 ++
 .../java/org/apache/iceberg/DeleteFileIndex.java   | 906 +++++++++++++++++++++
 .../iceberg/test_iceberg_manifest_cache.out        |  21 +
 .../iceberg/test_iceberg_manifest_cache.groovy     | 119 +++
 14 files changed, 1893 insertions(+), 8 deletions(-)

diff --git a/fe/check/checkstyle/suppressions.xml 
b/fe/check/checkstyle/suppressions.xml
index 8f000bb7616..7340c4c5bd5 100644
--- a/fe/check/checkstyle/suppressions.xml
+++ b/fe/check/checkstyle/suppressions.xml
@@ -69,6 +69,9 @@ under the License.
     <!-- ignore hudi disk map copied from 
hudi/common/util/collection/DiskMap.java -->
     <suppress 
files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java"
 checks="[a-zA-Z0-9]*"/>
 
+    <!-- ignore iceberg delete file index copied from 
iceberg/DeleteFileIndex.java -->
+    <suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java" 
checks="[a-zA-Z0-9]*"/>
+
     <!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
     <suppress files=".*thrift/schema/external/.*" checks=".*"/>
 </suppressions>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 57d80c804c0..8d08e3e8eae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -51,6 +51,12 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = 
"iceberg.table.meta.cache.ttl-second";
     public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = 
"iceberg.snapshot.meta.cache.ttl-second";
+    public static final String ICEBERG_MANIFEST_CACHE_ENABLE = 
"iceberg.manifest.cache.enable";
+    public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 
"iceberg.manifest.cache.capacity-mb";
+    public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = 
"iceberg.manifest.cache.ttl-second";
+    public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = true;
+    public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024;
+    public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 
60 * 60;
     protected String icebergCatalogType;
     protected Catalog catalog;
 
@@ -95,6 +101,29 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
                     "The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND 
+ " is wrong, value is "
                     + partitionCacheTtlSecond);
         }
+
+        String manifestCacheEnable = 
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
+        if (Objects.nonNull(manifestCacheEnable)
+                && !(manifestCacheEnable.equalsIgnoreCase("true") || 
manifestCacheEnable.equalsIgnoreCase("false"))) {
+            throw new DdlException(
+                    "The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is 
wrong, value is "
+                    + manifestCacheEnable);
+        }
+
+        String manifestCacheCapacity = 
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
+        if (Objects.nonNull(manifestCacheCapacity) && 
NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) {
+            throw new DdlException(
+                    "The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + " 
is wrong, value is "
+                    + manifestCacheCapacity);
+        }
+
+        String manifestCacheTtlSecond = 
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
+        if (Objects.nonNull(manifestCacheTtlSecond)
+                && NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) < 
CACHE_TTL_DISABLE_CACHE) {
+            throw new DdlException(
+                    "The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + " 
is wrong, value is "
+                    + manifestCacheTtlSecond);
+        }
         
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
     }
 
@@ -106,6 +135,13 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
         if (Objects.nonNull(tableMetaCacheTtl) || 
Objects.nonNull(snapshotMetaCacheTtl)) {
             
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
         }
+        String manifestCacheEnable = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
+        String manifestCacheCapacity = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
+        String manifestCacheTtl = 
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
+        if (Objects.nonNull(manifestCacheEnable) || 
Objects.nonNull(manifestCacheCapacity)
+                || Objects.nonNull(manifestCacheTtl)) {
+            
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 0d49965d44a..7acc22152c6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -58,6 +59,7 @@ public class IcebergMetadataCache {
     private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
     private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> 
snapshotCache;
     private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
+    private IcebergManifestCache manifestCache;
 
     public IcebergMetadataCache(IcebergExternalCatalog catalog, 
ExecutorService executor) {
         this.executor = executor;
@@ -101,6 +103,15 @@ public class IcebergMetadataCache {
                 null);
         this.snapshotCache = 
snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
         this.viewCache = tableCacheFactory.buildCache(this::loadView, 
executor);
+
+        long manifestCacheCapacityMb = NumberUtils.toLong(
+                
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB),
+                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB);
+        manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L);
+        long manifestCacheTtlSec = NumberUtils.toLong(
+                
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND),
+                
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND);
+        this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb, 
manifestCacheTtlSec);
     }
 
     public Table getIcebergTable(ExternalTable dorisTable) {
@@ -117,6 +128,10 @@ public class IcebergMetadataCache {
         return snapshotCache.get(key);
     }
 
+    public IcebergManifestCache getManifestCache() {
+        return manifestCache;
+    }
+
     @NotNull
     private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
         Table icebergTable = getIcebergTable(key);
@@ -200,6 +215,7 @@ public class IcebergMetadataCache {
         viewCache.asMap().entrySet().stream()
                 .filter(entry -> entry.getKey().nameMapping.getCtlId() == 
catalogId)
                 .forEach(entry -> viewCache.invalidate(entry.getKey()));
+        manifestCache.invalidateAll();
     }
 
     public void invalidateTableCache(ExternalTable dorisTable) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 001a9a85903..2ee56f0a0f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -56,6 +56,7 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
 import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -1553,4 +1554,19 @@ public class IcebergUtils {
                 icebergExternalTable.getViewText();
     }
 
+    public static IcebergManifestCache getManifestCache(ExternalCatalog 
catalog) {
+        return Env.getCurrentEnv()
+                .getExtMetaCacheMgr()
+                .getIcebergMetadataCache((IcebergExternalCatalog) catalog)
+                .getManifestCache();
+    }
+
+    public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
+        String enabled = 
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE);
+        if (enabled == null) {
+            return 
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE;
+        }
+        return Boolean.parseBoolean(enabled);
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
new file mode 100644
index 00000000000..112f161389b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.StructLike;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
+ */
+public final class ContentFileEstimator {
+    private static final long LIST_BASE_WEIGHT = 48L;
+    private static final long OBJECT_REFERENCE_WEIGHT = 8L;
+    private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
+    private static final long STRING_BASE_WEIGHT = 40L;
+    private static final long CHAR_BYTES = 2L;
+    private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
+    private static final long MAP_BASE_WEIGHT = 48L;
+    private static final long MAP_ENTRY_OVERHEAD = 24L;
+    private static final long LONG_OBJECT_WEIGHT = 24L;
+    private static final long INT_OBJECT_WEIGHT = 16L;
+    private static final long PARTITION_BASE_WEIGHT = 48L;
+    private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
+
+    private ContentFileEstimator() {
+    }
+
+    public static long estimate(List<? extends ContentFile<?>> files) {
+        return listReferenceWeight(files) + estimateContentFilesWeight(files);
+    }
+
+    private static long listReferenceWeight(List<?> files) {
+        if (files == null || files.isEmpty()) {
+            return 0L;
+        }
+        return LIST_BASE_WEIGHT + (long) files.size() * 
OBJECT_REFERENCE_WEIGHT;
+    }
+
+    private static long estimateContentFilesWeight(List<? extends 
ContentFile<?>> files) {
+        long total = 0L;
+        if (files == null) {
+            return 0L;
+        }
+        for (ContentFile<?> file : files) {
+            total += estimateContentFileWeight(file);
+        }
+        return total;
+    }
+
+    private static long estimateContentFileWeight(ContentFile<?> file) {
+        if (file == null) {
+            return 0L;
+        }
+
+        long weight = CONTENT_FILE_BASE_WEIGHT;
+        weight += charSequenceWeight(file.path());
+        weight += stringWeight(file.manifestLocation());
+        weight += byteBufferWeight(file.keyMetadata());
+        weight += partitionWeight(file.partition());
+
+        weight += numericMapWeight(file.columnSizes());
+        weight += numericMapWeight(file.valueCounts());
+        weight += numericMapWeight(file.nullValueCounts());
+        weight += numericMapWeight(file.nanValueCounts());
+        weight += byteBufferMapWeight(file.lowerBounds());
+        weight += byteBufferMapWeight(file.upperBounds());
+
+        weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
+        weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
+
+        weight += optionalLongWeight(file.pos());
+        weight += optionalLongWeight(file.dataSequenceNumber());
+        weight += optionalLongWeight(file.fileSequenceNumber());
+        weight += optionalLongWeight(file.firstRowId());
+        weight += optionalIntWeight(file.sortOrderId());
+
+        if (file instanceof DeleteFile) {
+            DeleteFile deleteFile = (DeleteFile) file;
+            weight += stringWeight(deleteFile.referencedDataFile());
+            weight += optionalLongWeight(deleteFile.contentOffset());
+            weight += optionalLongWeight(deleteFile.contentSizeInBytes());
+        }
+
+        return weight;
+    }
+
+    private static long listWeight(List<? extends Number> list, long 
elementWeight) {
+        if (list == null || list.isEmpty()) {
+            return 0L;
+        }
+        return LIST_BASE_WEIGHT + (long) list.size() * 
(OBJECT_REFERENCE_WEIGHT + elementWeight);
+    }
+
+    private static long numericMapWeight(Map<Integer, Long> map) {
+        if (map == null || map.isEmpty()) {
+            return 0L;
+        }
+        return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + 
LONG_OBJECT_WEIGHT);
+    }
+
+    private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
+        if (map == null || map.isEmpty()) {
+            return 0L;
+        }
+        long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
+        for (ByteBuffer buffer : map.values()) {
+            weight += byteBufferWeight(buffer);
+        }
+        return weight;
+    }
+
+    private static long partitionWeight(StructLike partition) {
+        if (partition == null) {
+            return 0L;
+        }
+        long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * 
PARTITION_VALUE_BASE_WEIGHT;
+        for (int i = 0; i < partition.size(); i++) {
+            Object value = partition.get(i, Object.class);
+            weight += estimateValueWeight(value);
+        }
+        return weight;
+    }
+
+    private static long estimateValueWeight(Object value) {
+        if (value == null) {
+            return 0L;
+        }
+        if (value instanceof CharSequence) {
+            return charSequenceWeight((CharSequence) value);
+        } else if (value instanceof byte[]) {
+            return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
+        } else if (value instanceof ByteBuffer) {
+            return byteBufferWeight((ByteBuffer) value);
+        } else if (value instanceof Long || value instanceof Double) {
+            return LONG_OBJECT_WEIGHT;
+        } else if (value instanceof Integer || value instanceof Float) {
+            return INT_OBJECT_WEIGHT;
+        } else if (value instanceof Short || value instanceof Character) {
+            return 4L;
+        } else if (value instanceof Boolean) {
+            return 1L;
+        }
+        return OBJECT_REFERENCE_WEIGHT;
+    }
+
+    private static long charSequenceWeight(CharSequence value) {
+        if (value == null) {
+            return 0L;
+        }
+        return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
+    }
+
+    private static long stringWeight(String value) {
+        if (value == null) {
+            return 0L;
+        }
+        return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
+    }
+
+    private static long byteBufferWeight(ByteBuffer buffer) {
+        if (buffer == null) {
+            return 0L;
+        }
+        return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
+    }
+
+    private static long optionalLongWeight(Long value) {
+        return value == null ? 0L : LONG_OBJECT_WEIGHT;
+    }
+
+    private static long optionalIntWeight(Integer value) {
+        return value == null ? 0L : INT_OBJECT_WEIGHT;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
new file mode 100644
index 00000000000..6c5d79ecb69
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.datasource.CacheException;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+/**
+ * A lightweight manifest cache that stores parsed DataFile/DeleteFile lists 
per manifest.
+ */
+public class IcebergManifestCache {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergManifestCache.class);
+
+    private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache;
+
+    public IcebergManifestCache(long capacityMb, long ttlSec) {
+        long capacityInBytes = capacityMb * 1024L * 1024L;
+        Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value) 
-> {
+            long weight = 
Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L);
+            if (weight > Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
+            }
+            return (int) weight;
+        };
+        Caffeine<ManifestCacheKey, ManifestCacheValue> builder = 
Caffeine.newBuilder()
+                .maximumWeight(capacityInBytes)
+                .weigher(weigher)
+                .expireAfterAccess(Duration.ofSeconds(ttlSec));
+        cache = builder.build(new CacheLoader<ManifestCacheKey, 
ManifestCacheValue>() {
+            @Override
+            public ManifestCacheValue load(ManifestCacheKey key) {
+                throw new CacheException("Manifest cache loader should be 
provided explicitly for key %s", null, key);
+            }
+        });
+    }
+
+    public ManifestCacheValue get(ManifestCacheKey key, 
Callable<ManifestCacheValue> loader) {
+        try {
+            return cache.get(key, ignored -> {
+                try {
+                    return loader.call();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } catch (Exception e) {
+            throw new CacheException("Failed to load manifest cache for key 
%s", e, key);
+        }
+    }
+
+    public Optional<ManifestCacheValue> peek(ManifestCacheKey key) {
+        return Optional.ofNullable(cache.getIfPresent(key));
+    }
+
+    public void invalidateByPath(String path) {
+        cache.invalidate(buildKey(path));
+    }
+
+    public void invalidateAll() {
+        cache.invalidateAll();
+    }
+
+    public ManifestCacheKey buildKey(String path) {
+        return new ManifestCacheKey(path);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
new file mode 100644
index 00000000000..8ec14a50e6a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.datasource.CacheException;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Helper to load manifest content and populate the manifest cache.
+ */
+public class IcebergManifestCacheLoader {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergManifestCacheLoader.class);
+
+    private IcebergManifestCacheLoader() {
+    }
+
+    public static ManifestCacheValue 
loadDataFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+            Table table) {
+        return loadDataFilesWithCache(cache, manifest, table, null);
+    }
+
+    public static ManifestCacheValue 
loadDataFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+            Table table, Consumer<Boolean> cacheHitRecorder) {
+        return loadWithCache(cache, manifest, cacheHitRecorder, () -> 
loadDataFiles(manifest, table));
+    }
+
+    public static ManifestCacheValue 
loadDeleteFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+            Table table) {
+        return loadDeleteFilesWithCache(cache, manifest, table, null);
+    }
+
+    public static ManifestCacheValue 
loadDeleteFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+            Table table, Consumer<Boolean> cacheHitRecorder) {
+        return loadWithCache(cache, manifest, cacheHitRecorder, () -> 
loadDeleteFiles(manifest, table));
+    }
+
+    private static ManifestCacheValue loadDataFiles(ManifestFile manifest, 
Table table) {
+        List<DataFile> dataFiles = Lists.newArrayList();
+        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, 
table.io())) {
+            // ManifestReader implements CloseableIterable<DataFile>, iterate 
directly
+            for (DataFile dataFile : reader) {
+                dataFiles.add(dataFile.copy());
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to read data manifest {}", manifest.path(), e);
+            throw new CacheException("Failed to read data manifest %s", e, 
manifest.path());
+        }
+        return ManifestCacheValue.forDataFiles(dataFiles);
+    }
+
+    private static ManifestCacheValue loadDeleteFiles(ManifestFile manifest, 
Table table) {
+        List<DeleteFile> deleteFiles = Lists.newArrayList();
+        try (ManifestReader<DeleteFile> reader = 
ManifestFiles.readDeleteManifest(manifest, table.io(),
+                table.specs())) {
+            // ManifestReader implements CloseableIterable<DeleteFile>, 
iterate directly
+            for (DeleteFile deleteFile : reader) {
+                deleteFiles.add(deleteFile.copy());
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to read delete manifest {}", manifest.path(), e);
+            throw new CacheException("Failed to read delete manifest %s", e, 
manifest.path());
+        }
+        return ManifestCacheValue.forDeleteFiles(deleteFiles);
+    }
+
+    private static ManifestCacheValue loadWithCache(IcebergManifestCache 
cache, ManifestFile manifest,
+            Consumer<Boolean> cacheHitRecorder, Loader loader) {
+        ManifestCacheKey key = buildKey(cache, manifest);
+        Optional<ManifestCacheValue> cached = cache.peek(key);
+        boolean cacheHit = cached.isPresent();
+        if (cacheHitRecorder != null) {
+            cacheHitRecorder.accept(cacheHit);
+        }
+        if (cacheHit) {
+            return cached.get();
+        }
+        return cache.get(key, loader::load);
+    }
+
+    @FunctionalInterface
+    private interface Loader {
+        ManifestCacheValue load();
+    }
+
+    private static ManifestCacheKey buildKey(IcebergManifestCache cache, 
ManifestFile manifest) {
+        // Iceberg manifest files are immutable, so path uniquely identifies a 
manifest
+        return cache.buildKey(manifest.path());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
new file mode 100644
index 00000000000..41b52187aec
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import java.util.Objects;
+
+/**
+ * Cache key for a single Iceberg manifest file.
+ * Since Iceberg manifest files are immutable, path uniquely identifies a 
manifest.
+ */
+public class ManifestCacheKey {
+    private final String path;
+
+    public ManifestCacheKey(String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ManifestCacheKey)) {
+            return false;
+        }
+        ManifestCacheKey that = (ManifestCacheKey) o;
+        return Objects.equals(path, that.path);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(path);
+    }
+
+    @Override
+    public String toString() {
+        return "ManifestCacheKey{path='" + path + "'}";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
new file mode 100644
index 00000000000..91e2f6db72f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cached manifest payload containing parsed files and an estimated weight.
+ */
+public class ManifestCacheValue {
+    private final List<DataFile> dataFiles;
+    private final List<DeleteFile> deleteFiles;
+    private final long weightBytes;
+
+    private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> 
deleteFiles, long weightBytes) {
+        this.dataFiles = dataFiles == null ? Collections.emptyList() : 
dataFiles;
+        this.deleteFiles = deleteFiles == null ? Collections.emptyList() : 
deleteFiles;
+        this.weightBytes = weightBytes;
+    }
+
+    public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) {
+        return new ManifestCacheValue(dataFiles, Collections.emptyList(),
+                estimateWeight(dataFiles, Collections.emptyList()));
+    }
+
+    public static ManifestCacheValue forDeleteFiles(List<DeleteFile> 
deleteFiles) {
+        return new ManifestCacheValue(Collections.emptyList(), deleteFiles,
+                estimateWeight(Collections.emptyList(), deleteFiles));
+    }
+
+    public List<DataFile> getDataFiles() {
+        return dataFiles;
+    }
+
+    public List<DeleteFile> getDeleteFiles() {
+        return deleteFiles;
+    }
+
+    public long getWeightBytes() {
+        return weightBytes;
+    }
+
+    private static long estimateWeight(List<DataFile> dataFiles, 
List<DeleteFile> deleteFiles) {
+        return ContentFileEstimator.estimate(dataFiles) + 
ContentFileEstimator.estimate(deleteFiles);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 1acafbde5df..29f34bcc5cc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.Util;
@@ -38,6 +39,9 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
+import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
 import org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
@@ -58,18 +62,27 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.iceberg.BaseFileScanTask;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeleteFileIndex;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
@@ -79,9 +92,12 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
@@ -115,6 +131,9 @@ public class IcebergScanNode extends FileQueryScanNode {
     // get them in doInitialize() to ensure internal consistency of ScanNode
     private Map<StorageProperties.Type, StorageProperties> 
storagePropertiesMap;
     private Map<String, String> backendStorageProperties;
+    private long manifestCacheHits;
+    private long manifestCacheMisses;
+    private long manifestCacheFailures;
 
     // for test
     @VisibleForTesting
@@ -303,6 +322,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                         }
                 );
                 splitAssignment.finishSchedule();
+                recordManifestCacheProfile();
             } catch (Exception e) {
                 Optional<NotSupportedException> opt = 
checkNotSupportedException(e);
                 if (opt.isPresent()) {
@@ -359,8 +379,136 @@ public class IcebergScanNode extends FileQueryScanNode {
     }
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+        try {
+            return planFileScanTaskWithManifestCache(scan);
+        } catch (Exception e) {
+            manifestCacheFailures++;
+            LOG.warn("Plan with manifest cache failed, fallback to original 
scan: " + e.getMessage(), e);
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+    }
+
+    private CloseableIterable<FileScanTask> 
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+        // Get the snapshot from the scan; return empty if no snapshot exists
+        Snapshot snapshot = scan.snapshot();
+        if (snapshot == null) {
+            return CloseableIterable.withNoopClose(Collections.emptyList());
+        }
+
+        // Initialize manifest cache for efficient manifest file access
+        IcebergManifestCache cache = 
IcebergUtils.getManifestCache(source.getCatalog());
+
+        // Convert query conjuncts to Iceberg filter expression
+        // This combines all predicates with AND logic for partition/file 
pruning
+        Expression filterExpr = conjuncts.stream()
+                .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, 
icebergTable.schema()))
+                .filter(Objects::nonNull)
+                .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+        // Get all partition specs by their IDs for later use
+        Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+        boolean caseSensitive = true;
+
+        // Create residual evaluators for each partition spec
+        // Residual evaluators compute the remaining filter expression after 
partition pruning
+        Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+        specsById.forEach((id, spec) -> residualEvaluators.put(id,
+                ResidualEvaluator.of(spec, filterExpr, caseSensitive)));
+
+        // Create metrics evaluator for file-level pruning based on column 
statistics
+        InclusiveMetricsEvaluator metricsEvaluator =
+                new InclusiveMetricsEvaluator(icebergTable.schema(), 
filterExpr, caseSensitive);
+
+        // ========== Phase 1: Load delete files from delete manifests 
==========
+        List<DeleteFile> deleteFiles = new ArrayList<>();
+        List<ManifestFile> deleteManifests = 
snapshot.deleteManifests(icebergTable.io());
+        for (ManifestFile manifest : deleteManifests) {
+            // Skip non-delete manifests
+            if (manifest.content() != ManifestContent.DELETES) {
+                continue;
+            }
+            // Get the partition spec for this manifest
+            PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+            if (spec == null) {
+                continue;
+            }
+            // Create manifest evaluator for partition-level pruning
+            ManifestEvaluator evaluator =
+                    ManifestEvaluator.forPartitionFilter(filterExpr, spec, 
caseSensitive);
+            // Skip manifest if it doesn't match the filter expression 
(partition pruning)
+            if (!evaluator.eval(manifest)) {
+                continue;
+            }
+            // Load delete files from cache (or from storage if not cached)
+            ManifestCacheValue value = 
IcebergManifestCacheLoader.loadDeleteFilesWithCache(cache, manifest,
+                    icebergTable, this::recordManifestCacheAccess);
+            deleteFiles.addAll(value.getDeleteFiles());
+        }
+
+        // Build delete file index for efficient lookup of deletes applicable 
to each data file
+        DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
+                .specsById(specsById)
+                .caseSensitive(caseSensitive)
+                .build();
+
+        // ========== Phase 2: Load data files and create scan tasks ==========
+        List<FileScanTask> tasks = new ArrayList<>();
+        try (CloseableIterable<ManifestFile> dataManifests =
+                     
IcebergUtils.getMatchingManifest(snapshot.dataManifests(icebergTable.io()),
+                             specsById, filterExpr)) {
+            for (ManifestFile manifest : dataManifests) {
+                // Skip non-data manifests
+                if (manifest.content() != ManifestContent.DATA) {
+                    continue;
+                }
+                // Get the partition spec for this manifest
+                PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+                if (spec == null) {
+                    continue;
+                }
+                // Get the residual evaluator for this partition spec
+                ResidualEvaluator residualEvaluator = 
residualEvaluators.get(manifest.partitionSpecId());
+                if (residualEvaluator == null) {
+                    continue;
+                }
+
+                // Load data files from cache (or from storage if not cached)
+                ManifestCacheValue value = 
IcebergManifestCacheLoader.loadDataFilesWithCache(cache, manifest,
+                        icebergTable, this::recordManifestCacheAccess);
+
+                // Process each data file in the manifest
+                for (org.apache.iceberg.DataFile dataFile : 
value.getDataFiles()) {
+                    // Skip file if column statistics indicate no matching 
rows (metrics-based pruning)
+                    if (metricsEvaluator != null && 
!metricsEvaluator.eval(dataFile)) {
+                        continue;
+                    }
+                    // Skip file if partition values don't match the residual 
filter
+                    if 
(residualEvaluator.residualFor(dataFile.partition()).equals(Expressions.alwaysFalse()))
 {
+                        continue;
+                    }
+                    // Find all delete files that apply to this data file 
based on sequence number
+                    List<DeleteFile> deletes = Arrays.asList(
+                            
deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile));
+
+                    // Create a FileScanTask containing the data file, 
associated deletes, and metadata
+                    tasks.add(new BaseFileScanTask(
+                            dataFile,
+                            deletes.toArray(new DeleteFile[0]),
+                            SchemaParser.toJson(icebergTable.schema()),
+                            PartitionSpecParser.toJson(spec),
+                            residualEvaluator));
+                }
+            }
+        }
+
+        // Split tasks into smaller chunks based on target split size for 
parallel processing
         long targetSplitSize = getRealFileSplitSize(0);
-        return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        return 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), 
targetSplitSize);
     }
 
     private Split createIcebergSplit(FileScanTask fileScanTask) {
@@ -418,6 +566,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                 splits.add(createIcebergSplit(task));
             }
             selectedPartitionNum = partitionMapInfos.size();
+            recordManifestCacheProfile();
             return splits;
         }
 
@@ -436,6 +585,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                 }
                 setPushDownCount(countFromSnapshot);
                 assignCountToSplits(splits, countFromSnapshot);
+                recordManifestCacheProfile();
                 return splits;
             } else {
                 fileScanTasks.forEach(taskGrp -> 
splits.add(createIcebergSplit(taskGrp)));
@@ -445,6 +595,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         selectedPartitionNum = partitionMapInfos.size();
+        recordManifestCacheProfile();
         return splits;
     }
 
@@ -563,6 +714,27 @@ public class IcebergScanNode extends FileQueryScanNode {
         return new ArrayList<>();
     }
 
+    private void recordManifestCacheAccess(boolean cacheHit) {
+        if (cacheHit) {
+            manifestCacheHits++;
+        } else {
+            manifestCacheMisses++;
+        }
+    }
+
+    private void recordManifestCacheProfile() {
+        if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
+            return;
+        }
+        SummaryProfile summaryProfile = 
SummaryProfile.getSummaryProfile(ConnectContext.get());
+        if (summaryProfile == null || summaryProfile.getExecutionSummary() == 
null) {
+            return;
+        }
+        summaryProfile.getExecutionSummary().addInfoString("Manifest Cache",
+                String.format("hits=%d, misses=%d, failures=%d",
+                        manifestCacheHits, manifestCacheMisses, 
manifestCacheFailures));
+    }
+
     @Override
     public TableIf getTargetTable() {
         return source.getTargetTable();
@@ -612,15 +784,23 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
-        if (pushdownIcebergPredicates.isEmpty()) {
-            return super.getNodeExplainString(prefix, detailLevel);
+        String base = super.getNodeExplainString(prefix, detailLevel);
+        StringBuilder builder = new StringBuilder(base);
+
+        if (detailLevel == TExplainLevel.VERBOSE && 
IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
+            builder.append(prefix).append("manifest cache: 
hits=").append(manifestCacheHits)
+                    .append(", misses=").append(manifestCacheMisses)
+                    .append(", 
failures=").append(manifestCacheFailures).append("\n");
         }
-        StringBuilder sb = new StringBuilder();
-        for (String predicate : pushdownIcebergPredicates) {
-            sb.append(prefix).append(prefix).append(predicate).append("\n");
+
+        if (!pushdownIcebergPredicates.isEmpty()) {
+            StringBuilder sb = new StringBuilder();
+            for (String predicate : pushdownIcebergPredicates) {
+                
sb.append(prefix).append(prefix).append(predicate).append("\n");
+            }
+            builder.append(String.format("%sicebergPredicatePushdown=\n%s\n", 
prefix, sb));
         }
-        return super.getNodeExplainString(prefix, detailLevel)
-                + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, 
sb);
+        return builder.toString();
     }
 
     private void assignCountToSplits(List<Split> splits, long totalCount) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
index 2cc829c8743..88def12d2a5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
@@ -43,6 +43,43 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
     )
     protected String warehouse;
 
+    @Getter
+    @ConnectorProperty(
+            names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED},
+            required = false,
+            description = "Controls whether to use caching during manifest 
reads or not. Default: false."
+    )
+    protected String ioManifestCacheEnabled;
+
+    @Getter
+    @ConnectorProperty(
+            names = 
{CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS},
+            required = false,
+            description = "Controls the maximum duration for which an entry 
stays in the manifest cache. "
+                    + "Must be a non-negative value. Zero means entries expire 
only due to memory pressure. "
+                    + "Default: 60000 (60s)."
+    )
+    protected String ioManifestCacheExpirationIntervalMs;
+
+    @Getter
+    @ConnectorProperty(
+            names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES},
+            required = false,
+            description = "Controls the maximum total amount of bytes to cache 
in manifest cache. "
+                    + "Must be a positive value. Default: 104857600 (100MB)."
+    )
+    protected String ioManifestCacheMaxTotalBytes;
+
+    @Getter
+    @ConnectorProperty(
+            names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH},
+            required = false,
+            description = "Controls the maximum length of file to be 
considered for caching. "
+                    + "An InputFile will not be cached if the length is longer 
than this limit. "
+                    + "Must be a positive value. Default: 8388608 (8MB)."
+    )
+    protected String ioManifestCacheMaxContentLength;
+
     @Getter
     protected ExecutionAuthenticator executionAuthenticator = new 
ExecutionAuthenticator(){};
 
@@ -80,6 +117,9 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
             catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
         }
 
+        // Add manifest cache properties if configured
+        addManifestCacheProperties(catalogProps);
+
         Catalog catalog = initCatalog(catalogName, catalogProps, 
storagePropertiesList);
 
         if (catalog == null) {
@@ -88,6 +128,28 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
         return catalog;
     }
 
+    /**
+     * Add manifest cache related properties to catalog properties.
+     * These properties control caching behavior during manifest reads.
+     *
+     * @param catalogProps the catalog properties map to add manifest cache 
properties to
+     */
+    protected void addManifestCacheProperties(Map<String, String> 
catalogProps) {
+        if (StringUtils.isNotBlank(ioManifestCacheEnabled)) {
+            catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, 
ioManifestCacheEnabled);
+        }
+        if (StringUtils.isNotBlank(ioManifestCacheExpirationIntervalMs)) {
+            
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
+                    ioManifestCacheExpirationIntervalMs);
+        }
+        if (StringUtils.isNotBlank(ioManifestCacheMaxTotalBytes)) {
+            
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, 
ioManifestCacheMaxTotalBytes);
+        }
+        if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) {
+            
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, 
ioManifestCacheMaxContentLength);
+        }
+    }
+
     /**
      * Subclasses must implement this to create the concrete Catalog instance.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java 
b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
new file mode 100644
index 00000000000..36cf36b556d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -0,0 +1,906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.Tasks;
+
+/**
+ * An index of {@link DeleteFile delete files} by sequence number.
+ *
+ * <p>Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long,
+ * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data
+ * file.
+ *
+ * <p>Copied from
+ * https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+ * with DeleteFileIndex and some of its methods changed to public.
+ */
+public class DeleteFileIndex {
+  private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0];
+
+  private final EqualityDeletes globalDeletes;
+  private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
+  private final PartitionMap<PositionDeletes> posDeletesByPartition;
+  private final Map<String, PositionDeletes> posDeletesByPath;
+  private final Map<String, DeleteFile> dvByPath;
+  private final boolean hasEqDeletes;
+  private final boolean hasPosDeletes;
+  private final boolean isEmpty;
+
+  private DeleteFileIndex(
+      EqualityDeletes globalDeletes,
+      PartitionMap<EqualityDeletes> eqDeletesByPartition,
+      PartitionMap<PositionDeletes> posDeletesByPartition,
+      Map<String, PositionDeletes> posDeletesByPath,
+      Map<String, DeleteFile> dvByPath) {
+    this.globalDeletes = globalDeletes;
+    this.eqDeletesByPartition = eqDeletesByPartition;
+    this.posDeletesByPartition = posDeletesByPartition;
+    this.posDeletesByPath = posDeletesByPath;
+    this.dvByPath = dvByPath;
+    this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
+    this.hasPosDeletes =
+        posDeletesByPartition != null || posDeletesByPath != null || dvByPath 
!= null;
+    this.isEmpty = !hasEqDeletes && !hasPosDeletes;
+  }
+
+  public boolean isEmpty() {
+    return isEmpty;
+  }
+
+  public boolean hasEqualityDeletes() {
+    return hasEqDeletes;
+  }
+
+  public boolean hasPositionDeletes() {
+    return hasPosDeletes;
+  }
+
+  public Iterable<DeleteFile> referencedDeleteFiles() {
+    Iterable<DeleteFile> deleteFiles = Collections.emptyList();
+
+    if (globalDeletes != null) {
+      deleteFiles = Iterables.concat(deleteFiles, 
globalDeletes.referencedDeleteFiles());
+    }
+
+    if (eqDeletesByPartition != null) {
+      for (EqualityDeletes deletes : eqDeletesByPartition.values()) {
+        deleteFiles = Iterables.concat(deleteFiles, 
deletes.referencedDeleteFiles());
+      }
+    }
+
+    if (posDeletesByPartition != null) {
+      for (PositionDeletes deletes : posDeletesByPartition.values()) {
+        deleteFiles = Iterables.concat(deleteFiles, 
deletes.referencedDeleteFiles());
+      }
+    }
+
+    if (posDeletesByPath != null) {
+      for (PositionDeletes deletes : posDeletesByPath.values()) {
+        deleteFiles = Iterables.concat(deleteFiles, 
deletes.referencedDeleteFiles());
+      }
+    }
+
+    if (dvByPath != null) {
+      deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
+    }
+
+    return deleteFiles;
+  }
+
+  DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
+    return forDataFile(entry.dataSequenceNumber(), entry.file());
+  }
+
+  public DeleteFile[] forDataFile(DataFile file) {
+    return forDataFile(file.dataSequenceNumber(), file);
+  }
+
+  public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+    if (isEmpty) {
+      return EMPTY_DELETES;
+    }
+
+    DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
+    DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
+    DeleteFile dv = findDV(sequenceNumber, file);
+    if (dv != null && global == null && eqPartition == null) {
+      return new DeleteFile[] {dv};
+    } else if (dv != null) {
+      return concat(global, eqPartition, new DeleteFile[] {dv});
+    } else {
+      DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, 
file);
+      DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
+      return concat(global, eqPartition, posPartition, posPath);
+    }
+  }
+
+  private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
+    return globalDeletes == null ? EMPTY_DELETES : globalDeletes.filter(seq, 
dataFile);
+  }
+
+  private DeleteFile[] findPosPartitionDeletes(long seq, DataFile dataFile) {
+    if (posDeletesByPartition == null) {
+      return EMPTY_DELETES;
+    }
+
+    PositionDeletes deletes = posDeletesByPartition.get(dataFile.specId(), 
dataFile.partition());
+    return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
+  }
+
+  private DeleteFile[] findEqPartitionDeletes(long seq, DataFile dataFile) {
+    if (eqDeletesByPartition == null) {
+      return EMPTY_DELETES;
+    }
+
+    EqualityDeletes deletes = eqDeletesByPartition.get(dataFile.specId(), 
dataFile.partition());
+    return deletes == null ? EMPTY_DELETES : deletes.filter(seq, dataFile);
+  }
+
+  @SuppressWarnings("CollectionUndefinedEquality")
+  private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) {
+    if (posDeletesByPath == null) {
+      return EMPTY_DELETES;
+    }
+
+    PositionDeletes deletes = posDeletesByPath.get(dataFile.location());
+    return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
+  }
+
+  private DeleteFile findDV(long seq, DataFile dataFile) {
+    if (dvByPath == null) {
+      return null;
+    }
+
+    DeleteFile dv = dvByPath.get(dataFile.location());
+    if (dv != null) {
+      ValidationException.check(
+          dv.dataSequenceNumber() >= seq,
+          "DV data sequence number (%s) must be greater than or equal to data 
file sequence number (%s)",
+          dv.dataSequenceNumber(),
+          seq);
+    }
+    return dv;
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static boolean canContainEqDeletesForFile(
+      DataFile dataFile, EqualityDeleteFile deleteFile) {
+    Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+    Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+
+    // whether to check data ranges or to assume that the ranges match
+    // if upper/lower bounds are missing, null counts may still be used to 
determine delete files
+    // can be skipped
+    boolean checkRanges =
+        dataLowers != null && dataUppers != null && 
deleteFile.hasLowerAndUpperBounds();
+
+    Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
+    Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
+    Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
+    Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
+
+    for (Types.NestedField field : deleteFile.equalityFields()) {
+      if (!field.type().isPrimitiveType()) {
+        // stats are not kept for nested types. assume that the delete file 
may match
+        continue;
+      }
+
+      if (containsNull(dataNullCounts, field) && 
containsNull(deleteNullCounts, field)) {
+        // the data has null values and null has been deleted, so the deletes 
must be applied
+        continue;
+      }
+
+      if (allNull(dataNullCounts, dataValueCounts, field) && 
allNonNull(deleteNullCounts, field)) {
+        // the data file contains only null values for this field, but there 
are no deletes for null
+        // values
+        return false;
+      }
+
+      if (allNull(deleteNullCounts, deleteValueCounts, field)
+          && allNonNull(dataNullCounts, field)) {
+        // the delete file removes only null rows with null for this field, 
but there are no data
+        // rows with null
+        return false;
+      }
+
+      if (!checkRanges) {
+        // some upper and lower bounds are missing, assume they match
+        continue;
+      }
+
+      int id = field.fieldId();
+      ByteBuffer dataLower = dataLowers.get(id);
+      ByteBuffer dataUpper = dataUppers.get(id);
+      Object deleteLower = deleteFile.lowerBound(id);
+      Object deleteUpper = deleteFile.upperBound(id);
+      if (dataLower == null || dataUpper == null || deleteLower == null || 
deleteUpper == null) {
+        // at least one bound is not known, assume the delete file may match
+        continue;
+      }
+
+      if (!rangesOverlap(field, dataLower, dataUpper, deleteLower, 
deleteUpper)) {
+        // no values overlap between the data file and the deletes
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static <T> boolean rangesOverlap(
+      Types.NestedField field,
+      ByteBuffer dataLowerBuf,
+      ByteBuffer dataUpperBuf,
+      T deleteLower,
+      T deleteUpper) {
+    Type.PrimitiveType type = field.type().asPrimitiveType();
+    Comparator<T> comparator = Comparators.forType(type);
+
+    T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+    if (comparator.compare(dataLower, deleteUpper) > 0) {
+      return false;
+    }
+
+    T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
+    if (comparator.compare(deleteLower, dataUpper) > 0) {
+      return false;
+    }
+
+    return true;
+  }
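      // Illustrative note (not in the upstream file): for an equality field where the
      // data file bounds are [10, 20] and the delete file bounds are [25, 30], this
      // check returns false (deleteLower 25 > dataUpper 20), so
      // canContainEqDeletesForFile() can skip that delete file for the data file;
      // bounds such as [10, 20] and [15, 30] overlap and keep the delete file in play.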
+
+  private static boolean allNonNull(Map<Integer, Long> nullValueCounts, 
Types.NestedField field) {
+    if (field.isRequired()) {
+      return true;
+    }
+
+    if (nullValueCounts == null) {
+      return false;
+    }
+
+    Long nullValueCount = nullValueCounts.get(field.fieldId());
+    if (nullValueCount == null) {
+      return false;
+    }
+
+    return nullValueCount <= 0;
+  }
+
+  private static boolean allNull(
+      Map<Integer, Long> nullValueCounts, Map<Integer, Long> valueCounts, 
Types.NestedField field) {
+    if (field.isRequired()) {
+      return false;
+    }
+
+    if (nullValueCounts == null || valueCounts == null) {
+      return false;
+    }
+
+    Long nullValueCount = nullValueCounts.get(field.fieldId());
+    Long valueCount = valueCounts.get(field.fieldId());
+    if (nullValueCount == null || valueCount == null) {
+      return false;
+    }
+
+    return nullValueCount.equals(valueCount);
+  }
+
+  private static boolean containsNull(Map<Integer, Long> nullValueCounts, 
Types.NestedField field) {
+    if (field.isRequired()) {
+      return false;
+    }
+
+    if (nullValueCounts == null) {
+      return true;
+    }
+
+    Long nullValueCount = nullValueCounts.get(field.fieldId());
+    if (nullValueCount == null) {
+      return true;
+    }
+
+    return nullValueCount > 0;
+  }
+
+  static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) 
{
+    return new Builder(io, Sets.newHashSet(deleteManifests));
+  }
+
+  // changed to public method.
+  public static Builder builderFor(Iterable<DeleteFile> deleteFiles) {
+    return new Builder(deleteFiles);
+  }
+
+  // changed to public class.
+  public static class Builder {
+    private final FileIO io;
+    private final Set<ManifestFile> deleteManifests;
+    private final Iterable<DeleteFile> deleteFiles;
+    private long minSequenceNumber = 0L;
+    private Map<Integer, PartitionSpec> specsById = null;
+    private Expression dataFilter = Expressions.alwaysTrue();
+    private Expression partitionFilter = Expressions.alwaysTrue();
+    private PartitionSet partitionSet = null;
+    private boolean caseSensitive = true;
+    private ExecutorService executorService = null;
+    private ScanMetrics scanMetrics = ScanMetrics.noop();
+    private boolean ignoreResiduals = false;
+
+    Builder(FileIO io, Set<ManifestFile> deleteManifests) {
+      this.io = io;
+      this.deleteManifests = Sets.newHashSet(deleteManifests);
+      this.deleteFiles = null;
+    }
+
+    Builder(Iterable<DeleteFile> deleteFiles) {
+      this.io = null;
+      this.deleteManifests = null;
+      this.deleteFiles = deleteFiles;
+    }
+
+    Builder afterSequenceNumber(long seq) {
+      this.minSequenceNumber = seq;
+      return this;
+    }
+
+    public Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
+      this.specsById = newSpecsById;
+      return this;
+    }
+
+    Builder filterData(Expression newDataFilter) {
+      Preconditions.checkArgument(
+          deleteFiles == null, "Index constructed from files does not support 
data filters");
+      this.dataFilter = Expressions.and(dataFilter, newDataFilter);
+      return this;
+    }
+
+    Builder filterPartitions(Expression newPartitionFilter) {
+      Preconditions.checkArgument(
+          deleteFiles == null, "Index constructed from files does not support 
partition filters");
+      this.partitionFilter = Expressions.and(partitionFilter, 
newPartitionFilter);
+      return this;
+    }
+
+    Builder filterPartitions(PartitionSet newPartitionSet) {
+      Preconditions.checkArgument(
+          deleteFiles == null, "Index constructed from files does not support 
partition filters");
+      this.partitionSet = newPartitionSet;
+      return this;
+    }
+
+    public Builder caseSensitive(boolean newCaseSensitive) {
+      this.caseSensitive = newCaseSensitive;
+      return this;
+    }
+
+    Builder planWith(ExecutorService newExecutorService) {
+      this.executorService = newExecutorService;
+      return this;
+    }
+
+    Builder scanMetrics(ScanMetrics newScanMetrics) {
+      this.scanMetrics = newScanMetrics;
+      return this;
+    }
+
+    Builder ignoreResiduals() {
+      this.ignoreResiduals = true;
+      return this;
+    }
+
+    private Iterable<DeleteFile> filterDeleteFiles() {
+      return Iterables.filter(deleteFiles, file -> file.dataSequenceNumber() > 
minSequenceNumber);
+    }
+
+    private Collection<DeleteFile> loadDeleteFiles() {
+      // read all of the matching delete manifests in parallel and accumulate 
the matching files in
+      // a queue
+      Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(deleteManifestReaders())
+          .stopOnFailure()
+          .throwFailureWhenFinished()
+          .executeWith(executorService)
+          .run(
+              deleteFile -> {
+                try (CloseableIterable<ManifestEntry<DeleteFile>> reader = 
deleteFile) {
+                  for (ManifestEntry<DeleteFile> entry : reader) {
+                    if (entry.dataSequenceNumber() > minSequenceNumber) {
+                      // copy with stats for better filtering against data 
file stats
+                      files.add(entry.file().copy());
+                    }
+                  }
+                } catch (IOException e) {
+                  throw new RuntimeIOException(e, "Failed to close");
+                }
+              });
+      return files;
+    }
+
+    public DeleteFileIndex build() {
+      Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : 
loadDeleteFiles();
+
+      EqualityDeletes globalDeletes = new EqualityDeletes();
+      PartitionMap<EqualityDeletes> eqDeletesByPartition = 
PartitionMap.create(specsById);
+      PartitionMap<PositionDeletes> posDeletesByPartition = 
PartitionMap.create(specsById);
+      Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
+      Map<String, DeleteFile> dvByPath = Maps.newHashMap();
+
+      for (DeleteFile file : files) {
+        switch (file.content()) {
+          case POSITION_DELETES:
+            if (ContentFileUtil.isDV(file)) {
+              add(dvByPath, file);
+            } else {
+              add(posDeletesByPath, posDeletesByPartition, file);
+            }
+            break;
+          case EQUALITY_DELETES:
+            add(globalDeletes, eqDeletesByPartition, file);
+            break;
+          default:
+            throw new UnsupportedOperationException("Unsupported content: " + 
file.content());
+        }
+        ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
+      }
+
+      return new DeleteFileIndex(
+          globalDeletes.isEmpty() ? null : globalDeletes,
+          eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
+          posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
+          posDeletesByPath.isEmpty() ? null : posDeletesByPath,
+          dvByPath.isEmpty() ? null : dvByPath);
+    }
+
+    private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
+      String path = dv.referencedDataFile();
+      DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
+      if (existingDV != null) {
+        throw new ValidationException(
+            "Can't index multiple DVs for %s: %s and %s",
+            path, ContentFileUtil.dvDesc(dv), 
ContentFileUtil.dvDesc(existingDV));
+      }
+    }
+
+    private void add(
+        Map<String, PositionDeletes> deletesByPath,
+        PartitionMap<PositionDeletes> deletesByPartition,
+        DeleteFile file) {
+      String path = ContentFileUtil.referencedDataFileLocation(file);
+
+      PositionDeletes deletes;
+      if (path != null) {
+        deletes = deletesByPath.computeIfAbsent(path, ignored -> new 
PositionDeletes());
+      } else {
+        int specId = file.specId();
+        StructLike partition = file.partition();
+        deletes = deletesByPartition.computeIfAbsent(specId, partition, 
PositionDeletes::new);
+      }
+
+      deletes.add(file);
+    }
+
+    private void add(
+        EqualityDeletes globalDeletes,
+        PartitionMap<EqualityDeletes> deletesByPartition,
+        DeleteFile file) {
+      PartitionSpec spec = specsById.get(file.specId());
+
+      EqualityDeletes deletes;
+      if (spec.isUnpartitioned()) {
+        deletes = globalDeletes;
+      } else {
+        int specId = spec.specId();
+        StructLike partition = file.partition();
+        deletes = deletesByPartition.computeIfAbsent(specId, partition, 
EqualityDeletes::new);
+      }
+
+      deletes.add(spec, file);
+    }
+
+    private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> 
deleteManifestReaders() {
+      Expression entryFilter = ignoreResiduals ? Expressions.alwaysTrue() : 
dataFilter;
+
+      LoadingCache<Integer, Expression> partExprCache =
+          specsById == null
+              ? null
+              : Caffeine.newBuilder()
+                  .build(
+                      specId -> {
+                        PartitionSpec spec = specsById.get(specId);
+                        return Projections.inclusive(spec, 
caseSensitive).project(dataFilter);
+                      });
+
+      LoadingCache<Integer, ManifestEvaluator> evalCache =
+          specsById == null
+              ? null
+              : Caffeine.newBuilder()
+                  .build(
+                      specId -> {
+                        PartitionSpec spec = specsById.get(specId);
+                        return ManifestEvaluator.forPartitionFilter(
+                            Expressions.and(partitionFilter, 
partExprCache.get(specId)),
+                            spec,
+                            caseSensitive);
+                      });
+
+      CloseableIterable<ManifestFile> closeableDeleteManifests =
+          CloseableIterable.withNoopClose(deleteManifests);
+      CloseableIterable<ManifestFile> matchingManifests =
+          evalCache == null
+              ? closeableDeleteManifests
+              : CloseableIterable.filter(
+                  scanMetrics.skippedDeleteManifests(),
+                  closeableDeleteManifests,
+                  manifest ->
+                      manifest.content() == ManifestContent.DELETES
+                          && (manifest.hasAddedFiles() || 
manifest.hasExistingFiles())
+                          && 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+      matchingManifests =
+          CloseableIterable.count(scanMetrics.scannedDeleteManifests(), 
matchingManifests);
+      return Iterables.transform(
+          matchingManifests,
+          manifest ->
+              ManifestFiles.readDeleteManifest(manifest, io, specsById)
+                  .filterRows(entryFilter)
+                  .filterPartitions(
+                      Expressions.and(
+                          partitionFilter, 
partExprCache.get(manifest.partitionSpecId())))
+                  .filterPartitions(partitionSet)
+                  .caseSensitive(caseSensitive)
+                  .scanMetrics(scanMetrics)
+                  .liveEntries());
+    }
+  }
+
+  /**
+   * Finds an index in the sorted array of sequence numbers where the given 
sequence number should
+   * be inserted or is found.
+   *
+   * <p>If the sequence number is present in the array, this method returns 
the index of the first
+   * occurrence of the sequence number. If the sequence number is not present, 
the method returns
+   * the index where the sequence number would be inserted while maintaining 
the sorted order of the
+   * array. This returned index ranges from 0 (inclusive) to the length of the 
array (inclusive).
+   *
+   * <p>This method is used to determine the subset of delete files that apply 
to a given data file.
+   *
+   * @param seqs an array of sequence numbers sorted in ascending order
+   * @param seq the sequence number to search for
+   * @return the index of the first occurrence or the insertion point
+   */
+  private static int findStartIndex(long[] seqs, long seq) {
+    int pos = Arrays.binarySearch(seqs, seq);
+    int start;
+    if (pos < 0) {
+      // the sequence number was not found, where it would be inserted is 
-(pos + 1)
+      start = -(pos + 1);
+    } else {
+      // the sequence number was found, but may not be the first
+      // find the first delete file with the given sequence number by 
decrementing the position
+      start = pos;
+      while (start > 0 && seqs[start - 1] >= seq) {
+        start -= 1;
+      }
+    }
+
+    return start;
+  }
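  // Illustrative example (not in the upstream file): with seqs = {2, 2, 4, 7},
  //   findStartIndex(seqs, 2) == 0   (first occurrence of an equal sequence number)
  //   findStartIndex(seqs, 3) == 2   (insertion point between 2 and 4)
  //   findStartIndex(seqs, 8) == 4   (past the end, so no delete files apply)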
+
+  private static DeleteFile[] concat(DeleteFile[]... deletes) {
+    return ArrayUtil.concat(DeleteFile.class, deletes);
+  }
+
+  // a group of position delete files sorted by the sequence number they apply 
to
+  static class PositionDeletes {
+    private static final Comparator<DeleteFile> SEQ_COMPARATOR =
+        Comparator.comparingLong(DeleteFile::dataSequenceNumber);
+
+    // indexed state
+    private long[] seqs = null;
+    private DeleteFile[] files = null;
+
+    // a buffer that is used to hold files before indexing
+    private volatile List<DeleteFile> buffer = Lists.newArrayList();
+
+    public void add(DeleteFile file) {
+      Preconditions.checkState(buffer != null, "Can't add files upon 
indexing");
+      buffer.add(file);
+    }
+
+    public DeleteFile[] filter(long seq) {
+      indexIfNeeded();
+
+      int start = findStartIndex(seqs, seq);
+
+      if (start >= files.length) {
+        return EMPTY_DELETES;
+      }
+
+      if (start == 0) {
+        return files;
+      }
+
+      int matchingFilesCount = files.length - start;
+      DeleteFile[] matchingFiles = new DeleteFile[matchingFilesCount];
+      System.arraycopy(files, start, matchingFiles, 0, matchingFilesCount);
+      return matchingFiles;
+    }
+
+    public Iterable<DeleteFile> referencedDeleteFiles() {
+      indexIfNeeded();
+      return Arrays.asList(files);
+    }
+
+    public boolean isEmpty() {
+      indexIfNeeded();
+      return files.length == 0;
+    }
+
+    private void indexIfNeeded() {
+      if (buffer != null) {
+        synchronized (this) {
+          if (buffer != null) {
+            this.files = indexFiles(buffer);
+            this.seqs = indexSeqs(files);
+            this.buffer = null;
+          }
+        }
+      }
+    }
+
+    private static DeleteFile[] indexFiles(List<DeleteFile> list) {
+      DeleteFile[] array = list.toArray(EMPTY_DELETES);
+      Arrays.sort(array, SEQ_COMPARATOR);
+      return array;
+    }
+
+    private static long[] indexSeqs(DeleteFile[] files) {
+      long[] seqs = new long[files.length];
+
+      for (int index = 0; index < files.length; index++) {
+        seqs[index] = files[index].dataSequenceNumber();
+      }
+
+      return seqs;
+    }
+  }
+
+  // a group of equality delete files sorted by the sequence number they apply 
to
+  static class EqualityDeletes {
+    private static final Comparator<EqualityDeleteFile> SEQ_COMPARATOR =
+        Comparator.comparingLong(EqualityDeleteFile::applySequenceNumber);
+    private static final EqualityDeleteFile[] EMPTY_EQUALITY_DELETES = new 
EqualityDeleteFile[0];
+
+    // indexed state
+    private long[] seqs = null;
+    private EqualityDeleteFile[] files = null;
+
+    // a buffer that is used to hold files before indexing
+    private volatile List<EqualityDeleteFile> buffer = Lists.newArrayList();
+
+    public void add(PartitionSpec spec, DeleteFile file) {
+      Preconditions.checkState(buffer != null, "Can't add files upon 
indexing");
+      buffer.add(new EqualityDeleteFile(spec, file));
+    }
+
+    public DeleteFile[] filter(long seq, DataFile dataFile) {
+      indexIfNeeded();
+
+      int start = findStartIndex(seqs, seq);
+
+      if (start >= files.length) {
+        return EMPTY_DELETES;
+      }
+
+      List<DeleteFile> matchingFiles = Lists.newArrayList();
+
+      for (int index = start; index < files.length; index++) {
+        EqualityDeleteFile file = files[index];
+        if (canContainEqDeletesForFile(dataFile, file)) {
+          matchingFiles.add(file.wrapped());
+        }
+      }
+
+      return matchingFiles.toArray(EMPTY_DELETES);
+    }
+
+    public Iterable<DeleteFile> referencedDeleteFiles() {
+      indexIfNeeded();
+      return Iterables.transform(Arrays.asList(files), 
EqualityDeleteFile::wrapped);
+    }
+
+    public boolean isEmpty() {
+      indexIfNeeded();
+      return files.length == 0;
+    }
+
+    private void indexIfNeeded() {
+      if (buffer != null) {
+        synchronized (this) {
+          if (buffer != null) {
+            this.files = indexFiles(buffer);
+            this.seqs = indexSeqs(files);
+            this.buffer = null;
+          }
+        }
+      }
+    }
+
+    private static EqualityDeleteFile[] indexFiles(List<EqualityDeleteFile> 
list) {
+      EqualityDeleteFile[] array = list.toArray(EMPTY_EQUALITY_DELETES);
+      Arrays.sort(array, SEQ_COMPARATOR);
+      return array;
+    }
+
+    private static long[] indexSeqs(EqualityDeleteFile[] files) {
+      long[] seqs = new long[files.length];
+
+      for (int index = 0; index < files.length; index++) {
+        seqs[index] = files[index].applySequenceNumber();
+      }
+
+      return seqs;
+    }
+  }
+
+  // an equality delete file wrapper that caches the converted boundaries for 
faster boundary checks
+  // this class is not meant to be exposed beyond the delete file index
+  private static class EqualityDeleteFile {
+    private final PartitionSpec spec;
+    private final DeleteFile wrapped;
+    private final long applySequenceNumber;
+    private volatile List<Types.NestedField> equalityFields = null;
+    private volatile Map<Integer, Object> convertedLowerBounds = null;
+    private volatile Map<Integer, Object> convertedUpperBounds = null;
+
+    EqualityDeleteFile(PartitionSpec spec, DeleteFile file) {
+      this.spec = spec;
+      this.wrapped = file;
+      this.applySequenceNumber = wrapped.dataSequenceNumber() - 1;
+    }
+
+    public DeleteFile wrapped() {
+      return wrapped;
+    }
+
+    public long applySequenceNumber() {
+      return applySequenceNumber;
+    }
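    // Illustrative note (not in the upstream file): applySequenceNumber is
    // dataSequenceNumber - 1 so that the shared findStartIndex() lookup, which keeps
    // files with applySequenceNumber >= seq, selects exactly the equality deletes whose
    // data sequence number is strictly greater than the data file's, i.e. deletes
    // committed after the data was written.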
+
+    public List<Types.NestedField> equalityFields() {
+      if (equalityFields == null) {
+        synchronized (this) {
+          if (equalityFields == null) {
+            List<Types.NestedField> fields = Lists.newArrayList();
+            for (int id : wrapped.equalityFieldIds()) {
+              Types.NestedField field = spec.schema().findField(id);
+              fields.add(field);
+            }
+            this.equalityFields = fields;
+          }
+        }
+      }
+
+      return equalityFields;
+    }
+
+    public Map<Integer, Long> valueCounts() {
+      return wrapped.valueCounts();
+    }
+
+    public Map<Integer, Long> nullValueCounts() {
+      return wrapped.nullValueCounts();
+    }
+
+    public boolean hasLowerAndUpperBounds() {
+      return wrapped.lowerBounds() != null && wrapped.upperBounds() != null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T lowerBound(int id) {
+      return (T) lowerBounds().get(id);
+    }
+
+    private Map<Integer, Object> lowerBounds() {
+      if (convertedLowerBounds == null) {
+        synchronized (this) {
+          if (convertedLowerBounds == null) {
+            this.convertedLowerBounds = convertBounds(wrapped.lowerBounds());
+          }
+        }
+      }
+
+      return convertedLowerBounds;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T upperBound(int id) {
+      return (T) upperBounds().get(id);
+    }
+
+    private Map<Integer, Object> upperBounds() {
+      if (convertedUpperBounds == null) {
+        synchronized (this) {
+          if (convertedUpperBounds == null) {
+            this.convertedUpperBounds = convertBounds(wrapped.upperBounds());
+          }
+        }
+      }
+
+      return convertedUpperBounds;
+    }
+
+    private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> 
bounds) {
+      Map<Integer, Object> converted = Maps.newHashMap();
+
+      if (bounds != null) {
+        for (Types.NestedField field : equalityFields()) {
+          int id = field.fieldId();
+          Type type = spec.schema().findField(id).type();
+          if (type.isPrimitiveType()) {
+            ByteBuffer bound = bounds.get(id);
+            if (bound != null) {
+              converted.put(id, Conversions.fromByteBuffer(type, bound));
+            }
+          }
+        }
+      }
+
+      return converted;
+    }
+  }
+}
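
For orientation, a minimal usage sketch of the entry points made public above. The names deleteFiles (an Iterable<DeleteFile>, e.g. already loaded via the manifest cache), specsById (Map<Integer, PartitionSpec>) and dataFile are placeholders supplied by the caller, not identifiers from this commit:

    DeleteFileIndex index = DeleteFileIndex.builderFor(deleteFiles)
            .specsById(specsById)
            .caseSensitive(true)
            .build();

    if (!index.isEmpty()) {
        // Delete files that may apply to this data file, based on its data sequence number
        DeleteFile[] applicable = index.forDataFile(dataFile);
        // ... attach `applicable` to the FileScanTask built for this data file
    }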
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_manifest_cache.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_manifest_cache.out
new file mode 100644
index 00000000000..4be8149ebaa
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_manifest_cache.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !desc_with_cache --
+id     int     Yes     true    \N      
+ts     datetime(6)     Yes     true    \N      
+
+-- !select_with_cache --
+1      2024-05-30T20:34:56
+2      2024-05-30T20:34:56.100
+3      2024-05-30T20:34:56.120
+4      2024-05-30T20:34:56.123
+
+-- !desc_without_cache --
+id     int     Yes     true    \N      
+ts     datetime(6)     Yes     true    \N      
+
+-- !select_without_cache --
+1      2024-05-30T20:34:56
+2      2024-05-30T20:34:56.100
+3      2024-05-30T20:34:56.120
+4      2024-05-30T20:34:56.123
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
new file mode 100644
index 00000000000..d95f05ae76c
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_manifest_cache", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String restPort = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
+        String minioPort = 
context.config.otherConfigs.get("iceberg_minio_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String catalogNameWithCache = "test_iceberg_manifest_cache_enabled"
+        String catalogNameWithoutCache = "test_iceberg_manifest_cache_disabled"
+        String tableName = "tb_ts_filter"
+
+        try {
+            sql """set enable_external_table_batch_mode=false"""
+            
+            // Create catalog with manifest cache enabled
+            sql """drop catalog if exists ${catalogNameWithCache}"""
+            sql """
+                CREATE CATALOG ${catalogNameWithCache} PROPERTIES (
+                    'type'='iceberg',
+                    'iceberg.catalog.type'='rest',
+                    'uri' = 'http://${externalEnvIp}:${restPort}',
+                    "s3.access_key" = "admin",
+                    "s3.secret_key" = "password",
+                    "s3.endpoint" = "http://${externalEnvIp}:${minioPort}";,
+                    "s3.region" = "us-east-1",
+                    "iceberg.manifest.cache.enable" = "true"
+                );
+            """
+
+            // Create catalog with manifest cache disabled
+            sql """drop catalog if exists ${catalogNameWithoutCache}"""
+            sql """
+                CREATE CATALOG ${catalogNameWithoutCache} PROPERTIES (
+                    'type'='iceberg',
+                    'iceberg.catalog.type'='rest',
+                    'uri' = 'http://${externalEnvIp}:${restPort}',
+                    "s3.access_key" = "admin",
+                    "s3.secret_key" = "password",
+                    "s3.endpoint" = "http://${externalEnvIp}:${minioPort}";,
+                    "s3.region" = "us-east-1",
+                    "iceberg.manifest.cache.enable" = "false"
+                );
+            """
+
+            // Test with cache enabled
+            sql """switch ${catalogNameWithCache}"""
+            sql """use multi_catalog"""
+
+            // First explain should populate cache - check manifest cache info 
exists
+            explain {
+                sql("verbose select * from ${tableName} where id < 5")
+                contains "manifest cache:"
+                contains "hits=0"
+                contains "misses=7"
+                contains "failures=0"
+            }
+
+            // Test table structure with order_qt (should be same regardless 
of cache)
+            order_qt_desc_with_cache """desc ${tableName}"""
+
+            // Test table data with order_qt
+            order_qt_select_with_cache """select * from ${tableName} where id 
< 5"""
+
+            // Second explain should hit cache - verify cache metrics are 
present
+            explain {
+                sql("verbose select * from ${tableName} where id < 5")
+                contains "manifest cache:"
+                contains "hits=7"
+                contains "misses=0"
+                contains "failures=0"
+            }
+
+            // Test refresh catalog, the cache should be invalidated
+            sql """ refresh catalog ${catalogNameWithCache} """
+            explain {
+                sql("verbose select * from ${tableName} where id < 5")
+                contains "manifest cache:"
+                contains "hits=0"
+                contains "misses=7"
+                contains "failures=0"
+            }
+
+            // Test with cache disabled
+            sql """switch ${catalogNameWithoutCache}"""
+            sql """use multi_catalog"""
+
+            // Test table structure with order_qt (should be same as with 
cache)
+            order_qt_desc_without_cache """desc ${tableName}"""
+
+            // Test table data with order_qt
+            order_qt_select_without_cache """select * from ${tableName} where 
id < 5"""
+
+            // Explain should not contain manifest cache info when cache is 
disabled
+            explain {
+                sql("verbose select * from ${tableName} where id < 5")
+                notContains "manifest cache:"
+            }
+        } finally {
+            sql """set enable_external_table_batch_mode=true"""
+        }
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

