This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e3e0590ab8c [Feature](iceberg) Add manifest-level cache for Iceberg
tables to reduce I/O and parsing overhead (#59056)
e3e0590ab8c is described below
commit e3e0590ab8c2942cfee62c6686859d2e2f77c842
Author: Socrates <[email protected]>
AuthorDate: Fri Dec 26 17:48:36 2025 +0800
[Feature](iceberg) Add manifest-level cache for Iceberg tables to reduce
I/O and parsing overhead (#59056)
### What problem does this PR solve?
## Motivation
During Iceberg query planning, the FE must read and parse the metadata
chain ManifestList → Manifest → DataFile/DeleteFile. When hot partitions are
queried frequently, or many small queries run in quick succession, the same
manifest files are read and parsed repeatedly, incurring significant I/O
and CPU overhead.
## Solution
This PR introduces a manifest-level cache (`IcebergManifestCache`) in the FE
that caches the parsed DataFile/DeleteFile list of each manifest file. The
cache is built on Caffeine with weight-based LRU eviction and TTL support.
### Key Components
- **IcebergManifestCache**: Core cache implementation using Caffeine (a minimal
  sketch of this pattern appears after this list)
  - Weight-based LRU eviction controlled by `iceberg.manifest.cache.capacity-mb`
  - TTL expiration via `iceberg.manifest.cache.ttl-second`
  - Single-flight loading to prevent duplicate parsing of the same manifest
- **ManifestCacheKey**: Cache key consisting of:
  - Manifest file path
- **ManifestCacheValue**: Cached payload containing:
  - List of `DataFile` or `DeleteFile`
  - Estimated memory weight for eviction
- **IcebergManifestCacheLoader**: Helper class to load and populate the
cache using `ManifestFiles.read()`
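
For orientation, here is a minimal sketch of the Caffeine pattern the bullets
above describe: weight-bounded eviction, expire-after-access TTL, and
single-flight loading via the per-key compute in `Cache.get`. Class and field
names are illustrative; the real implementation is `IcebergManifestCache` in
the diff below.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;

/** Minimal sketch of a path-keyed, weight-bounded manifest cache (illustrative names). */
public class ManifestCacheSketch {

    /** Cached payload: parsed files plus an estimated size used as the eviction weight. */
    public static class Entry {
        final List<?> files;        // parsed DataFile/DeleteFile objects
        final long weightBytes;     // estimated retained size

        Entry(List<?> files, long weightBytes) {
            this.files = files;
            this.weightBytes = weightBytes;
        }
    }

    private final Cache<String, Entry> cache;

    public ManifestCacheSketch(long capacityMb, long ttlSec) {
        Weigher<String, Entry> weigher =
                (path, entry) -> (int) Math.min(entry.weightBytes, Integer.MAX_VALUE);
        this.cache = Caffeine.newBuilder()
                .maximumWeight(capacityMb * 1024L * 1024L)      // weight-based bound in bytes
                .weigher(weigher)
                .expireAfterAccess(Duration.ofSeconds(ttlSec))  // TTL after last access
                .build();
    }

    /**
     * Caffeine computes the mapping at most once per key under concurrency, so two
     * planners hitting the same manifest share a single parse ("single-flight").
     */
    public Entry get(String manifestPath, Callable<Entry> parseManifest) {
        return cache.get(manifestPath, ignored -> {
            try {
                return parseManifest.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
```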
### Cache Invalidation Strategy
- Iceberg manifest files are immutable and the cache key is the manifest path,
so an entry never points at changed content; a rewritten manifest appears under
a new path and the stale entry is simply never requested again
- TTL (expire-after-access) bounds how long unused entries stay resident and
guards against stale data when the underlying storage doesn't support precise
mtime/etag
- Different snapshots use different manifest paths/keys, ensuring
snapshot-level isolation
### Iceberg Catalog Properties
| Config | Default | Description |
|--------|---------|-------------|
| `iceberg.manifest.cache.enable` | `true` | Enable/disable the manifest cache |
| `iceberg.manifest.cache.capacity-mb` | `1024` | Maximum cache capacity in MB |
| `iceberg.manifest.cache.ttl-second` | `48 * 60 * 60` (48 hours) | Cache entry expiration after last access |
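
As a rough illustration of how the properties in the table above resolve to
effective settings, the hypothetical helper below mirrors the defaults of the
constants added to `IcebergExternalCatalog` in the diff; the class and method
names are illustrative only.

```java
import org.apache.commons.lang3.math.NumberUtils;

import java.util.Map;

/** Illustrative resolution of the manifest-cache catalog properties with their defaults. */
public final class ManifestCacheSettings {
    public final boolean enabled;
    public final long capacityMb;
    public final long ttlSeconds;

    private ManifestCacheSettings(boolean enabled, long capacityMb, long ttlSeconds) {
        this.enabled = enabled;
        this.capacityMb = capacityMb;
        this.ttlSeconds = ttlSeconds;
    }

    public static ManifestCacheSettings fromCatalogProperties(Map<String, String> props) {
        String enabled = props.get("iceberg.manifest.cache.enable");
        String capacity = props.get("iceberg.manifest.cache.capacity-mb");
        String ttl = props.get("iceberg.manifest.cache.ttl-second");
        return new ManifestCacheSettings(
                enabled == null ? true : Boolean.parseBoolean(enabled), // default: true
                NumberUtils.toLong(capacity, 1024L),                    // default: 1024 MB
                NumberUtils.toLong(ttl, 48L * 60 * 60));                // default: 48 hours
    }
}
```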
### Integration Point
The cache is integrated in `IcebergScanNode.planFileScanTaskWithManifestCache()`
(a condensed sketch follows this list), which:
1. Loads delete manifests via the cache and builds a `DeleteFileIndex`
2. Loads data manifests via the cache and creates a `FileScanTask` for each
data file
3. Falls back to the original scan if cache loading fails
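
Condensed, the cached planning flow looks roughly like the sketch below. This
is a simplification of the code in the diff: the class name is illustrative,
the filter is hard-coded to `alwaysTrue()`, and partition/metrics pruning,
manifest content/spec validity checks, cache-hit accounting, and the fallback
path are omitted.

```java
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;

import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.TableScanUtil;

import java.util.ArrayList;
import java.util.List;

/** Condensed sketch of the cached planning flow; pruning and the fallback path are omitted. */
public final class CachedPlanningSketch {

    public static CloseableIterable<FileScanTask> plan(Table table, Snapshot snapshot,
            IcebergManifestCache cache, long targetSplitSize) {
        // 1. Load delete manifests via the cache and build a DeleteFileIndex.
        List<DeleteFile> deleteFiles = new ArrayList<>();
        for (ManifestFile manifest : snapshot.deleteManifests(table.io())) {
            deleteFiles.addAll(IcebergManifestCacheLoader
                    .loadDeleteFilesWithCache(cache, manifest, table).getDeleteFiles());
        }
        DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
                .specsById(table.specs())
                .caseSensitive(true)
                .build();

        // 2. Load data manifests via the cache and create one FileScanTask per data file.
        List<FileScanTask> tasks = new ArrayList<>();
        for (ManifestFile manifest : snapshot.dataManifests(table.io())) {
            PartitionSpec spec = table.specs().get(manifest.partitionSpecId());
            ResidualEvaluator residuals = ResidualEvaluator.of(spec, Expressions.alwaysTrue(), true);
            for (DataFile dataFile : IcebergManifestCacheLoader
                    .loadDataFilesWithCache(cache, manifest, table).getDataFiles()) {
                DeleteFile[] deletes = deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile);
                tasks.add(new BaseFileScanTask(dataFile, deletes,
                        SchemaParser.toJson(table.schema()), PartitionSpecParser.toJson(spec), residuals));
            }
        }

        // 3. Split tasks by target size; on failure the real code falls back to scan.planFiles().
        return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), targetSplitSize);
    }

    private CachedPlanningSketch() {
    }
}
```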
---
fe/check/checkstyle/suppressions.xml | 3 +
.../datasource/iceberg/IcebergExternalCatalog.java | 36 +
.../datasource/iceberg/IcebergMetadataCache.java | 16 +
.../doris/datasource/iceberg/IcebergUtils.java | 16 +
.../iceberg/cache/ContentFileEstimator.java | 194 +++++
.../iceberg/cache/IcebergManifestCache.java | 91 +++
.../iceberg/cache/IcebergManifestCacheLoader.java | 118 +++
.../datasource/iceberg/cache/ManifestCacheKey.java | 58 ++
.../iceberg/cache/ManifestCacheValue.java | 65 ++
.../datasource/iceberg/source/IcebergScanNode.java | 196 ++++-
.../metastore/AbstractIcebergProperties.java | 62 ++
.../java/org/apache/iceberg/DeleteFileIndex.java | 906 +++++++++++++++++++++
.../iceberg/test_iceberg_manifest_cache.out | 21 +
.../iceberg/test_iceberg_manifest_cache.groovy | 119 +++
14 files changed, 1893 insertions(+), 8 deletions(-)
diff --git a/fe/check/checkstyle/suppressions.xml
b/fe/check/checkstyle/suppressions.xml
index 8f000bb7616..7340c4c5bd5 100644
--- a/fe/check/checkstyle/suppressions.xml
+++ b/fe/check/checkstyle/suppressions.xml
@@ -69,6 +69,9 @@ under the License.
<!-- ignore hudi disk map copied from
hudi/common/util/collection/DiskMap.java -->
<suppress
files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java"
checks="[a-zA-Z0-9]*"/>
+ <!-- ignore iceberg delete file index copied from
iceberg/DeleteFileIndex.java -->
+ <suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java"
checks="[a-zA-Z0-9]*"/>
+
<!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
<suppress files=".*thrift/schema/external/.*" checks=".*"/>
</suppressions>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 57d80c804c0..8d08e3e8eae 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -51,6 +51,12 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND =
"iceberg.table.meta.cache.ttl-second";
public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND =
"iceberg.snapshot.meta.cache.ttl-second";
+ public static final String ICEBERG_MANIFEST_CACHE_ENABLE =
"iceberg.manifest.cache.enable";
+ public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB =
"iceberg.manifest.cache.capacity-mb";
+ public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND =
"iceberg.manifest.cache.ttl-second";
+ public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = true;
+ public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024;
+ public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 *
60 * 60;
protected String icebergCatalogType;
protected Catalog catalog;
@@ -95,6 +101,29 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
"The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND
+ " is wrong, value is "
+ partitionCacheTtlSecond);
}
+
+ String manifestCacheEnable =
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
+ if (Objects.nonNull(manifestCacheEnable)
+ && !(manifestCacheEnable.equalsIgnoreCase("true") ||
manifestCacheEnable.equalsIgnoreCase("false"))) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is
wrong, value is "
+ + manifestCacheEnable);
+ }
+
+ String manifestCacheCapacity =
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
+ if (Objects.nonNull(manifestCacheCapacity) &&
NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + "
is wrong, value is "
+ + manifestCacheCapacity);
+ }
+
+ String manifestCacheTtlSecond =
catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(manifestCacheTtlSecond)
+ && NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) <
CACHE_TTL_DISABLE_CACHE) {
+ throw new DdlException(
+ "The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + "
is wrong, value is "
+ + manifestCacheTtlSecond);
+ }
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
}
@@ -106,6 +135,13 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
if (Objects.nonNull(tableMetaCacheTtl) ||
Objects.nonNull(snapshotMetaCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
}
+ String manifestCacheEnable =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
+ String manifestCacheCapacity =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
+ String manifestCacheTtl =
updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
+ if (Objects.nonNull(manifestCacheEnable) ||
Objects.nonNull(manifestCacheCapacity)
+ || Objects.nonNull(manifestCacheTtl)) {
+
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
+ }
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 0d49965d44a..7acc22152c6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -58,6 +59,7 @@ public class IcebergMetadataCache {
private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue>
snapshotCache;
private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
+ private IcebergManifestCache manifestCache;
public IcebergMetadataCache(IcebergExternalCatalog catalog,
ExecutorService executor) {
this.executor = executor;
@@ -101,6 +103,15 @@ public class IcebergMetadataCache {
null);
this.snapshotCache =
snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
this.viewCache = tableCacheFactory.buildCache(this::loadView,
executor);
+
+ long manifestCacheCapacityMb = NumberUtils.toLong(
+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB),
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB);
+ manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L);
+ long manifestCacheTtlSec = NumberUtils.toLong(
+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND),
+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND);
+ this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb,
manifestCacheTtlSec);
}
public Table getIcebergTable(ExternalTable dorisTable) {
@@ -117,6 +128,10 @@ public class IcebergMetadataCache {
return snapshotCache.get(key);
}
+ public IcebergManifestCache getManifestCache() {
+ return manifestCache;
+ }
+
@NotNull
private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
Table icebergTable = getIcebergTable(key);
@@ -200,6 +215,7 @@ public class IcebergMetadataCache {
viewCache.asMap().entrySet().stream()
.filter(entry -> entry.getKey().nameMapping.getCtlId() ==
catalogId)
.forEach(entry -> viewCache.invalidate(entry.getKey()));
+ manifestCache.invalidateAll();
}
public void invalidateTableCache(ExternalTable dorisTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 001a9a85903..2ee56f0a0f4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -56,6 +56,7 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -1553,4 +1554,19 @@ public class IcebergUtils {
icebergExternalTable.getViewText();
}
+ public static IcebergManifestCache getManifestCache(ExternalCatalog
catalog) {
+ return Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getIcebergMetadataCache((IcebergExternalCatalog) catalog)
+ .getManifestCache();
+ }
+
+ public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
+ String enabled =
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE);
+ if (enabled == null) {
+ return
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE;
+ }
+ return Boolean.parseBoolean(enabled);
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
new file mode 100644
index 00000000000..112f161389b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimator.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.StructLike;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
+ */
+public final class ContentFileEstimator {
+ private static final long LIST_BASE_WEIGHT = 48L;
+ private static final long OBJECT_REFERENCE_WEIGHT = 8L;
+ private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
+ private static final long STRING_BASE_WEIGHT = 40L;
+ private static final long CHAR_BYTES = 2L;
+ private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
+ private static final long MAP_BASE_WEIGHT = 48L;
+ private static final long MAP_ENTRY_OVERHEAD = 24L;
+ private static final long LONG_OBJECT_WEIGHT = 24L;
+ private static final long INT_OBJECT_WEIGHT = 16L;
+ private static final long PARTITION_BASE_WEIGHT = 48L;
+ private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
+
+ private ContentFileEstimator() {
+ }
+
+ public static long estimate(List<? extends ContentFile<?>> files) {
+ return listReferenceWeight(files) + estimateContentFilesWeight(files);
+ }
+
+ private static long listReferenceWeight(List<?> files) {
+ if (files == null || files.isEmpty()) {
+ return 0L;
+ }
+ return LIST_BASE_WEIGHT + (long) files.size() *
OBJECT_REFERENCE_WEIGHT;
+ }
+
+ private static long estimateContentFilesWeight(List<? extends
ContentFile<?>> files) {
+ long total = 0L;
+ if (files == null) {
+ return 0L;
+ }
+ for (ContentFile<?> file : files) {
+ total += estimateContentFileWeight(file);
+ }
+ return total;
+ }
+
+ private static long estimateContentFileWeight(ContentFile<?> file) {
+ if (file == null) {
+ return 0L;
+ }
+
+ long weight = CONTENT_FILE_BASE_WEIGHT;
+ weight += charSequenceWeight(file.path());
+ weight += stringWeight(file.manifestLocation());
+ weight += byteBufferWeight(file.keyMetadata());
+ weight += partitionWeight(file.partition());
+
+ weight += numericMapWeight(file.columnSizes());
+ weight += numericMapWeight(file.valueCounts());
+ weight += numericMapWeight(file.nullValueCounts());
+ weight += numericMapWeight(file.nanValueCounts());
+ weight += byteBufferMapWeight(file.lowerBounds());
+ weight += byteBufferMapWeight(file.upperBounds());
+
+ weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
+ weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
+
+ weight += optionalLongWeight(file.pos());
+ weight += optionalLongWeight(file.dataSequenceNumber());
+ weight += optionalLongWeight(file.fileSequenceNumber());
+ weight += optionalLongWeight(file.firstRowId());
+ weight += optionalIntWeight(file.sortOrderId());
+
+ if (file instanceof DeleteFile) {
+ DeleteFile deleteFile = (DeleteFile) file;
+ weight += stringWeight(deleteFile.referencedDataFile());
+ weight += optionalLongWeight(deleteFile.contentOffset());
+ weight += optionalLongWeight(deleteFile.contentSizeInBytes());
+ }
+
+ return weight;
+ }
+
+ private static long listWeight(List<? extends Number> list, long
elementWeight) {
+ if (list == null || list.isEmpty()) {
+ return 0L;
+ }
+ return LIST_BASE_WEIGHT + (long) list.size() *
(OBJECT_REFERENCE_WEIGHT + elementWeight);
+ }
+
+ private static long numericMapWeight(Map<Integer, Long> map) {
+ if (map == null || map.isEmpty()) {
+ return 0L;
+ }
+ return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD +
LONG_OBJECT_WEIGHT);
+ }
+
+ private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
+ if (map == null || map.isEmpty()) {
+ return 0L;
+ }
+ long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
+ for (ByteBuffer buffer : map.values()) {
+ weight += byteBufferWeight(buffer);
+ }
+ return weight;
+ }
+
+ private static long partitionWeight(StructLike partition) {
+ if (partition == null) {
+ return 0L;
+ }
+ long weight = PARTITION_BASE_WEIGHT + (long) partition.size() *
PARTITION_VALUE_BASE_WEIGHT;
+ for (int i = 0; i < partition.size(); i++) {
+ Object value = partition.get(i, Object.class);
+ weight += estimateValueWeight(value);
+ }
+ return weight;
+ }
+
+ private static long estimateValueWeight(Object value) {
+ if (value == null) {
+ return 0L;
+ }
+ if (value instanceof CharSequence) {
+ return charSequenceWeight((CharSequence) value);
+ } else if (value instanceof byte[]) {
+ return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
+ } else if (value instanceof ByteBuffer) {
+ return byteBufferWeight((ByteBuffer) value);
+ } else if (value instanceof Long || value instanceof Double) {
+ return LONG_OBJECT_WEIGHT;
+ } else if (value instanceof Integer || value instanceof Float) {
+ return INT_OBJECT_WEIGHT;
+ } else if (value instanceof Short || value instanceof Character) {
+ return 4L;
+ } else if (value instanceof Boolean) {
+ return 1L;
+ }
+ return OBJECT_REFERENCE_WEIGHT;
+ }
+
+ private static long charSequenceWeight(CharSequence value) {
+ if (value == null) {
+ return 0L;
+ }
+ return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
+ }
+
+ private static long stringWeight(String value) {
+ if (value == null) {
+ return 0L;
+ }
+ return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
+ }
+
+ private static long byteBufferWeight(ByteBuffer buffer) {
+ if (buffer == null) {
+ return 0L;
+ }
+ return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
+ }
+
+ private static long optionalLongWeight(Long value) {
+ return value == null ? 0L : LONG_OBJECT_WEIGHT;
+ }
+
+ private static long optionalIntWeight(Integer value) {
+ return value == null ? 0L : INT_OBJECT_WEIGHT;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
new file mode 100644
index 00000000000..6c5d79ecb69
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.datasource.CacheException;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+/**
+ * A lightweight manifest cache that stores parsed DataFile/DeleteFile lists
per manifest.
+ */
+public class IcebergManifestCache {
+ private static final Logger LOG =
LogManager.getLogger(IcebergManifestCache.class);
+
+ private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache;
+
+ public IcebergManifestCache(long capacityMb, long ttlSec) {
+ long capacityInBytes = capacityMb * 1024L * 1024L;
+ Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value)
-> {
+ long weight =
Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L);
+ if (weight > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ return (int) weight;
+ };
+ Caffeine<ManifestCacheKey, ManifestCacheValue> builder =
Caffeine.newBuilder()
+ .maximumWeight(capacityInBytes)
+ .weigher(weigher)
+ .expireAfterAccess(Duration.ofSeconds(ttlSec));
+ cache = builder.build(new CacheLoader<ManifestCacheKey,
ManifestCacheValue>() {
+ @Override
+ public ManifestCacheValue load(ManifestCacheKey key) {
+ throw new CacheException("Manifest cache loader should be
provided explicitly for key %s", null, key);
+ }
+ });
+ }
+
+ public ManifestCacheValue get(ManifestCacheKey key,
Callable<ManifestCacheValue> loader) {
+ try {
+ return cache.get(key, ignored -> {
+ try {
+ return loader.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ throw new CacheException("Failed to load manifest cache for key
%s", e, key);
+ }
+ }
+
+ public Optional<ManifestCacheValue> peek(ManifestCacheKey key) {
+ return Optional.ofNullable(cache.getIfPresent(key));
+ }
+
+ public void invalidateByPath(String path) {
+ cache.invalidate(buildKey(path));
+ }
+
+ public void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ public ManifestCacheKey buildKey(String path) {
+ return new ManifestCacheKey(path);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
new file mode 100644
index 00000000000..8ec14a50e6a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.datasource.CacheException;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Helper to load manifest content and populate the manifest cache.
+ */
+public class IcebergManifestCacheLoader {
+ private static final Logger LOG =
LogManager.getLogger(IcebergManifestCacheLoader.class);
+
+ private IcebergManifestCacheLoader() {
+ }
+
+ public static ManifestCacheValue
loadDataFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+ Table table) {
+ return loadDataFilesWithCache(cache, manifest, table, null);
+ }
+
+ public static ManifestCacheValue
loadDataFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+ Table table, Consumer<Boolean> cacheHitRecorder) {
+ return loadWithCache(cache, manifest, cacheHitRecorder, () ->
loadDataFiles(manifest, table));
+ }
+
+ public static ManifestCacheValue
loadDeleteFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+ Table table) {
+ return loadDeleteFilesWithCache(cache, manifest, table, null);
+ }
+
+ public static ManifestCacheValue
loadDeleteFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+ Table table, Consumer<Boolean> cacheHitRecorder) {
+ return loadWithCache(cache, manifest, cacheHitRecorder, () ->
loadDeleteFiles(manifest, table));
+ }
+
+ private static ManifestCacheValue loadDataFiles(ManifestFile manifest,
Table table) {
+ List<DataFile> dataFiles = Lists.newArrayList();
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest,
table.io())) {
+ // ManifestReader implements CloseableIterable<DataFile>, iterate
directly
+ for (DataFile dataFile : reader) {
+ dataFiles.add(dataFile.copy());
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to read data manifest {}", manifest.path(), e);
+ throw new CacheException("Failed to read data manifest %s", e,
manifest.path());
+ }
+ return ManifestCacheValue.forDataFiles(dataFiles);
+ }
+
+ private static ManifestCacheValue loadDeleteFiles(ManifestFile manifest,
Table table) {
+ List<DeleteFile> deleteFiles = Lists.newArrayList();
+ try (ManifestReader<DeleteFile> reader =
ManifestFiles.readDeleteManifest(manifest, table.io(),
+ table.specs())) {
+ // ManifestReader implements CloseableIterable<DeleteFile>,
iterate directly
+ for (DeleteFile deleteFile : reader) {
+ deleteFiles.add(deleteFile.copy());
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to read delete manifest {}", manifest.path(), e);
+ throw new CacheException("Failed to read delete manifest %s", e,
manifest.path());
+ }
+ return ManifestCacheValue.forDeleteFiles(deleteFiles);
+ }
+
+ private static ManifestCacheValue loadWithCache(IcebergManifestCache
cache, ManifestFile manifest,
+ Consumer<Boolean> cacheHitRecorder, Loader loader) {
+ ManifestCacheKey key = buildKey(cache, manifest);
+ Optional<ManifestCacheValue> cached = cache.peek(key);
+ boolean cacheHit = cached.isPresent();
+ if (cacheHitRecorder != null) {
+ cacheHitRecorder.accept(cacheHit);
+ }
+ if (cacheHit) {
+ return cached.get();
+ }
+ return cache.get(key, loader::load);
+ }
+
+ @FunctionalInterface
+ private interface Loader {
+ ManifestCacheValue load();
+ }
+
+ private static ManifestCacheKey buildKey(IcebergManifestCache cache,
ManifestFile manifest) {
+ // Iceberg manifest files are immutable, so path uniquely identifies a
manifest
+ return cache.buildKey(manifest.path());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
new file mode 100644
index 00000000000..41b52187aec
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import java.util.Objects;
+
+/**
+ * Cache key for a single Iceberg manifest file.
+ * Since Iceberg manifest files are immutable, path uniquely identifies a
manifest.
+ */
+public class ManifestCacheKey {
+ private final String path;
+
+ public ManifestCacheKey(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ManifestCacheKey)) {
+ return false;
+ }
+ ManifestCacheKey that = (ManifestCacheKey) o;
+ return Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path);
+ }
+
+ @Override
+ public String toString() {
+ return "ManifestCacheKey{path='" + path + "'}";
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
new file mode 100644
index 00000000000..91e2f6db72f
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cached manifest payload containing parsed files and an estimated weight.
+ */
+public class ManifestCacheValue {
+ private final List<DataFile> dataFiles;
+ private final List<DeleteFile> deleteFiles;
+ private final long weightBytes;
+
+ private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile>
deleteFiles, long weightBytes) {
+ this.dataFiles = dataFiles == null ? Collections.emptyList() :
dataFiles;
+ this.deleteFiles = deleteFiles == null ? Collections.emptyList() :
deleteFiles;
+ this.weightBytes = weightBytes;
+ }
+
+ public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) {
+ return new ManifestCacheValue(dataFiles, Collections.emptyList(),
+ estimateWeight(dataFiles, Collections.emptyList()));
+ }
+
+ public static ManifestCacheValue forDeleteFiles(List<DeleteFile>
deleteFiles) {
+ return new ManifestCacheValue(Collections.emptyList(), deleteFiles,
+ estimateWeight(Collections.emptyList(), deleteFiles));
+ }
+
+ public List<DataFile> getDataFiles() {
+ return dataFiles;
+ }
+
+ public List<DeleteFile> getDeleteFiles() {
+ return deleteFiles;
+ }
+
+ public long getWeightBytes() {
+ return weightBytes;
+ }
+
+ private static long estimateWeight(List<DataFile> dataFiles,
List<DeleteFile> deleteFiles) {
+ return ContentFileEstimator.estimate(dataFiles) +
ContentFileEstimator.estimate(deleteFiles);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 1acafbde5df..29f34bcc5cc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
@@ -38,6 +39,9 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
+import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
import org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
@@ -58,18 +62,27 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Conversions;
@@ -79,9 +92,12 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
@@ -115,6 +131,9 @@ public class IcebergScanNode extends FileQueryScanNode {
// get them in doInitialize() to ensure internal consistency of ScanNode
private Map<StorageProperties.Type, StorageProperties>
storagePropertiesMap;
private Map<String, String> backendStorageProperties;
+ private long manifestCacheHits;
+ private long manifestCacheMisses;
+ private long manifestCacheFailures;
// for test
@VisibleForTesting
@@ -303,6 +322,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
);
splitAssignment.finishSchedule();
+ recordManifestCacheProfile();
} catch (Exception e) {
Optional<NotSupportedException> opt =
checkNotSupportedException(e);
if (opt.isPresent()) {
@@ -359,8 +379,136 @@ public class IcebergScanNode extends FileQueryScanNode {
}
private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+ if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
+ long targetSplitSize = getRealFileSplitSize(0);
+ return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ }
+ try {
+ return planFileScanTaskWithManifestCache(scan);
+ } catch (Exception e) {
+ manifestCacheFailures++;
+ LOG.warn("Plan with manifest cache failed, fallback to original
scan: " + e.getMessage(), e);
+ long targetSplitSize = getRealFileSplitSize(0);
+ return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ }
+ }
+
+ private CloseableIterable<FileScanTask>
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+ // Get the snapshot from the scan; return empty if no snapshot exists
+ Snapshot snapshot = scan.snapshot();
+ if (snapshot == null) {
+ return CloseableIterable.withNoopClose(Collections.emptyList());
+ }
+
+ // Initialize manifest cache for efficient manifest file access
+ IcebergManifestCache cache =
IcebergUtils.getManifestCache(source.getCatalog());
+
+ // Convert query conjuncts to Iceberg filter expression
+ // This combines all predicates with AND logic for partition/file
pruning
+ Expression filterExpr = conjuncts.stream()
+ .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct,
icebergTable.schema()))
+ .filter(Objects::nonNull)
+ .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+ // Get all partition specs by their IDs for later use
+ Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+ boolean caseSensitive = true;
+
+ // Create residual evaluators for each partition spec
+ // Residual evaluators compute the remaining filter expression after
partition pruning
+ Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+ specsById.forEach((id, spec) -> residualEvaluators.put(id,
+ ResidualEvaluator.of(spec, filterExpr, caseSensitive)));
+
+ // Create metrics evaluator for file-level pruning based on column
statistics
+ InclusiveMetricsEvaluator metricsEvaluator =
+ new InclusiveMetricsEvaluator(icebergTable.schema(),
filterExpr, caseSensitive);
+
+ // ========== Phase 1: Load delete files from delete manifests
==========
+ List<DeleteFile> deleteFiles = new ArrayList<>();
+ List<ManifestFile> deleteManifests =
snapshot.deleteManifests(icebergTable.io());
+ for (ManifestFile manifest : deleteManifests) {
+ // Skip non-delete manifests
+ if (manifest.content() != ManifestContent.DELETES) {
+ continue;
+ }
+ // Get the partition spec for this manifest
+ PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+ if (spec == null) {
+ continue;
+ }
+ // Create manifest evaluator for partition-level pruning
+ ManifestEvaluator evaluator =
+ ManifestEvaluator.forPartitionFilter(filterExpr, spec,
caseSensitive);
+ // Skip manifest if it doesn't match the filter expression
(partition pruning)
+ if (!evaluator.eval(manifest)) {
+ continue;
+ }
+ // Load delete files from cache (or from storage if not cached)
+ ManifestCacheValue value =
IcebergManifestCacheLoader.loadDeleteFilesWithCache(cache, manifest,
+ icebergTable, this::recordManifestCacheAccess);
+ deleteFiles.addAll(value.getDeleteFiles());
+ }
+
+ // Build delete file index for efficient lookup of deletes applicable
to each data file
+ DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(specsById)
+ .caseSensitive(caseSensitive)
+ .build();
+
+ // ========== Phase 2: Load data files and create scan tasks ==========
+ List<FileScanTask> tasks = new ArrayList<>();
+ try (CloseableIterable<ManifestFile> dataManifests =
+
IcebergUtils.getMatchingManifest(snapshot.dataManifests(icebergTable.io()),
+ specsById, filterExpr)) {
+ for (ManifestFile manifest : dataManifests) {
+ // Skip non-data manifests
+ if (manifest.content() != ManifestContent.DATA) {
+ continue;
+ }
+ // Get the partition spec for this manifest
+ PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+ if (spec == null) {
+ continue;
+ }
+ // Get the residual evaluator for this partition spec
+ ResidualEvaluator residualEvaluator =
residualEvaluators.get(manifest.partitionSpecId());
+ if (residualEvaluator == null) {
+ continue;
+ }
+
+ // Load data files from cache (or from storage if not cached)
+ ManifestCacheValue value =
IcebergManifestCacheLoader.loadDataFilesWithCache(cache, manifest,
+ icebergTable, this::recordManifestCacheAccess);
+
+ // Process each data file in the manifest
+ for (org.apache.iceberg.DataFile dataFile :
value.getDataFiles()) {
+ // Skip file if column statistics indicate no matching
rows (metrics-based pruning)
+ if (metricsEvaluator != null &&
!metricsEvaluator.eval(dataFile)) {
+ continue;
+ }
+ // Skip file if partition values don't match the residual
filter
+ if
(residualEvaluator.residualFor(dataFile.partition()).equals(Expressions.alwaysFalse()))
{
+ continue;
+ }
+ // Find all delete files that apply to this data file
based on sequence number
+ List<DeleteFile> deletes = Arrays.asList(
+
deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile));
+
+ // Create a FileScanTask containing the data file,
associated deletes, and metadata
+ tasks.add(new BaseFileScanTask(
+ dataFile,
+ deletes.toArray(new DeleteFile[0]),
+ SchemaParser.toJson(icebergTable.schema()),
+ PartitionSpecParser.toJson(spec),
+ residualEvaluator));
+ }
+ }
+ }
+
+ // Split tasks into smaller chunks based on target split size for
parallel processing
long targetSplitSize = getRealFileSplitSize(0);
- return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ return
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
targetSplitSize);
}
private Split createIcebergSplit(FileScanTask fileScanTask) {
@@ -418,6 +566,7 @@ public class IcebergScanNode extends FileQueryScanNode {
splits.add(createIcebergSplit(task));
}
selectedPartitionNum = partitionMapInfos.size();
+ recordManifestCacheProfile();
return splits;
}
@@ -436,6 +585,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
setPushDownCount(countFromSnapshot);
assignCountToSplits(splits, countFromSnapshot);
+ recordManifestCacheProfile();
return splits;
} else {
fileScanTasks.forEach(taskGrp ->
splits.add(createIcebergSplit(taskGrp)));
@@ -445,6 +595,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
selectedPartitionNum = partitionMapInfos.size();
+ recordManifestCacheProfile();
return splits;
}
@@ -563,6 +714,27 @@ public class IcebergScanNode extends FileQueryScanNode {
return new ArrayList<>();
}
+ private void recordManifestCacheAccess(boolean cacheHit) {
+ if (cacheHit) {
+ manifestCacheHits++;
+ } else {
+ manifestCacheMisses++;
+ }
+ }
+
+ private void recordManifestCacheProfile() {
+ if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
+ return;
+ }
+ SummaryProfile summaryProfile =
SummaryProfile.getSummaryProfile(ConnectContext.get());
+ if (summaryProfile == null || summaryProfile.getExecutionSummary() ==
null) {
+ return;
+ }
+ summaryProfile.getExecutionSummary().addInfoString("Manifest Cache",
+ String.format("hits=%d, misses=%d, failures=%d",
+ manifestCacheHits, manifestCacheMisses,
manifestCacheFailures));
+ }
+
@Override
public TableIf getTargetTable() {
return source.getTargetTable();
@@ -612,15 +784,23 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
- if (pushdownIcebergPredicates.isEmpty()) {
- return super.getNodeExplainString(prefix, detailLevel);
+ String base = super.getNodeExplainString(prefix, detailLevel);
+ StringBuilder builder = new StringBuilder(base);
+
+ if (detailLevel == TExplainLevel.VERBOSE &&
IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
+ builder.append(prefix).append("manifest cache:
hits=").append(manifestCacheHits)
+ .append(", misses=").append(manifestCacheMisses)
+ .append(",
failures=").append(manifestCacheFailures).append("\n");
}
- StringBuilder sb = new StringBuilder();
- for (String predicate : pushdownIcebergPredicates) {
- sb.append(prefix).append(prefix).append(predicate).append("\n");
+
+ if (!pushdownIcebergPredicates.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (String predicate : pushdownIcebergPredicates) {
+
sb.append(prefix).append(prefix).append(predicate).append("\n");
+ }
+ builder.append(String.format("%sicebergPredicatePushdown=\n%s\n",
prefix, sb));
}
- return super.getNodeExplainString(prefix, detailLevel)
- + String.format("%sicebergPredicatePushdown=\n%s\n", prefix,
sb);
+ return builder.toString();
}
private void assignCountToSplits(List<Split> splits, long totalCount) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
index 2cc829c8743..88def12d2a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
@@ -43,6 +43,43 @@ public abstract class AbstractIcebergProperties extends
MetastoreProperties {
)
protected String warehouse;
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED},
+ required = false,
+ description = "Controls whether to use caching during manifest
reads or not. Default: false."
+ )
+ protected String ioManifestCacheEnabled;
+
+ @Getter
+ @ConnectorProperty(
+ names =
{CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS},
+ required = false,
+ description = "Controls the maximum duration for which an entry
stays in the manifest cache. "
+ + "Must be a non-negative value. Zero means entries expire
only due to memory pressure. "
+ + "Default: 60000 (60s)."
+ )
+ protected String ioManifestCacheExpirationIntervalMs;
+
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES},
+ required = false,
+ description = "Controls the maximum total amount of bytes to cache
in manifest cache. "
+ + "Must be a positive value. Default: 104857600 (100MB)."
+ )
+ protected String ioManifestCacheMaxTotalBytes;
+
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH},
+ required = false,
+ description = "Controls the maximum length of file to be
considered for caching. "
+ + "An InputFile will not be cached if the length is longer
than this limit. "
+ + "Must be a positive value. Default: 8388608 (8MB)."
+ )
+ protected String ioManifestCacheMaxContentLength;
+
@Getter
protected ExecutionAuthenticator executionAuthenticator = new
ExecutionAuthenticator(){};
@@ -80,6 +117,9 @@ public abstract class AbstractIcebergProperties extends
MetastoreProperties {
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
}
+ // Add manifest cache properties if configured
+ addManifestCacheProperties(catalogProps);
+
Catalog catalog = initCatalog(catalogName, catalogProps,
storagePropertiesList);
if (catalog == null) {
@@ -88,6 +128,28 @@ public abstract class AbstractIcebergProperties extends
MetastoreProperties {
return catalog;
}
+ /**
+ * Add manifest cache related properties to catalog properties.
+ * These properties control caching behavior during manifest reads.
+ *
+ * @param catalogProps the catalog properties map to add manifest cache
properties to
+ */
+ protected void addManifestCacheProperties(Map<String, String>
catalogProps) {
+ if (StringUtils.isNotBlank(ioManifestCacheEnabled)) {
+ catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
ioManifestCacheEnabled);
+ }
+ if (StringUtils.isNotBlank(ioManifestCacheExpirationIntervalMs)) {
+
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
+ ioManifestCacheExpirationIntervalMs);
+ }
+ if (StringUtils.isNotBlank(ioManifestCacheMaxTotalBytes)) {
+
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
ioManifestCacheMaxTotalBytes);
+ }
+ if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) {
+
catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
ioManifestCacheMaxContentLength);
+ }
+ }
+
/**
* Subclasses must implement this to create the concrete Catalog instance.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
new file mode 100644
index 00000000000..36cf36b556d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -0,0 +1,906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.Tasks;
+
+/**
+ * An index of {@link DeleteFile delete files} by sequence number.
+ *
+ * <p>Use {@link #builderFor(FileIO, Iterable)} to construct an index, and
{@link #forDataFile(long,
+ * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to
apply to a given data
+ * file.
+ *
+ * Copied from
https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+ * Change DeleteFileIndex and some methods to public.
+ */
+public class DeleteFileIndex {
+ private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0];
+
+ private final EqualityDeletes globalDeletes;
+ private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
+ private final PartitionMap<PositionDeletes> posDeletesByPartition;
+ private final Map<String, PositionDeletes> posDeletesByPath;
+ private final Map<String, DeleteFile> dvByPath;
+ private final boolean hasEqDeletes;
+ private final boolean hasPosDeletes;
+ private final boolean isEmpty;
+
+ private DeleteFileIndex(
+ EqualityDeletes globalDeletes,
+ PartitionMap<EqualityDeletes> eqDeletesByPartition,
+ PartitionMap<PositionDeletes> posDeletesByPartition,
+ Map<String, PositionDeletes> posDeletesByPath,
+ Map<String, DeleteFile> dvByPath) {
+ this.globalDeletes = globalDeletes;
+ this.eqDeletesByPartition = eqDeletesByPartition;
+ this.posDeletesByPartition = posDeletesByPartition;
+ this.posDeletesByPath = posDeletesByPath;
+ this.dvByPath = dvByPath;
+ this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
+ this.hasPosDeletes =
+ posDeletesByPartition != null || posDeletesByPath != null || dvByPath
!= null;
+ this.isEmpty = !hasEqDeletes && !hasPosDeletes;
+ }
+
+ public boolean isEmpty() {
+ return isEmpty;
+ }
+
+ public boolean hasEqualityDeletes() {
+ return hasEqDeletes;
+ }
+
+ public boolean hasPositionDeletes() {
+ return hasPosDeletes;
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ Iterable<DeleteFile> deleteFiles = Collections.emptyList();
+
+ if (globalDeletes != null) {
+ deleteFiles = Iterables.concat(deleteFiles,
globalDeletes.referencedDeleteFiles());
+ }
+
+ if (eqDeletesByPartition != null) {
+ for (EqualityDeletes deletes : eqDeletesByPartition.values()) {
+ deleteFiles = Iterables.concat(deleteFiles,
deletes.referencedDeleteFiles());
+ }
+ }
+
+ if (posDeletesByPartition != null) {
+ for (PositionDeletes deletes : posDeletesByPartition.values()) {
+ deleteFiles = Iterables.concat(deleteFiles,
deletes.referencedDeleteFiles());
+ }
+ }
+
+ if (posDeletesByPath != null) {
+ for (PositionDeletes deletes : posDeletesByPath.values()) {
+ deleteFiles = Iterables.concat(deleteFiles,
deletes.referencedDeleteFiles());
+ }
+ }
+
+ if (dvByPath != null) {
+ deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
+ }
+
+ return deleteFiles;
+ }
+
+ DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
+ return forDataFile(entry.dataSequenceNumber(), entry.file());
+ }
+
+ public DeleteFile[] forDataFile(DataFile file) {
+ return forDataFile(file.dataSequenceNumber(), file);
+ }
+
+ public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+ if (isEmpty) {
+ return EMPTY_DELETES;
+ }
+
+ DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
+ DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
+ DeleteFile dv = findDV(sequenceNumber, file);
+ if (dv != null && global == null && eqPartition == null) {
+ return new DeleteFile[] {dv};
+ } else if (dv != null) {
+ return concat(global, eqPartition, new DeleteFile[] {dv});
+ } else {
+ DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber,
file);
+ DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
+ return concat(global, eqPartition, posPartition, posPath);
+ }
+ }
+
+ private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
+ return globalDeletes == null ? EMPTY_DELETES : globalDeletes.filter(seq,
dataFile);
+ }
+
+ private DeleteFile[] findPosPartitionDeletes(long seq, DataFile dataFile) {
+ if (posDeletesByPartition == null) {
+ return EMPTY_DELETES;
+ }
+
+ PositionDeletes deletes = posDeletesByPartition.get(dataFile.specId(),
dataFile.partition());
+ return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
+ }
+
+ private DeleteFile[] findEqPartitionDeletes(long seq, DataFile dataFile) {
+ if (eqDeletesByPartition == null) {
+ return EMPTY_DELETES;
+ }
+
+ EqualityDeletes deletes = eqDeletesByPartition.get(dataFile.specId(),
dataFile.partition());
+ return deletes == null ? EMPTY_DELETES : deletes.filter(seq, dataFile);
+ }
+
+ @SuppressWarnings("CollectionUndefinedEquality")
+ private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) {
+ if (posDeletesByPath == null) {
+ return EMPTY_DELETES;
+ }
+
+ PositionDeletes deletes = posDeletesByPath.get(dataFile.location());
+ return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
+ }
+
+ private DeleteFile findDV(long seq, DataFile dataFile) {
+ if (dvByPath == null) {
+ return null;
+ }
+
+ DeleteFile dv = dvByPath.get(dataFile.location());
+ if (dv != null) {
+ ValidationException.check(
+ dv.dataSequenceNumber() >= seq,
+ "DV data sequence number (%s) must be greater than or equal to data
file sequence number (%s)",
+ dv.dataSequenceNumber(),
+ seq);
+ }
+ return dv;
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private static boolean canContainEqDeletesForFile(
+ DataFile dataFile, EqualityDeleteFile deleteFile) {
+ Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+ Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+
+ // whether to check data ranges or to assume that the ranges match
+ // if upper/lower bounds are missing, null counts may still be used to determine whether delete
+ // files can be skipped
+ boolean checkRanges =
+ dataLowers != null && dataUppers != null && deleteFile.hasLowerAndUpperBounds();
+
+ Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
+ Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
+ Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
+ Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
+
+ for (Types.NestedField field : deleteFile.equalityFields()) {
+ if (!field.type().isPrimitiveType()) {
+ // stats are not kept for nested types. assume that the delete file may match
+ continue;
+ }
+
+ if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) {
+ // the data has null values and null has been deleted, so the deletes must be applied
+ continue;
+ }
+
+ if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
+ // the data file contains only null values for this field, but there are no deletes for null
+ // values
+ return false;
+ }
+
+ if (allNull(deleteNullCounts, deleteValueCounts, field)
+ && allNonNull(dataNullCounts, field)) {
+ // the delete file removes only rows with null for this field, but there are no data
+ // rows with null
+ return false;
+ }
+
+ if (!checkRanges) {
+ // some upper and lower bounds are missing, assume they match
+ continue;
+ }
+
+ int id = field.fieldId();
+ ByteBuffer dataLower = dataLowers.get(id);
+ ByteBuffer dataUpper = dataUppers.get(id);
+ Object deleteLower = deleteFile.lowerBound(id);
+ Object deleteUpper = deleteFile.upperBound(id);
+ if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
+ // at least one bound is not known, assume the delete file may match
+ continue;
+ }
+
+ if (!rangesOverlap(field, dataLower, dataUpper, deleteLower, deleteUpper)) {
+ // no values overlap between the data file and the deletes
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private static <T> boolean rangesOverlap(
+ Types.NestedField field,
+ ByteBuffer dataLowerBuf,
+ ByteBuffer dataUpperBuf,
+ T deleteLower,
+ T deleteUpper) {
+ Type.PrimitiveType type = field.type().asPrimitiveType();
+ Comparator<T> comparator = Comparators.forType(type);
+
+ T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+ if (comparator.compare(dataLower, deleteUpper) > 0) {
+ return false;
+ }
+
+ T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
+ if (comparator.compare(deleteLower, dataUpper) > 0) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
+ if (field.isRequired()) {
+ return true;
+ }
+
+ if (nullValueCounts == null) {
+ return false;
+ }
+
+ Long nullValueCount = nullValueCounts.get(field.fieldId());
+ if (nullValueCount == null) {
+ return false;
+ }
+
+ return nullValueCount <= 0;
+ }
+
+ private static boolean allNull(
+ Map<Integer, Long> nullValueCounts, Map<Integer, Long> valueCounts, Types.NestedField field) {
+ if (field.isRequired()) {
+ return false;
+ }
+
+ if (nullValueCounts == null || valueCounts == null) {
+ return false;
+ }
+
+ Long nullValueCount = nullValueCounts.get(field.fieldId());
+ Long valueCount = valueCounts.get(field.fieldId());
+ if (nullValueCount == null || valueCount == null) {
+ return false;
+ }
+
+ return nullValueCount.equals(valueCount);
+ }
+
+ private static boolean containsNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
+ if (field.isRequired()) {
+ return false;
+ }
+
+ if (nullValueCounts == null) {
+ return true;
+ }
+
+ Long nullValueCount = nullValueCounts.get(field.fieldId());
+ if (nullValueCount == null) {
+ return true;
+ }
+
+ return nullValueCount > 0;
+ }
+
+ static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
+ return new Builder(io, Sets.newHashSet(deleteManifests));
+ }
+
+ // Doris change: made public so the index can be built directly from an already loaded list of delete files.
+ public static Builder builderFor(Iterable<DeleteFile> deleteFiles) {
+ return new Builder(deleteFiles);
+ }
+
+ // Doris change: made public so the builder can be used outside this package.
+ public static class Builder {
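+ // the builder has two modes: reading delete manifests through FileIO, or indexing an
+ // already materialized collection of delete files (see the two constructors below)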
+ private final FileIO io;
+ private final Set<ManifestFile> deleteManifests;
+ private final Iterable<DeleteFile> deleteFiles;
+ private long minSequenceNumber = 0L;
+ private Map<Integer, PartitionSpec> specsById = null;
+ private Expression dataFilter = Expressions.alwaysTrue();
+ private Expression partitionFilter = Expressions.alwaysTrue();
+ private PartitionSet partitionSet = null;
+ private boolean caseSensitive = true;
+ private ExecutorService executorService = null;
+ private ScanMetrics scanMetrics = ScanMetrics.noop();
+ private boolean ignoreResiduals = false;
+
+ Builder(FileIO io, Set<ManifestFile> deleteManifests) {
+ this.io = io;
+ this.deleteManifests = Sets.newHashSet(deleteManifests);
+ this.deleteFiles = null;
+ }
+
+ Builder(Iterable<DeleteFile> deleteFiles) {
+ this.io = null;
+ this.deleteManifests = null;
+ this.deleteFiles = deleteFiles;
+ }
+
+ Builder afterSequenceNumber(long seq) {
+ this.minSequenceNumber = seq;
+ return this;
+ }
+
+ public Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
+ this.specsById = newSpecsById;
+ return this;
+ }
+
+ Builder filterData(Expression newDataFilter) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
data filters");
+ this.dataFilter = Expressions.and(dataFilter, newDataFilter);
+ return this;
+ }
+
+ Builder filterPartitions(Expression newPartitionFilter) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
partition filters");
+ this.partitionFilter = Expressions.and(partitionFilter,
newPartitionFilter);
+ return this;
+ }
+
+ Builder filterPartitions(PartitionSet newPartitionSet) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
partition filters");
+ this.partitionSet = newPartitionSet;
+ return this;
+ }
+
+ public Builder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ Builder planWith(ExecutorService newExecutorService) {
+ this.executorService = newExecutorService;
+ return this;
+ }
+
+ Builder scanMetrics(ScanMetrics newScanMetrics) {
+ this.scanMetrics = newScanMetrics;
+ return this;
+ }
+
+ Builder ignoreResiduals() {
+ this.ignoreResiduals = true;
+ return this;
+ }
+
+ private Iterable<DeleteFile> filterDeleteFiles() {
+ return Iterables.filter(deleteFiles, file -> file.dataSequenceNumber() > minSequenceNumber);
+ }
+
+ private Collection<DeleteFile> loadDeleteFiles() {
+ // read all of the matching delete manifests in parallel and accumulate the matching files in
+ // a queue
+ Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
+ Tasks.foreach(deleteManifestReaders())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(executorService)
+ .run(
+ deleteFile -> {
+ try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
+ for (ManifestEntry<DeleteFile> entry : reader) {
+ if (entry.dataSequenceNumber() > minSequenceNumber) {
+ // copy with stats for better filtering against data file stats
+ files.add(entry.file().copy());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close");
+ }
+ });
+ return files;
+ }
+
+ public DeleteFileIndex build() {
+ Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
+
+ EqualityDeletes globalDeletes = new EqualityDeletes();
+ PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
+ PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
+ Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
+ Map<String, DeleteFile> dvByPath = Maps.newHashMap();
+
+ for (DeleteFile file : files) {
+ switch (file.content()) {
+ case POSITION_DELETES:
+ if (ContentFileUtil.isDV(file)) {
+ add(dvByPath, file);
+ } else {
+ add(posDeletesByPath, posDeletesByPartition, file);
+ }
+ break;
+ case EQUALITY_DELETES:
+ add(globalDeletes, eqDeletesByPartition, file);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported content: " +
file.content());
+ }
+ ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
+ }
+
+ return new DeleteFileIndex(
+ globalDeletes.isEmpty() ? null : globalDeletes,
+ eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
+ posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
+ posDeletesByPath.isEmpty() ? null : posDeletesByPath,
+ dvByPath.isEmpty() ? null : dvByPath);
+ }
+
+ private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
+ String path = dv.referencedDataFile();
+ DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
+ if (existingDV != null) {
+ throw new ValidationException(
+ "Can't index multiple DVs for %s: %s and %s",
+ path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV));
+ }
+ }
+
+ private void add(
+ Map<String, PositionDeletes> deletesByPath,
+ PartitionMap<PositionDeletes> deletesByPartition,
+ DeleteFile file) {
+ String path = ContentFileUtil.referencedDataFileLocation(file);
+
+ PositionDeletes deletes;
+ if (path != null) {
+ deletes = deletesByPath.computeIfAbsent(path, ignored -> new PositionDeletes());
+ } else {
+ int specId = file.specId();
+ StructLike partition = file.partition();
+ deletes = deletesByPartition.computeIfAbsent(specId, partition, PositionDeletes::new);
+ }
+
+ deletes.add(file);
+ }
+
+ private void add(
+ EqualityDeletes globalDeletes,
+ PartitionMap<EqualityDeletes> deletesByPartition,
+ DeleteFile file) {
+ PartitionSpec spec = specsById.get(file.specId());
+
+ EqualityDeletes deletes;
+ if (spec.isUnpartitioned()) {
+ deletes = globalDeletes;
+ } else {
+ int specId = spec.specId();
+ StructLike partition = file.partition();
+ deletes = deletesByPartition.computeIfAbsent(specId, partition, EqualityDeletes::new);
+ }
+
+ deletes.add(spec, file);
+ }
+
+ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
+ Expression entryFilter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
+
+ LoadingCache<Integer, Expression> partExprCache =
+ specsById == null
+ ? null
+ : Caffeine.newBuilder()
+ .build(
+ specId -> {
+ PartitionSpec spec = specsById.get(specId);
+ return Projections.inclusive(spec, caseSensitive).project(dataFilter);
+ });
+
+ LoadingCache<Integer, ManifestEvaluator> evalCache =
+ specsById == null
+ ? null
+ : Caffeine.newBuilder()
+ .build(
+ specId -> {
+ PartitionSpec spec = specsById.get(specId);
+ return ManifestEvaluator.forPartitionFilter(
+ Expressions.and(partitionFilter, partExprCache.get(specId)),
+ spec,
+ caseSensitive);
+ });
+
+ CloseableIterable<ManifestFile> closeableDeleteManifests =
+ CloseableIterable.withNoopClose(deleteManifests);
+ CloseableIterable<ManifestFile> matchingManifests =
+ evalCache == null
+ ? closeableDeleteManifests
+ : CloseableIterable.filter(
+ scanMetrics.skippedDeleteManifests(),
+ closeableDeleteManifests,
+ manifest ->
+ manifest.content() == ManifestContent.DELETES
+ && (manifest.hasAddedFiles() || manifest.hasExistingFiles())
+ && evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+ matchingManifests =
+ CloseableIterable.count(scanMetrics.scannedDeleteManifests(), matchingManifests);
+ return Iterables.transform(
+ matchingManifests,
+ manifest ->
+ ManifestFiles.readDeleteManifest(manifest, io, specsById)
+ .filterRows(entryFilter)
+ .filterPartitions(
+ Expressions.and(
+ partitionFilter, partExprCache.get(manifest.partitionSpecId())))
+ .filterPartitions(partitionSet)
+ .caseSensitive(caseSensitive)
+ .scanMetrics(scanMetrics)
+ .liveEntries());
+ }
+ }
+
+ /**
+ * Finds an index in the sorted array of sequence numbers where the given sequence number should
+ * be inserted or is found.
+ *
+ * <p>If the sequence number is present in the array, this method returns the index of the first
+ * occurrence of the sequence number. If the sequence number is not present, the method returns
+ * the index where the sequence number would be inserted while maintaining the sorted order of the
+ * array. This returned index ranges from 0 (inclusive) to the length of the array (inclusive).
+ *
+ * <p>This method is used to determine the subset of delete files that apply to a given data file.
+ *
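+ * <p>For example, with seqs = [2, 4, 4, 7]: searching for 4 returns 1 (the first occurrence),
+ * while searching for 5 returns 3 (its insertion point).
+ *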
+ * @param seqs an array of sequence numbers sorted in ascending order
+ * @param seq the sequence number to search for
+ * @return the index of the first occurrence or the insertion point
+ */
+ private static int findStartIndex(long[] seqs, long seq) {
+ int pos = Arrays.binarySearch(seqs, seq);
+ int start;
+ if (pos < 0) {
+ // the sequence number was not found, where it would be inserted is -(pos + 1)
+ start = -(pos + 1);
+ } else {
+ // the sequence number was found, but may not be the first
+ // find the first delete file with the given sequence number by decrementing the position
+ start = pos;
+ while (start > 0 && seqs[start - 1] >= seq) {
+ start -= 1;
+ }
+ }
+
+ return start;
+ }
+
+ private static DeleteFile[] concat(DeleteFile[]... deletes) {
+ return ArrayUtil.concat(DeleteFile.class, deletes);
+ }
+
+ // a group of position delete files sorted by the sequence number they apply to
+ static class PositionDeletes {
+ private static final Comparator<DeleteFile> SEQ_COMPARATOR =
+ Comparator.comparingLong(DeleteFile::dataSequenceNumber);
+
+ // indexed state
+ private long[] seqs = null;
+ private DeleteFile[] files = null;
+
+ // a buffer that is used to hold files before indexing
+ private volatile List<DeleteFile> buffer = Lists.newArrayList();
+
+ public void add(DeleteFile file) {
+ Preconditions.checkState(buffer != null, "Can't add files upon
indexing");
+ buffer.add(file);
+ }
+
+ public DeleteFile[] filter(long seq) {
+ indexIfNeeded();
+
+ int start = findStartIndex(seqs, seq);
+
+ if (start >= files.length) {
+ return EMPTY_DELETES;
+ }
+
+ if (start == 0) {
+ return files;
+ }
+
+ int matchingFilesCount = files.length - start;
+ DeleteFile[] matchingFiles = new DeleteFile[matchingFilesCount];
+ System.arraycopy(files, start, matchingFiles, 0, matchingFilesCount);
+ return matchingFiles;
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ indexIfNeeded();
+ return Arrays.asList(files);
+ }
+
+ public boolean isEmpty() {
+ indexIfNeeded();
+ return files.length == 0;
+ }
+
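+ // lazily index the buffered files exactly once, using double-checked locking on the volatile buffer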
+ private void indexIfNeeded() {
+ if (buffer != null) {
+ synchronized (this) {
+ if (buffer != null) {
+ this.files = indexFiles(buffer);
+ this.seqs = indexSeqs(files);
+ this.buffer = null;
+ }
+ }
+ }
+ }
+
+ private static DeleteFile[] indexFiles(List<DeleteFile> list) {
+ DeleteFile[] array = list.toArray(EMPTY_DELETES);
+ Arrays.sort(array, SEQ_COMPARATOR);
+ return array;
+ }
+
+ private static long[] indexSeqs(DeleteFile[] files) {
+ long[] seqs = new long[files.length];
+
+ for (int index = 0; index < files.length; index++) {
+ seqs[index] = files[index].dataSequenceNumber();
+ }
+
+ return seqs;
+ }
+ }
+
+ // a group of equality delete files sorted by the sequence number they apply to
+ static class EqualityDeletes {
+ private static final Comparator<EqualityDeleteFile> SEQ_COMPARATOR =
+ Comparator.comparingLong(EqualityDeleteFile::applySequenceNumber);
+ private static final EqualityDeleteFile[] EMPTY_EQUALITY_DELETES = new EqualityDeleteFile[0];
+
+ // indexed state
+ private long[] seqs = null;
+ private EqualityDeleteFile[] files = null;
+
+ // a buffer that is used to hold files before indexing
+ private volatile List<EqualityDeleteFile> buffer = Lists.newArrayList();
+
+ public void add(PartitionSpec spec, DeleteFile file) {
+ Preconditions.checkState(buffer != null, "Can't add files upon
indexing");
+ buffer.add(new EqualityDeleteFile(spec, file));
+ }
+
+ public DeleteFile[] filter(long seq, DataFile dataFile) {
+ indexIfNeeded();
+
+ int start = findStartIndex(seqs, seq);
+
+ if (start >= files.length) {
+ return EMPTY_DELETES;
+ }
+
+ List<DeleteFile> matchingFiles = Lists.newArrayList();
+
+ for (int index = start; index < files.length; index++) {
+ EqualityDeleteFile file = files[index];
+ if (canContainEqDeletesForFile(dataFile, file)) {
+ matchingFiles.add(file.wrapped());
+ }
+ }
+
+ return matchingFiles.toArray(EMPTY_DELETES);
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ indexIfNeeded();
+ return Iterables.transform(Arrays.asList(files), EqualityDeleteFile::wrapped);
+ }
+
+ public boolean isEmpty() {
+ indexIfNeeded();
+ return files.length == 0;
+ }
+
+ private void indexIfNeeded() {
+ if (buffer != null) {
+ synchronized (this) {
+ if (buffer != null) {
+ this.files = indexFiles(buffer);
+ this.seqs = indexSeqs(files);
+ this.buffer = null;
+ }
+ }
+ }
+ }
+
+ private static EqualityDeleteFile[] indexFiles(List<EqualityDeleteFile> list) {
+ EqualityDeleteFile[] array = list.toArray(EMPTY_EQUALITY_DELETES);
+ Arrays.sort(array, SEQ_COMPARATOR);
+ return array;
+ }
+
+ private static long[] indexSeqs(EqualityDeleteFile[] files) {
+ long[] seqs = new long[files.length];
+
+ for (int index = 0; index < files.length; index++) {
+ seqs[index] = files[index].applySequenceNumber();
+ }
+
+ return seqs;
+ }
+ }
+
+ // an equality delete file wrapper that caches the converted boundaries for faster boundary checks
+ // this class is not meant to be exposed beyond the delete file index
+ private static class EqualityDeleteFile {
+ private final PartitionSpec spec;
+ private final DeleteFile wrapped;
+ private final long applySequenceNumber;
+ private volatile List<Types.NestedField> equalityFields = null;
+ private volatile Map<Integer, Object> convertedLowerBounds = null;
+ private volatile Map<Integer, Object> convertedUpperBounds = null;
+
+ EqualityDeleteFile(PartitionSpec spec, DeleteFile file) {
+ this.spec = spec;
+ this.wrapped = file;
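+ // equality deletes apply only to data files with a strictly smaller data sequence number,
+ // so the file is indexed at (dataSequenceNumber - 1)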
+ this.applySequenceNumber = wrapped.dataSequenceNumber() - 1;
+ }
+
+ public DeleteFile wrapped() {
+ return wrapped;
+ }
+
+ public long applySequenceNumber() {
+ return applySequenceNumber;
+ }
+
+ public List<Types.NestedField> equalityFields() {
+ if (equalityFields == null) {
+ synchronized (this) {
+ if (equalityFields == null) {
+ List<Types.NestedField> fields = Lists.newArrayList();
+ for (int id : wrapped.equalityFieldIds()) {
+ Types.NestedField field = spec.schema().findField(id);
+ fields.add(field);
+ }
+ this.equalityFields = fields;
+ }
+ }
+ }
+
+ return equalityFields;
+ }
+
+ public Map<Integer, Long> valueCounts() {
+ return wrapped.valueCounts();
+ }
+
+ public Map<Integer, Long> nullValueCounts() {
+ return wrapped.nullValueCounts();
+ }
+
+ public boolean hasLowerAndUpperBounds() {
+ return wrapped.lowerBounds() != null && wrapped.upperBounds() != null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T lowerBound(int id) {
+ return (T) lowerBounds().get(id);
+ }
+
+ private Map<Integer, Object> lowerBounds() {
+ if (convertedLowerBounds == null) {
+ synchronized (this) {
+ if (convertedLowerBounds == null) {
+ this.convertedLowerBounds = convertBounds(wrapped.lowerBounds());
+ }
+ }
+ }
+
+ return convertedLowerBounds;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T upperBound(int id) {
+ return (T) upperBounds().get(id);
+ }
+
+ private Map<Integer, Object> upperBounds() {
+ if (convertedUpperBounds == null) {
+ synchronized (this) {
+ if (convertedUpperBounds == null) {
+ this.convertedUpperBounds = convertBounds(wrapped.upperBounds());
+ }
+ }
+ }
+
+ return convertedUpperBounds;
+ }
+
+ private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> bounds) {
+ Map<Integer, Object> converted = Maps.newHashMap();
+
+ if (bounds != null) {
+ for (Types.NestedField field : equalityFields()) {
+ int id = field.fieldId();
+ Type type = spec.schema().findField(id).type();
+ if (type.isPrimitiveType()) {
+ ByteBuffer bound = bounds.get(id);
+ if (bound != null) {
+ converted.put(id, Conversions.fromByteBuffer(type, bound));
+ }
+ }
+ }
+ }
+
+ return converted;
+ }
+ }
+}
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_manifest_cache.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_manifest_cache.out
new file mode 100644
index 00000000000..4be8149ebaa
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_manifest_cache.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !desc_with_cache --
+id int Yes true \N
+ts datetime(6) Yes true \N
+
+-- !select_with_cache --
+1 2024-05-30T20:34:56
+2 2024-05-30T20:34:56.100
+3 2024-05-30T20:34:56.120
+4 2024-05-30T20:34:56.123
+
+-- !desc_without_cache --
+id int Yes true \N
+ts datetime(6) Yes true \N
+
+-- !select_without_cache --
+1 2024-05-30T20:34:56
+2 2024-05-30T20:34:56.100
+3 2024-05-30T20:34:56.120
+4 2024-05-30T20:34:56.123
+
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
new file mode 100644
index 00000000000..d95f05ae76c
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_manifest_cache.groovy
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_manifest_cache",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalogNameWithCache = "test_iceberg_manifest_cache_enabled"
+ String catalogNameWithoutCache = "test_iceberg_manifest_cache_disabled"
+ String tableName = "tb_ts_filter"
+
+ try {
+ sql """set enable_external_table_batch_mode=false"""
+
+ // Create catalog with manifest cache enabled
+ sql """drop catalog if exists ${catalogNameWithCache}"""
+ sql """
+ CREATE CATALOG ${catalogNameWithCache} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${restPort}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
+ "s3.region" = "us-east-1",
+ "iceberg.manifest.cache.enable" = "true"
+ );
+ """
+
+ // Create catalog with manifest cache disabled
+ sql """drop catalog if exists ${catalogNameWithoutCache}"""
+ sql """
+ CREATE CATALOG ${catalogNameWithoutCache} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${restPort}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
+ "s3.region" = "us-east-1",
+ "iceberg.manifest.cache.enable" = "false"
+ );
+ """
+
+ // Test with cache enabled
+ sql """switch ${catalogNameWithCache}"""
+ sql """use multi_catalog"""
+
+ // First explain should populate cache - check manifest cache info exists
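+ // the plan touches 7 manifests for this table, so a cold cache records 7 misses and a warm cache 7 hits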
+ explain {
+ sql("verbose select * from ${tableName} where id < 5")
+ contains "manifest cache:"
+ contains "hits=0"
+ contains "misses=7"
+ contains "failures=0"
+ }
+
+ // Test table structure with order_qt (should be same regardless of cache)
+ order_qt_desc_with_cache """desc ${tableName}"""
+
+ // Test table data with order_qt
+ order_qt_select_with_cache """select * from ${tableName} where id
< 5"""
+
+ // Second explain should hit cache - verify cache metrics are present
+ explain {
+ sql("verbose select * from ${tableName} where id < 5")
+ contains "manifest cache:"
+ contains "hits=7"
+ contains "misses=0"
+ contains "failures=0"
+ }
+
+ // Test refresh catalog, the cache should be invalidated
+ sql """ refresh catalog ${catalogNameWithCache} """
+ explain {
+ sql("verbose select * from ${tableName} where id < 5")
+ contains "manifest cache:"
+ contains "hits=0"
+ contains "misses=7"
+ contains "failures=0"
+ }
+
+ // Test with cache disabled
+ sql """switch ${catalogNameWithoutCache}"""
+ sql """use multi_catalog"""
+
+ // Test table structure with order_qt (should be same as with cache)
+ order_qt_desc_without_cache """desc ${tableName}"""
+
+ // Test table data with order_qt
+ order_qt_select_without_cache """select * from ${tableName} where
id < 5"""
+
+ // Explain should not contain manifest cache info when cache is disabled
+ explain {
+ sql("verbose select * from ${tableName} where id < 5")
+ notContains "manifest cache:"
+ }
+ } finally {
+ sql """set enable_external_table_batch_mode=true"""
+ }
+ }
+}
+