Copilot commented on code in PR #59056: URL: https://github.com/apache/doris/pull/59056#discussion_r2640162804
########## fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java: ########## @@ -0,0 +1,906 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsUtil; +import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.PartitionMap; +import org.apache.iceberg.util.PartitionSet; +import org.apache.iceberg.util.Tasks; + +/** + * An index of {@link DeleteFile delete files} by sequence number. + * + * <p>Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, + * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data + * file. + * + * Copyed from https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java Review Comment: There is a spelling error in the word "Copyed". It should be "Copied". 
```suggestion * Copied from https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException { } private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { + if (!Config.iceberg_manifest_cache_enable) { + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + try { + return planFileScanTaskWithManifestCache(scan); + } catch (Exception e) { + LOG.warn("Plan with manifest cache failed, fallback to original scan: {}", e.getMessage()); + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + } + + private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException { + // Get the snapshot from the scan; return empty if no snapshot exists + Snapshot snapshot = scan.snapshot(); + if (snapshot == null) { + return CloseableIterable.withNoopClose(Collections.emptyList()); + } + + // Initialize manifest cache for efficient manifest file access + IcebergManifestCache cache = IcebergUtils.getManifestCache(); + + // Convert query conjuncts to Iceberg filter expression + // This combines all predicates with AND logic for partition/file pruning + Expression filterExpr = conjuncts.stream() + .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema())) + .filter(Objects::nonNull) + .reduce(Expressions.alwaysTrue(), Expressions::and); + + // Get all partition specs by their IDs for later use + Map<Integer, PartitionSpec> specsById = icebergTable.specs(); + boolean caseSensitive = true; + + // Create residual evaluators for each partition spec + // Residual evaluators compute the remaining filter expression after partition 
pruning + Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>(); + specsById.forEach((id, spec) -> residualEvaluators.put(id, + ResidualEvaluator.of(spec, filterExpr == null ? Expressions.alwaysTrue() : filterExpr, + caseSensitive))); + + // Create metrics evaluator for file-level pruning based on column statistics + InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null + : new InclusiveMetricsEvaluator(icebergTable.schema(), filterExpr, caseSensitive); + + // ========== Phase 1: Load delete files from delete manifests ========== + List<DeleteFile> deleteFiles = new ArrayList<>(); + List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io()); + for (ManifestFile manifest : deleteManifests) { + // Skip non-delete manifests + if (manifest.content() != ManifestContent.DELETES) { + continue; + } + // Get the partition spec for this manifest + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + if (spec == null) { + continue; + } + // Create manifest evaluator for partition-level pruning + ManifestEvaluator evaluator = filterExpr == null ? 
null + : ManifestEvaluator.forPartitionFilter(filterExpr, spec, caseSensitive); + // Skip manifest if it doesn't match the filter expression (partition pruning) + if (evaluator != null && !evaluator.eval(manifest)) { + continue; + } + // Load delete files from cache (or from storage if not cached) + ManifestCacheValue value = IcebergManifestCacheLoader.loadDeleteFilesWithCache(cache, manifest, + icebergTable); + deleteFiles.addAll(value.getDeleteFiles()); + } + + // Build delete file index for efficient lookup of deletes applicable to each data file + DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles) + .specsById(specsById) + .caseSensitive(caseSensitive) + .build(); + + // ========== Phase 2: Load data files and create scan tasks ========== + List<FileScanTask> tasks = new ArrayList<>(); + try (CloseableIterable<ManifestFile> dataManifests = + IcebergUtils.getMatchingManifest(snapshot.dataManifests(icebergTable.io()), + specsById, filterExpr)) { + for (ManifestFile manifest : dataManifests) { + // Skip non-data manifests + if (manifest.content() != ManifestContent.DATA) { + continue; + } + // Get the partition spec for this manifest + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + if (spec == null) { + continue; + } + // Get the residual evaluator for this partition spec + ResidualEvaluator residualEvaluator = residualEvaluators.get(manifest.partitionSpecId()); Review Comment: The residualEvaluator retrieved at line 471 could be null if the manifest's partitionSpecId is not present in the residualEvaluators map. The code currently guards against this separately at lines 484 and 499. Consider checking for null once, immediately after retrieval (line 471), and continuing to the next manifest if it is null, so that data files are never processed with an invalid evaluator. 
```suggestion ResidualEvaluator residualEvaluator = residualEvaluators.get(manifest.partitionSpecId()); if (residualEvaluator == null) { // Fallback to an unpartitioned, always-true evaluator to match existing behavior residualEvaluator = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue()); } ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException { } private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { + if (!Config.iceberg_manifest_cache_enable) { + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + try { + return planFileScanTaskWithManifestCache(scan); + } catch (Exception e) { + LOG.warn("Plan with manifest cache failed, fallback to original scan: {}", e.getMessage()); + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + } + + private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException { + // Get the snapshot from the scan; return empty if no snapshot exists + Snapshot snapshot = scan.snapshot(); + if (snapshot == null) { + return CloseableIterable.withNoopClose(Collections.emptyList()); + } + + // Initialize manifest cache for efficient manifest file access + IcebergManifestCache cache = IcebergUtils.getManifestCache(); + + // Convert query conjuncts to Iceberg filter expression + // This combines all predicates with AND logic for partition/file pruning + Expression filterExpr = conjuncts.stream() + .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema())) + .filter(Objects::nonNull) + .reduce(Expressions.alwaysTrue(), Expressions::and); + + // Get all partition specs by their IDs for later use + Map<Integer, PartitionSpec> specsById = icebergTable.specs(); + boolean 
caseSensitive = true; + + // Create residual evaluators for each partition spec + // Residual evaluators compute the remaining filter expression after partition pruning + Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>(); + specsById.forEach((id, spec) -> residualEvaluators.put(id, + ResidualEvaluator.of(spec, filterExpr == null ? Expressions.alwaysTrue() : filterExpr, + caseSensitive))); Review Comment: The filterExpr is checked for null after it's already been assigned. Since the reduce operation uses Expressions.alwaysTrue() as the identity value, filterExpr can never be null at line 416. The null check is redundant and should either be removed or the logic should be restructured to handle the case where no valid predicates exist differently. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException { } private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { + if (!Config.iceberg_manifest_cache_enable) { + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + try { + return planFileScanTaskWithManifestCache(scan); + } catch (Exception e) { + LOG.warn("Plan with manifest cache failed, fallback to original scan: {}", e.getMessage()); + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + } + + private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException { + // Get the snapshot from the scan; return empty if no snapshot exists + Snapshot snapshot = scan.snapshot(); + if (snapshot == null) { + return CloseableIterable.withNoopClose(Collections.emptyList()); + } + + // Initialize manifest cache for efficient manifest file access + IcebergManifestCache cache = IcebergUtils.getManifestCache(); + + // Convert query 
conjuncts to Iceberg filter expression + // This combines all predicates with AND logic for partition/file pruning + Expression filterExpr = conjuncts.stream() + .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema())) + .filter(Objects::nonNull) + .reduce(Expressions.alwaysTrue(), Expressions::and); + + // Get all partition specs by their IDs for later use + Map<Integer, PartitionSpec> specsById = icebergTable.specs(); + boolean caseSensitive = true; + + // Create residual evaluators for each partition spec + // Residual evaluators compute the remaining filter expression after partition pruning + Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>(); + specsById.forEach((id, spec) -> residualEvaluators.put(id, + ResidualEvaluator.of(spec, filterExpr == null ? Expressions.alwaysTrue() : filterExpr, + caseSensitive))); + + // Create metrics evaluator for file-level pruning based on column statistics + InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null + : new InclusiveMetricsEvaluator(icebergTable.schema(), filterExpr, caseSensitive); + + // ========== Phase 1: Load delete files from delete manifests ========== + List<DeleteFile> deleteFiles = new ArrayList<>(); + List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io()); + for (ManifestFile manifest : deleteManifests) { + // Skip non-delete manifests + if (manifest.content() != ManifestContent.DELETES) { + continue; + } + // Get the partition spec for this manifest + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + if (spec == null) { + continue; + } + // Create manifest evaluator for partition-level pruning + ManifestEvaluator evaluator = filterExpr == null ? 
null + : ManifestEvaluator.forPartitionFilter(filterExpr, spec, caseSensitive); Review Comment: The null check for filterExpr at line 437 is also redundant since filterExpr cannot be null (initialized with Expressions.alwaysTrue() in the reduce operation at line 406). Consider removing these redundant null checks throughout the method. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.StructLike; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +/** + * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects. + */ +public final class ContentFileEstimater { Review Comment: The class name "ContentFileEstimater" contains a spelling error. The correct spelling should be "ContentFileEstimator" (with 'or' at the end, not 'er'). This follows the standard naming convention for classes that perform estimation. 
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -359,8 +375,136 @@ public TableScan createTableScan() throws UserException { } private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { + if (!Config.iceberg_manifest_cache_enable) { + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + try { + return planFileScanTaskWithManifestCache(scan); + } catch (Exception e) { + LOG.warn("Plan with manifest cache failed, fallback to original scan: {}", e.getMessage()); + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + } + + private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException { + // Get the snapshot from the scan; return empty if no snapshot exists + Snapshot snapshot = scan.snapshot(); + if (snapshot == null) { + return CloseableIterable.withNoopClose(Collections.emptyList()); + } + + // Initialize manifest cache for efficient manifest file access + IcebergManifestCache cache = IcebergUtils.getManifestCache(); + + // Convert query conjuncts to Iceberg filter expression + // This combines all predicates with AND logic for partition/file pruning + Expression filterExpr = conjuncts.stream() + .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema())) + .filter(Objects::nonNull) + .reduce(Expressions.alwaysTrue(), Expressions::and); + + // Get all partition specs by their IDs for later use + Map<Integer, PartitionSpec> specsById = icebergTable.specs(); + boolean caseSensitive = true; + + // Create residual evaluators for each partition spec + // Residual evaluators compute the remaining filter expression after partition pruning + Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>(); + specsById.forEach((id, spec) -> residualEvaluators.put(id, + 
ResidualEvaluator.of(spec, filterExpr == null ? Expressions.alwaysTrue() : filterExpr, + caseSensitive))); + + // Create metrics evaluator for file-level pruning based on column statistics + InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null + : new InclusiveMetricsEvaluator(icebergTable.schema(), filterExpr, caseSensitive); Review Comment: Similar to line 416, filterExpr cannot be null at this point (line 420) because it was initialized with Expressions.alwaysTrue() as the identity value in the reduce operation. The null check is redundant. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java: ########## @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; + +import java.util.Collections; +import java.util.List; + +/** + * Cached manifest payload containing parsed files and an estimated weight. 
+ */ +public class ManifestCacheValue { + private final List<DataFile> dataFiles; + private final List<DeleteFile> deleteFiles; + private final long weightBytes; + + private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> deleteFiles, long weightBytes) { + this.dataFiles = dataFiles == null ? Collections.emptyList() : dataFiles; + this.deleteFiles = deleteFiles == null ? Collections.emptyList() : deleteFiles; + this.weightBytes = weightBytes; + } + + public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) { + return new ManifestCacheValue(dataFiles, Collections.emptyList(), + estimateWeight(dataFiles, Collections.emptyList())); + } + + public static ManifestCacheValue forDeleteFiles(List<DeleteFile> deleteFiles) { + return new ManifestCacheValue(Collections.emptyList(), deleteFiles, + estimateWeight(Collections.emptyList(), deleteFiles)); + } + + public List<DataFile> getDataFiles() { + return dataFiles; + } + + public List<DeleteFile> getDeleteFiles() { + return deleteFiles; + } + + public long getWeightBytes() { + return weightBytes; + } + + private static long estimateWeight(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) { + return ContentFileEstimater.estimate(dataFiles) + ContentFileEstimater.estimate(deleteFiles); Review Comment: The reference to "ContentFileEstimater" should be updated to match the corrected class name. If the class is renamed to "ContentFileEstimator", this reference must be updated accordingly. 
```suggestion return ContentFileEstimator.estimate(dataFiles) + ContentFileEstimator.estimate(deleteFiles); ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java: ########## @@ -43,6 +43,43 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties { ) protected String warehouse; + @Getter + @ConnectorProperty( + names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED}, + required = false, + description = "Controls whether to use caching during manifest reads or not. Default: false." Review Comment: The default value in the description states "Default: false" but the actual default value assigned is true. This inconsistency could confuse users about the default behavior of the cache. ```suggestion description = "Controls whether to use caching during manifest reads or not. Default: true." ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java: ########## @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; + +/** + * A lightweight manifest cache that stores parsed DataFile/DeleteFile lists per manifest. + */ +public class IcebergManifestCache { + private static final Logger LOG = LogManager.getLogger(IcebergManifestCache.class); + + private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache; + + public IcebergManifestCache() { + long capacityInBytes = Config.iceberg_manifest_cache_capacity_mb * 1024L * 1024L; + Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value) -> { + long weight = Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L); + if (weight > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int) weight; + }; + Caffeine<ManifestCacheKey, ManifestCacheValue> builder = Caffeine.newBuilder() + .maximumWeight(capacityInBytes) + .weigher(weigher); + if (Config.iceberg_manifest_cache_ttl_sec > 0) { + builder = builder.expireAfterAccess(Duration.ofSeconds(Config.iceberg_manifest_cache_ttl_sec)); + } + cache = builder.build(new CacheLoader<ManifestCacheKey, ManifestCacheValue>() { + @Override + public ManifestCacheValue load(ManifestCacheKey key) { + throw new CacheException("Manifest cache loader should be provided explicitly for key %s", null, key); + } + }); + } + + public ManifestCacheValue get(ManifestCacheKey key, Callable<ManifestCacheValue> loader) { + try { + return cache.get(key, ignored -> { + try { + return loader.call(); + } catch (Exception e) { + 
throw new RuntimeException(e); + } + }); + } catch (Exception e) { + throw new CacheException("Failed to load manifest cache for key %s", e, key); + } + } + + public Optional<ManifestCacheValue> peek(ManifestCacheKey key) { + return Optional.ofNullable(cache.getIfPresent(key)); + } + + public void invalidateByPath(String path) { + cache.asMap().keySet().stream() + .filter(key -> key.getPath().equals(path)) + .forEach(cache::invalidate); Review Comment: The invalidateByPath method iterates over all keys in the cache and filters them, which could be inefficient for large caches. Since ManifestCacheKey only contains a path field, consider using cache.invalidate(new ManifestCacheKey(path)) directly instead of streaming over all keys. This would be more efficient for cache invalidation operations. ```suggestion cache.invalidate(buildKey(path)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
