Copilot commented on code in PR #59056:
URL: https://github.com/apache/doris/pull/59056#discussion_r2640162804


##########
fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java:
##########
@@ -0,0 +1,906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.Tasks;
+
+/**
+ * An index of {@link DeleteFile delete files} by sequence number.
+ *
+ * <p>Use {@link #builderFor(FileIO, Iterable)} to construct an index, and 
{@link #forDataFile(long,
+ * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to 
apply to a given data
+ * file.
+ * 
+ * Copyed from 
https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java

Review Comment:
   There is a spelling error in the word "Copyed". It should be "Copied".
   ```suggestion
    * Copied from 
https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException {
     }
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        if (!Config.iceberg_manifest_cache_enable) {
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+        try {
+            return planFileScanTaskWithManifestCache(scan);
+        } catch (Exception e) {
+            LOG.warn("Plan with manifest cache failed, fallback to original 
scan: {}", e.getMessage());
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+    }
+
+    private CloseableIterable<FileScanTask> 
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+        // Get the snapshot from the scan; return empty if no snapshot exists
+        Snapshot snapshot = scan.snapshot();
+        if (snapshot == null) {
+            return CloseableIterable.withNoopClose(Collections.emptyList());
+        }
+
+        // Initialize manifest cache for efficient manifest file access
+        IcebergManifestCache cache = IcebergUtils.getManifestCache();
+
+        // Convert query conjuncts to Iceberg filter expression
+        // This combines all predicates with AND logic for partition/file 
pruning
+        Expression filterExpr = conjuncts.stream()
+                .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, 
icebergTable.schema()))
+                .filter(Objects::nonNull)
+                .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+        // Get all partition specs by their IDs for later use
+        Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+        boolean caseSensitive = true;
+
+        // Create residual evaluators for each partition spec
+        // Residual evaluators compute the remaining filter expression after 
partition pruning
+        Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+        specsById.forEach((id, spec) -> residualEvaluators.put(id,
+                ResidualEvaluator.of(spec, filterExpr == null ? 
Expressions.alwaysTrue() : filterExpr,
+                        caseSensitive)));
+
+        // Create metrics evaluator for file-level pruning based on column 
statistics
+        InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null
+                : new InclusiveMetricsEvaluator(icebergTable.schema(), 
filterExpr, caseSensitive);
+
+        // ========== Phase 1: Load delete files from delete manifests 
==========
+        List<DeleteFile> deleteFiles = new ArrayList<>();
+        List<ManifestFile> deleteManifests = 
snapshot.deleteManifests(icebergTable.io());
+        for (ManifestFile manifest : deleteManifests) {
+            // Skip non-delete manifests
+            if (manifest.content() != ManifestContent.DELETES) {
+                continue;
+            }
+            // Get the partition spec for this manifest
+            PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+            if (spec == null) {
+                continue;
+            }
+            // Create manifest evaluator for partition-level pruning
+            ManifestEvaluator evaluator = filterExpr == null ? null
+                    : ManifestEvaluator.forPartitionFilter(filterExpr, spec, 
caseSensitive);
+            // Skip manifest if it doesn't match the filter expression 
(partition pruning)
+            if (evaluator != null && !evaluator.eval(manifest)) {
+                continue;
+            }
+            // Load delete files from cache (or from storage if not cached)
+            ManifestCacheValue value = 
IcebergManifestCacheLoader.loadDeleteFilesWithCache(cache, manifest,
+                    icebergTable);
+            deleteFiles.addAll(value.getDeleteFiles());
+        }
+
+        // Build delete file index for efficient lookup of deletes applicable 
to each data file
+        DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
+                .specsById(specsById)
+                .caseSensitive(caseSensitive)
+                .build();
+
+        // ========== Phase 2: Load data files and create scan tasks ==========
+        List<FileScanTask> tasks = new ArrayList<>();
+        try (CloseableIterable<ManifestFile> dataManifests =
+                     
IcebergUtils.getMatchingManifest(snapshot.dataManifests(icebergTable.io()),
+                             specsById, filterExpr)) {
+            for (ManifestFile manifest : dataManifests) {
+                // Skip non-data manifests
+                if (manifest.content() != ManifestContent.DATA) {
+                    continue;
+                }
+                // Get the partition spec for this manifest
+                PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+                if (spec == null) {
+                    continue;
+                }
+                // Get the residual evaluator for this partition spec
+                ResidualEvaluator residualEvaluator = 
residualEvaluators.get(manifest.partitionSpecId());

Review Comment:
   The residualEvaluator retrieved at line 471 could be null if the manifest's 
partitionSpecId is not present in the residualEvaluators map. That null case is 
currently handled in two separate places further down (line 484 and again at 
line 499). Consider handling it once, immediately after retrieval (line 471) - 
either by continuing to the next manifest or, as in the suggestion below, by 
falling back to an always-true evaluator - so that no data file is processed 
with a missing evaluator.
   ```suggestion
                   ResidualEvaluator residualEvaluator = 
residualEvaluators.get(manifest.partitionSpecId());
                   if (residualEvaluator == null) {
                       // Fallback to an unpartitioned, always-true evaluator 
to match existing behavior
                       residualEvaluator = 
ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
                   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException {
     }
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        if (!Config.iceberg_manifest_cache_enable) {
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+        try {
+            return planFileScanTaskWithManifestCache(scan);
+        } catch (Exception e) {
+            LOG.warn("Plan with manifest cache failed, fallback to original 
scan: {}", e.getMessage());
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+    }
+
+    private CloseableIterable<FileScanTask> 
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+        // Get the snapshot from the scan; return empty if no snapshot exists
+        Snapshot snapshot = scan.snapshot();
+        if (snapshot == null) {
+            return CloseableIterable.withNoopClose(Collections.emptyList());
+        }
+
+        // Initialize manifest cache for efficient manifest file access
+        IcebergManifestCache cache = IcebergUtils.getManifestCache();
+
+        // Convert query conjuncts to Iceberg filter expression
+        // This combines all predicates with AND logic for partition/file 
pruning
+        Expression filterExpr = conjuncts.stream()
+                .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, 
icebergTable.schema()))
+                .filter(Objects::nonNull)
+                .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+        // Get all partition specs by their IDs for later use
+        Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+        boolean caseSensitive = true;
+
+        // Create residual evaluators for each partition spec
+        // Residual evaluators compute the remaining filter expression after 
partition pruning
+        Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+        specsById.forEach((id, spec) -> residualEvaluators.put(id,
+                ResidualEvaluator.of(spec, filterExpr == null ? 
Expressions.alwaysTrue() : filterExpr,
+                        caseSensitive)));

Review Comment:
   The null check on filterExpr here is redundant. Because the reduce operation 
uses Expressions.alwaysTrue() as its identity value, the stream always yields a 
non-null expression - even when no conjunct converts successfully - so 
filterExpr can never be null at line 416. Either remove the null check, or 
restructure the logic if the case where no valid predicates exist is meant to 
be handled differently.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException {
     }
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        if (!Config.iceberg_manifest_cache_enable) {
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+        try {
+            return planFileScanTaskWithManifestCache(scan);
+        } catch (Exception e) {
+            LOG.warn("Plan with manifest cache failed, fallback to original 
scan: {}", e.getMessage());
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+    }
+
+    private CloseableIterable<FileScanTask> 
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+        // Get the snapshot from the scan; return empty if no snapshot exists
+        Snapshot snapshot = scan.snapshot();
+        if (snapshot == null) {
+            return CloseableIterable.withNoopClose(Collections.emptyList());
+        }
+
+        // Initialize manifest cache for efficient manifest file access
+        IcebergManifestCache cache = IcebergUtils.getManifestCache();
+
+        // Convert query conjuncts to Iceberg filter expression
+        // This combines all predicates with AND logic for partition/file 
pruning
+        Expression filterExpr = conjuncts.stream()
+                .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, 
icebergTable.schema()))
+                .filter(Objects::nonNull)
+                .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+        // Get all partition specs by their IDs for later use
+        Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+        boolean caseSensitive = true;
+
+        // Create residual evaluators for each partition spec
+        // Residual evaluators compute the remaining filter expression after 
partition pruning
+        Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+        specsById.forEach((id, spec) -> residualEvaluators.put(id,
+                ResidualEvaluator.of(spec, filterExpr == null ? 
Expressions.alwaysTrue() : filterExpr,
+                        caseSensitive)));
+
+        // Create metrics evaluator for file-level pruning based on column 
statistics
+        InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null
+                : new InclusiveMetricsEvaluator(icebergTable.schema(), 
filterExpr, caseSensitive);
+
+        // ========== Phase 1: Load delete files from delete manifests 
==========
+        List<DeleteFile> deleteFiles = new ArrayList<>();
+        List<ManifestFile> deleteManifests = 
snapshot.deleteManifests(icebergTable.io());
+        for (ManifestFile manifest : deleteManifests) {
+            // Skip non-delete manifests
+            if (manifest.content() != ManifestContent.DELETES) {
+                continue;
+            }
+            // Get the partition spec for this manifest
+            PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+            if (spec == null) {
+                continue;
+            }
+            // Create manifest evaluator for partition-level pruning
+            ManifestEvaluator evaluator = filterExpr == null ? null
+                    : ManifestEvaluator.forPartitionFilter(filterExpr, spec, 
caseSensitive);

Review Comment:
   The null check for filterExpr at line 437 is also redundant since filterExpr 
cannot be null (initialized with Expressions.alwaysTrue() in the reduce 
operation at line 406). Consider removing these redundant null checks 
throughout the method.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java:
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.StructLike;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
+ */
+public final class ContentFileEstimater {

Review Comment:
   The class name "ContentFileEstimater" contains a spelling error. The correct 
spelling should be "ContentFileEstimator" (with 'or' at the end, not 'er'). 
This follows the standard naming convention for classes that perform estimation.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException {
     }
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        if (!Config.iceberg_manifest_cache_enable) {
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+        try {
+            return planFileScanTaskWithManifestCache(scan);
+        } catch (Exception e) {
+            LOG.warn("Plan with manifest cache failed, fallback to original 
scan: {}", e.getMessage());
+            long targetSplitSize = getRealFileSplitSize(0);
+            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+        }
+    }
+
+    private CloseableIterable<FileScanTask> 
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+        // Get the snapshot from the scan; return empty if no snapshot exists
+        Snapshot snapshot = scan.snapshot();
+        if (snapshot == null) {
+            return CloseableIterable.withNoopClose(Collections.emptyList());
+        }
+
+        // Initialize manifest cache for efficient manifest file access
+        IcebergManifestCache cache = IcebergUtils.getManifestCache();
+
+        // Convert query conjuncts to Iceberg filter expression
+        // This combines all predicates with AND logic for partition/file 
pruning
+        Expression filterExpr = conjuncts.stream()
+                .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, 
icebergTable.schema()))
+                .filter(Objects::nonNull)
+                .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+        // Get all partition specs by their IDs for later use
+        Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+        boolean caseSensitive = true;
+
+        // Create residual evaluators for each partition spec
+        // Residual evaluators compute the remaining filter expression after 
partition pruning
+        Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+        specsById.forEach((id, spec) -> residualEvaluators.put(id,
+                ResidualEvaluator.of(spec, filterExpr == null ? 
Expressions.alwaysTrue() : filterExpr,
+                        caseSensitive)));
+
+        // Create metrics evaluator for file-level pruning based on column 
statistics
+        InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null
+                : new InclusiveMetricsEvaluator(icebergTable.schema(), 
filterExpr, caseSensitive);

Review Comment:
   Similar to line 416, filterExpr cannot be null at this point (line 420) 
because it was initialized with Expressions.alwaysTrue() as the identity value 
in the reduce operation. The null check is redundant.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java:
##########
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cached manifest payload containing parsed files and an estimated weight.
+ */
+public class ManifestCacheValue {
+    private final List<DataFile> dataFiles;
+    private final List<DeleteFile> deleteFiles;
+    private final long weightBytes;
+
+    private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> 
deleteFiles, long weightBytes) {
+        this.dataFiles = dataFiles == null ? Collections.emptyList() : 
dataFiles;
+        this.deleteFiles = deleteFiles == null ? Collections.emptyList() : 
deleteFiles;
+        this.weightBytes = weightBytes;
+    }
+
+    public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) {
+        return new ManifestCacheValue(dataFiles, Collections.emptyList(),
+                estimateWeight(dataFiles, Collections.emptyList()));
+    }
+
+    public static ManifestCacheValue forDeleteFiles(List<DeleteFile> 
deleteFiles) {
+        return new ManifestCacheValue(Collections.emptyList(), deleteFiles,
+                estimateWeight(Collections.emptyList(), deleteFiles));
+    }
+
+    public List<DataFile> getDataFiles() {
+        return dataFiles;
+    }
+
+    public List<DeleteFile> getDeleteFiles() {
+        return deleteFiles;
+    }
+
+    public long getWeightBytes() {
+        return weightBytes;
+    }
+
+    private static long estimateWeight(List<DataFile> dataFiles, 
List<DeleteFile> deleteFiles) {
+        return ContentFileEstimater.estimate(dataFiles) + 
ContentFileEstimater.estimate(deleteFiles);

Review Comment:
   The reference to "ContentFileEstimater" should be updated to match the 
corrected class name. If the class is renamed to "ContentFileEstimator", this 
reference must be updated accordingly.
   ```suggestion
           return ContentFileEstimator.estimate(dataFiles) + 
ContentFileEstimator.estimate(deleteFiles);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java:
##########
@@ -43,6 +43,43 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
     )
     protected String warehouse;
 
+    @Getter
+    @ConnectorProperty(
+            names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED},
+            required = false,
+            description = "Controls whether to use caching during manifest 
reads or not. Default: false."

Review Comment:
   The default value in the description states "Default: false" but the actual 
default value assigned is true. This inconsistency could confuse users about 
the default behavior of the cache.
   ```suggestion
               description = "Controls whether to use caching during manifest 
reads or not. Default: true."
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java:
##########
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.CacheException;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+/**
+ * A lightweight manifest cache that stores parsed DataFile/DeleteFile lists 
per manifest.
+ */
+public class IcebergManifestCache {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergManifestCache.class);
+
+    private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache;
+
+    public IcebergManifestCache() {
+        long capacityInBytes = Config.iceberg_manifest_cache_capacity_mb * 
1024L * 1024L;
+        Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value) 
-> {
+            long weight = 
Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L);
+            if (weight > Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
+            }
+            return (int) weight;
+        };
+        Caffeine<ManifestCacheKey, ManifestCacheValue> builder = 
Caffeine.newBuilder()
+                .maximumWeight(capacityInBytes)
+                .weigher(weigher);
+        if (Config.iceberg_manifest_cache_ttl_sec > 0) {
+            builder = 
builder.expireAfterAccess(Duration.ofSeconds(Config.iceberg_manifest_cache_ttl_sec));
+        }
+        cache = builder.build(new CacheLoader<ManifestCacheKey, 
ManifestCacheValue>() {
+            @Override
+            public ManifestCacheValue load(ManifestCacheKey key) {
+                throw new CacheException("Manifest cache loader should be 
provided explicitly for key %s", null, key);
+            }
+        });
+    }
+
+    public ManifestCacheValue get(ManifestCacheKey key, 
Callable<ManifestCacheValue> loader) {
+        try {
+            return cache.get(key, ignored -> {
+                try {
+                    return loader.call();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        } catch (Exception e) {
+            throw new CacheException("Failed to load manifest cache for key 
%s", e, key);
+        }
+    }
+
+    public Optional<ManifestCacheValue> peek(ManifestCacheKey key) {
+        return Optional.ofNullable(cache.getIfPresent(key));
+    }
+
+    public void invalidateByPath(String path) {
+        cache.asMap().keySet().stream()
+                .filter(key -> key.getPath().equals(path))
+                .forEach(cache::invalidate);

Review Comment:
   The invalidateByPath method iterates over all keys in the cache and filters 
them, which could be inefficient for large caches. Since ManifestCacheKey only 
contains a path field, consider invalidating the single matching key directly 
instead of streaming over all keys, e.g. cache.invalidate(new 
ManifestCacheKey(path)) or, as in the suggestion below, via a key-building 
helper.
   ```suggestion
           cache.invalidate(buildKey(path));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to