This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git
commit bbca9eacc3b6c9fdd089500a03904f7240cd8259 Author: Socrates <[email protected]> AuthorDate: Fri Dec 19 10:53:50 2025 +0800 Manifest cache for tpch1000 (#59178) --- be/src/clucene | 1 + fe/check/checkstyle/suppressions.xml | 3 + .../main/java/org/apache/doris/common/Config.java | 12 + .../doris/datasource/ExternalMetaCacheMgr.java | 7 + .../iceberg/IcebergManifestCacheMgr.java | 35 + .../doris/datasource/iceberg/IcebergUtils.java | 8 + .../iceberg/cache/ContentFileEstimater.java | 194 +++++ .../iceberg/cache/IcebergManifestCache.java | 96 +++ .../iceberg/cache/IcebergManifestCacheLoader.java | 89 ++ .../datasource/iceberg/cache/ManifestCacheKey.java | 58 ++ .../iceberg/cache/ManifestCacheValue.java | 65 ++ .../datasource/iceberg/source/IcebergScanNode.java | 146 +++- .../metastore/AbstractIcebergProperties.java | 62 ++ .../java/org/apache/iceberg/DeleteFileIndex.java | 906 +++++++++++++++++++++ 14 files changed, 1681 insertions(+), 1 deletion(-) diff --git a/be/src/clucene b/be/src/clucene new file mode 160000 index 00000000000..bb22247973e --- /dev/null +++ b/be/src/clucene @@ -0,0 +1 @@ +Subproject commit bb22247973e55dcac9a3eaafedc57cc6c36d2fc3 diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml index 8f000bb7616..7340c4c5bd5 100644 --- a/fe/check/checkstyle/suppressions.xml +++ b/fe/check/checkstyle/suppressions.xml @@ -69,6 +69,9 @@ under the License. <!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java --> <suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/> + <!-- ignore iceberg delete file index copied from iceberg/DeleteFileIndex.java --> + <suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java" checks="[a-zA-Z0-9]*"/> + <!-- ignore gensrc/thrift/ExternalTableSchema.thrift --> <suppress files=".*thrift/schema/external/.*" checks=".*"/> </suppressions> diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 350e34f8a90..d4426c2d515 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2298,6 +2298,18 @@ public class Config extends ConfigBase { }) public static long external_cache_refresh_time_minutes = 10; // 10 mins + @ConfField(description = {"是否启用 Iceberg Manifest DataFile/DeleteFile 缓存。", + "Whether to enable Iceberg manifest DataFile/DeleteFile cache."}) + public static boolean iceberg_manifest_cache_enable = true; + + @ConfField(description = {"Iceberg Manifest 缓存的容量上限,单位 MB。", + "Iceberg manifest cache capacity in MB."}) + public static long iceberg_manifest_cache_capacity_mb = 1024; + + @ConfField(description = {"Iceberg Manifest 缓存的访问过期时间(秒),0 或负数表示不过期。", + "Iceberg manifest cache expire after access in seconds. 0 or negative disables expiration."}) + public static long iceberg_manifest_cache_ttl_sec = 48 * 60 * 60; + /** * Github workflow test type, for setting some session variables * only for certain test type. E.g. 
only settting batch_size to small diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index e777285a07f..798a2170b1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -29,6 +29,7 @@ import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor; import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor; import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr; import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor; +import org.apache.doris.datasource.iceberg.IcebergManifestCacheMgr; import org.apache.doris.datasource.iceberg.IcebergMetadataCache; import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; @@ -97,6 +98,7 @@ public class ExternalMetaCacheMgr { private FileSystemCache fsCache; // all external table row count cache. private ExternalRowCountCache rowCountCache; + private final IcebergManifestCacheMgr icebergManifestCacheMgr; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; @@ -128,6 +130,7 @@ public class ExternalMetaCacheMgr { rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor); hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor); + icebergManifestCacheMgr = new IcebergManifestCacheMgr(); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); @@ -199,6 +202,10 @@ public class ExternalMetaCacheMgr { return hudiMetadataCacheMgr; } + public IcebergManifestCacheMgr getIcebergManifestCacheMgr() { + return icebergManifestCacheMgr; + } + public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCacheMgr.getIcebergMetadataCache(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergManifestCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergManifestCacheMgr.java new file mode 100644 index 00000000000..ad95e151b98 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergManifestCacheMgr.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache; + +/** + * Wrapper manager for Iceberg manifest cache. + */ +public class IcebergManifestCacheMgr { + private final IcebergManifestCache manifestCache; + + public IcebergManifestCacheMgr() { + this.manifestCache = new IcebergManifestCache(); + } + + public IcebergManifestCache getManifestCache() { + return manifestCache; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 9587ca4f816..28ddf2817df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -56,6 +56,7 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache; import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.datasource.mvcc.MvccUtil; @@ -1452,4 +1453,11 @@ public class IcebergUtils { icebergExternalTable.getViewText(); } + public static IcebergManifestCache getManifestCache() { + return Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergManifestCacheMgr() + .getManifestCache(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java new file mode 100644 index 00000000000..43f60096e31 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.StructLike; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +/** + * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects. 
+ */ +public final class ContentFileEstimater { + private static final long LIST_BASE_WEIGHT = 48L; + private static final long OBJECT_REFERENCE_WEIGHT = 8L; + private static final long CONTENT_FILE_BASE_WEIGHT = 256L; + private static final long STRING_BASE_WEIGHT = 40L; + private static final long CHAR_BYTES = 2L; + private static final long BYTE_BUFFER_BASE_WEIGHT = 16L; + private static final long MAP_BASE_WEIGHT = 48L; + private static final long MAP_ENTRY_OVERHEAD = 24L; + private static final long LONG_OBJECT_WEIGHT = 24L; + private static final long INT_OBJECT_WEIGHT = 16L; + private static final long PARTITION_BASE_WEIGHT = 48L; + private static final long PARTITION_VALUE_BASE_WEIGHT = 8L; + + private ContentFileEstimater() { + } + + public static long estimate(List<? extends ContentFile<?>> files) { + return listReferenceWeight(files) + estimateContentFilesWeight(files); + } + + private static long listReferenceWeight(List<?> files) { + if (files == null || files.isEmpty()) { + return 0L; + } + return LIST_BASE_WEIGHT + (long) files.size() * OBJECT_REFERENCE_WEIGHT; + } + + private static long estimateContentFilesWeight(List<? extends ContentFile<?>> files) { + long total = 0L; + if (files == null) { + return 0L; + } + for (ContentFile<?> file : files) { + total += estimateContentFileWeight(file); + } + return total; + } + + private static long estimateContentFileWeight(ContentFile<?> file) { + if (file == null) { + return 0L; + } + + long weight = CONTENT_FILE_BASE_WEIGHT; + weight += charSequenceWeight(file.path()); + weight += stringWeight(file.manifestLocation()); + weight += byteBufferWeight(file.keyMetadata()); + weight += partitionWeight(file.partition()); + + weight += numericMapWeight(file.columnSizes()); + weight += numericMapWeight(file.valueCounts()); + weight += numericMapWeight(file.nullValueCounts()); + weight += numericMapWeight(file.nanValueCounts()); + weight += byteBufferMapWeight(file.lowerBounds()); + weight += byteBufferMapWeight(file.upperBounds()); + + weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT); + weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT); + + weight += optionalLongWeight(file.pos()); + weight += optionalLongWeight(file.dataSequenceNumber()); + weight += optionalLongWeight(file.fileSequenceNumber()); + weight += optionalLongWeight(file.firstRowId()); + weight += optionalIntWeight(file.sortOrderId()); + + if (file instanceof DeleteFile) { + DeleteFile deleteFile = (DeleteFile) file; + weight += stringWeight(deleteFile.referencedDataFile()); + weight += optionalLongWeight(deleteFile.contentOffset()); + weight += optionalLongWeight(deleteFile.contentSizeInBytes()); + } + + return weight; + } + + private static long listWeight(List<? 
extends Number> list, long elementWeight) { + if (list == null || list.isEmpty()) { + return 0L; + } + return LIST_BASE_WEIGHT + (long) list.size() * (OBJECT_REFERENCE_WEIGHT + elementWeight); + } + + private static long numericMapWeight(Map<Integer, Long> map) { + if (map == null || map.isEmpty()) { + return 0L; + } + return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + LONG_OBJECT_WEIGHT); + } + + private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) { + if (map == null || map.isEmpty()) { + return 0L; + } + long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD; + for (ByteBuffer buffer : map.values()) { + weight += byteBufferWeight(buffer); + } + return weight; + } + + private static long partitionWeight(StructLike partition) { + if (partition == null) { + return 0L; + } + long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * PARTITION_VALUE_BASE_WEIGHT; + for (int i = 0; i < partition.size(); i++) { + Object value = partition.get(i, Object.class); + weight += estimateValueWeight(value); + } + return weight; + } + + private static long estimateValueWeight(Object value) { + if (value == null) { + return 0L; + } + if (value instanceof CharSequence) { + return charSequenceWeight((CharSequence) value); + } else if (value instanceof byte[]) { + return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length; + } else if (value instanceof ByteBuffer) { + return byteBufferWeight((ByteBuffer) value); + } else if (value instanceof Long || value instanceof Double) { + return LONG_OBJECT_WEIGHT; + } else if (value instanceof Integer || value instanceof Float) { + return INT_OBJECT_WEIGHT; + } else if (value instanceof Short || value instanceof Character) { + return 4L; + } else if (value instanceof Boolean) { + return 1L; + } + return OBJECT_REFERENCE_WEIGHT; + } + + private static long charSequenceWeight(CharSequence value) { + if (value == null) { + return 0L; + } + return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES; + } + + private static long stringWeight(String value) { + if (value == null) { + return 0L; + } + return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES; + } + + private static long byteBufferWeight(ByteBuffer buffer) { + if (buffer == null) { + return 0L; + } + return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining(); + } + + private static long optionalLongWeight(Long value) { + return value == null ? 0L : LONG_OBJECT_WEIGHT; + } + + private static long optionalIntWeight(Integer value) { + return value == null ? 0L : INT_OBJECT_WEIGHT; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java new file mode 100644 index 00000000000..be919c5d313 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; + +/** + * A lightweight manifest cache that stores parsed DataFile/DeleteFile lists per manifest. + */ +public class IcebergManifestCache { + private static final Logger LOG = LogManager.getLogger(IcebergManifestCache.class); + + private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache; + + public IcebergManifestCache() { + long capacityInBytes = Config.iceberg_manifest_cache_capacity_mb * 1024L * 1024L; + Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value) -> { + long weight = Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L); + if (weight > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int) weight; + }; + Caffeine<ManifestCacheKey, ManifestCacheValue> builder = Caffeine.newBuilder() + .maximumWeight(capacityInBytes) + .weigher(weigher); + if (Config.iceberg_manifest_cache_ttl_sec > 0) { + builder = builder.expireAfterAccess(Duration.ofSeconds(Config.iceberg_manifest_cache_ttl_sec)); + } + cache = builder.build(new CacheLoader<ManifestCacheKey, ManifestCacheValue>() { + @Override + public ManifestCacheValue load(ManifestCacheKey key) { + throw new CacheException("Manifest cache loader should be provided explicitly for key %s", null, key); + } + }); + } + + public ManifestCacheValue get(ManifestCacheKey key, Callable<ManifestCacheValue> loader) { + try { + return cache.get(key, ignored -> { + try { + return loader.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + throw new CacheException("Failed to load manifest cache for key %s", e, key); + } + } + + public Optional<ManifestCacheValue> peek(ManifestCacheKey key) { + return Optional.ofNullable(cache.getIfPresent(key)); + } + + public void invalidateByPath(String path) { + cache.asMap().keySet().stream() + .filter(key -> key.getPath().equals(path)) + .forEach(cache::invalidate); + } + + public void invalidateAll() { + cache.invalidateAll(); + } + + public ManifestCacheKey buildKey(String path) { + return new ManifestCacheKey(path); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java new file mode 100644 index 00000000000..dc4d16da61b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.doris.datasource.CacheException; + +import com.google.common.collect.Lists; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.List; + +/** + * Helper to load manifest content and populate the manifest cache. + */ +public class IcebergManifestCacheLoader { + private static final Logger LOG = LogManager.getLogger(IcebergManifestCacheLoader.class); + + private IcebergManifestCacheLoader() { + } + + public static ManifestCacheValue loadDataFilesWithCache(IcebergManifestCache cache, ManifestFile manifest, + Table table) { + ManifestCacheKey key = buildKey(cache, manifest); + return cache.get(key, () -> loadDataFiles(manifest, table)); + } + + public static ManifestCacheValue loadDeleteFilesWithCache(IcebergManifestCache cache, ManifestFile manifest, + Table table) { + ManifestCacheKey key = buildKey(cache, manifest); + return cache.get(key, () -> loadDeleteFiles(manifest, table)); + } + + private static ManifestCacheValue loadDataFiles(ManifestFile manifest, Table table) { + List<DataFile> dataFiles = Lists.newArrayList(); + try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) { + // ManifestReader implements CloseableIterable<DataFile>, iterate directly + for (DataFile dataFile : reader) { + dataFiles.add(dataFile.copy()); + } + } catch (IOException e) { + LOG.warn("Failed to read data manifest {}", manifest.path(), e); + throw new CacheException("Failed to read data manifest %s", e, manifest.path()); + } + return ManifestCacheValue.forDataFiles(dataFiles); + } + + private static ManifestCacheValue loadDeleteFiles(ManifestFile manifest, Table table) { + List<DeleteFile> deleteFiles = Lists.newArrayList(); + try (ManifestReader<DeleteFile> reader = ManifestFiles.readDeleteManifest(manifest, table.io(), + table.specs())) { + // ManifestReader implements CloseableIterable<DeleteFile>, iterate directly + for (DeleteFile deleteFile : reader) { + deleteFiles.add(deleteFile.copy()); + } + } catch (IOException e) { + LOG.warn("Failed to read delete manifest {}", manifest.path(), e); + throw new CacheException("Failed to read delete manifest %s", e, manifest.path()); + } + return ManifestCacheValue.forDeleteFiles(deleteFiles); + } + + private static ManifestCacheKey buildKey(IcebergManifestCache cache, ManifestFile manifest) { + // Iceberg manifest files are immutable, so path uniquely identifies a manifest + return cache.buildKey(manifest.path()); + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java new file mode 100644 index 00000000000..41b52187aec --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import java.util.Objects; + +/** + * Cache key for a single Iceberg manifest file. + * Since Iceberg manifest files are immutable, path uniquely identifies a manifest. + */ +public class ManifestCacheKey { + private final String path; + + public ManifestCacheKey(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ManifestCacheKey)) { + return false; + } + ManifestCacheKey that = (ManifestCacheKey) o; + return Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(path); + } + + @Override + public String toString() { + return "ManifestCacheKey{path='" + path + "'}"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java new file mode 100644 index 00000000000..0c7c9154639 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.cache; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; + +import java.util.Collections; +import java.util.List; + +/** + * Cached manifest payload containing parsed files and an estimated weight. 
+ */ +public class ManifestCacheValue { + private final List<DataFile> dataFiles; + private final List<DeleteFile> deleteFiles; + private final long weightBytes; + + private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> deleteFiles, long weightBytes) { + this.dataFiles = dataFiles == null ? Collections.emptyList() : dataFiles; + this.deleteFiles = deleteFiles == null ? Collections.emptyList() : deleteFiles; + this.weightBytes = weightBytes; + } + + public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) { + return new ManifestCacheValue(dataFiles, Collections.emptyList(), + estimateWeight(dataFiles, Collections.emptyList())); + } + + public static ManifestCacheValue forDeleteFiles(List<DeleteFile> deleteFiles) { + return new ManifestCacheValue(Collections.emptyList(), deleteFiles, + estimateWeight(Collections.emptyList(), deleteFiles)); + } + + public List<DataFile> getDataFiles() { + return dataFiles; + } + + public List<DeleteFile> getDeleteFiles() { + return deleteFiles; + } + + public long getWeightBytes() { + return weightBytes; + } + + private static long estimateWeight(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) { + return ContentFileEstimater.estimate(dataFiles) + ContentFileEstimater.estimate(deleteFiles); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index f5208397a0f..0ffe86edb31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.ExecutionAuthenticator; @@ -38,6 +39,9 @@ import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache; +import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader; +import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.planner.PlanNodeId; @@ -57,18 +61,27 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.iceberg.BaseFileScanTask; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DeleteFileIndex; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; import 
org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.InclusiveMetricsEvaluator; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.types.Conversions; @@ -78,9 +91,12 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; @@ -358,8 +374,136 @@ public class IcebergScanNode extends FileQueryScanNode { } private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { + if (!Config.iceberg_manifest_cache_enable) { + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + try { + return planFileScanTaskWithManifestCache(scan); + } catch (Exception e) { + LOG.warn("Plan with manifest cache failed, fallback to original scan: {}", e.getMessage()); + long targetSplitSize = getRealFileSplitSize(0); + return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + } + } + + private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException { + // Get the snapshot from the scan; return empty if no snapshot exists + Snapshot snapshot = scan.snapshot(); + if (snapshot == null) { + return CloseableIterable.withNoopClose(Collections.emptyList()); + } + + // Initialize manifest cache for efficient manifest file access + IcebergManifestCache cache = IcebergUtils.getManifestCache(); + + // Convert query conjuncts to Iceberg filter expression + // This combines all predicates with AND logic for partition/file pruning + Expression filterExpr = conjuncts.stream() + .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema())) + .filter(Objects::nonNull) + .reduce(Expressions.alwaysTrue(), Expressions::and); + + // Get all partition specs by their IDs for later use + Map<Integer, PartitionSpec> specsById = icebergTable.specs(); + boolean caseSensitive = true; + + // Create residual evaluators for each partition spec + // Residual evaluators compute the remaining filter expression after partition pruning + Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>(); + specsById.forEach((id, spec) -> residualEvaluators.put(id, + ResidualEvaluator.of(spec, filterExpr == null ? Expressions.alwaysTrue() : filterExpr, + caseSensitive))); + + // Create metrics evaluator for file-level pruning based on column statistics + InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? 
null + : new InclusiveMetricsEvaluator(icebergTable.schema(), filterExpr, caseSensitive); + + // ========== Phase 1: Load delete files from delete manifests ========== + List<DeleteFile> deleteFiles = new ArrayList<>(); + List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io()); + for (ManifestFile manifest : deleteManifests) { + // Skip non-delete manifests + if (manifest.content() != ManifestContent.DELETES) { + continue; + } + // Get the partition spec for this manifest + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + if (spec == null) { + continue; + } + // Create manifest evaluator for partition-level pruning + ManifestEvaluator evaluator = filterExpr == null ? null + : ManifestEvaluator.forPartitionFilter(filterExpr, spec, caseSensitive); + // Skip manifest if it doesn't match the filter expression (partition pruning) + if (evaluator != null && !evaluator.eval(manifest)) { + continue; + } + // Load delete files from cache (or from storage if not cached) + ManifestCacheValue value = IcebergManifestCacheLoader.loadDeleteFilesWithCache(cache, manifest, + icebergTable); + deleteFiles.addAll(value.getDeleteFiles()); + } + + // Build delete file index for efficient lookup of deletes applicable to each data file + DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles) + .specsById(specsById) + .caseSensitive(caseSensitive) + .build(); + + // ========== Phase 2: Load data files and create scan tasks ========== + List<FileScanTask> tasks = new ArrayList<>(); + try (CloseableIterable<ManifestFile> dataManifests = + IcebergUtils.getMatchingManifest(snapshot.dataManifests(icebergTable.io()), + specsById, filterExpr)) { + for (ManifestFile manifest : dataManifests) { + // Skip non-data manifests + if (manifest.content() != ManifestContent.DATA) { + continue; + } + // Get the partition spec for this manifest + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + if (spec == null) { + continue; + } + // Get the residual evaluator for this partition spec + ResidualEvaluator residualEvaluator = residualEvaluators.get(manifest.partitionSpecId()); + + // Load data files from cache (or from storage if not cached) + ManifestCacheValue value = IcebergManifestCacheLoader.loadDataFilesWithCache(cache, manifest, + icebergTable); + + // Process each data file in the manifest + for (org.apache.iceberg.DataFile dataFile : value.getDataFiles()) { + // Skip file if column statistics indicate no matching rows (metrics-based pruning) + if (metricsEvaluator != null && !metricsEvaluator.eval(dataFile)) { + continue; + } + // Skip file if partition values don't match the residual filter + if (residualEvaluator != null) { + if (residualEvaluator.residualFor(dataFile.partition()).equals(Expressions.alwaysFalse())) { + continue; + } + } + // Find all delete files that apply to this data file based on sequence number + List<DeleteFile> deletes = Arrays.asList( + deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile)); + + // Create a FileScanTask containing the data file, associated deletes, and metadata + tasks.add(new BaseFileScanTask( + dataFile, + deletes.toArray(new DeleteFile[0]), + SchemaParser.toJson(icebergTable.schema()), + PartitionSpecParser.toJson(spec), + residualEvaluator == null ? 
ResidualEvaluator.unpartitioned(Expressions.alwaysTrue()) + : residualEvaluator)); + } + } + } + + // Split tasks into smaller chunks based on target split size for parallel processing long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), targetSplitSize); } private Split createIcebergSplit(FileScanTask fileScanTask) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java index 2cc829c8743..88def12d2a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java @@ -43,6 +43,43 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties { ) protected String warehouse; + @Getter + @ConnectorProperty( + names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED}, + required = false, + description = "Controls whether to use caching during manifest reads or not. Default: false." + ) + protected String ioManifestCacheEnabled; + + @Getter + @ConnectorProperty( + names = {CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS}, + required = false, + description = "Controls the maximum duration for which an entry stays in the manifest cache. " + + "Must be a non-negative value. Zero means entries expire only due to memory pressure. " + + "Default: 60000 (60s)." + ) + protected String ioManifestCacheExpirationIntervalMs; + + @Getter + @ConnectorProperty( + names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES}, + required = false, + description = "Controls the maximum total amount of bytes to cache in manifest cache. " + + "Must be a positive value. Default: 104857600 (100MB)." + ) + protected String ioManifestCacheMaxTotalBytes; + + @Getter + @ConnectorProperty( + names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH}, + required = false, + description = "Controls the maximum length of file to be considered for caching. " + + "An InputFile will not be cached if the length is longer than this limit. " + + "Must be a positive value. Default: 8388608 (8MB)." + ) + protected String ioManifestCacheMaxContentLength; + @Getter protected ExecutionAuthenticator executionAuthenticator = new ExecutionAuthenticator(){}; @@ -80,6 +117,9 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties { catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); } + // Add manifest cache properties if configured + addManifestCacheProperties(catalogProps); + Catalog catalog = initCatalog(catalogName, catalogProps, storagePropertiesList); if (catalog == null) { @@ -88,6 +128,28 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties { return catalog; } + /** + * Add manifest cache related properties to catalog properties. + * These properties control caching behavior during manifest reads. 
+ * + * @param catalogProps the catalog properties map to add manifest cache properties to + */ + protected void addManifestCacheProperties(Map<String, String> catalogProps) { + if (StringUtils.isNotBlank(ioManifestCacheEnabled)) { + catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, ioManifestCacheEnabled); + } + if (StringUtils.isNotBlank(ioManifestCacheExpirationIntervalMs)) { + catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, + ioManifestCacheExpirationIntervalMs); + } + if (StringUtils.isNotBlank(ioManifestCacheMaxTotalBytes)) { + catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, ioManifestCacheMaxTotalBytes); + } + if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) { + catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, ioManifestCacheMaxContentLength); + } + } + /** * Subclasses must implement this to create the concrete Catalog instance. */ diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java new file mode 100644 index 00000000000..5c9cdd93c45 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -0,0 +1,906 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.PartitionMap; +import org.apache.iceberg.util.PartitionSet; +import org.apache.iceberg.util.Tasks; + +/** + * An index of {@link DeleteFile delete files} by sequence number. + * + * <p>Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, + * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data + * file. + * + * Copyed from https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java + * Change DeleteFileIndex and some methods to public. 
+ */ +public class DeleteFileIndex { + private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; + + private final EqualityDeletes globalDeletes; + private final PartitionMap<EqualityDeletes> eqDeletesByPartition; + private final PartitionMap<PositionDeletes> posDeletesByPartition; + private final Map<String, PositionDeletes> posDeletesByPath; + private final Map<String, DeleteFile> dvByPath; + private final boolean hasEqDeletes; + private final boolean hasPosDeletes; + private final boolean isEmpty; + + private DeleteFileIndex( + EqualityDeletes globalDeletes, + PartitionMap<EqualityDeletes> eqDeletesByPartition, + PartitionMap<PositionDeletes> posDeletesByPartition, + Map<String, PositionDeletes> posDeletesByPath, + Map<String, DeleteFile> dvByPath) { + this.globalDeletes = globalDeletes; + this.eqDeletesByPartition = eqDeletesByPartition; + this.posDeletesByPartition = posDeletesByPartition; + this.posDeletesByPath = posDeletesByPath; + this.dvByPath = dvByPath; + this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null; + this.hasPosDeletes = + posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null; + this.isEmpty = !hasEqDeletes && !hasPosDeletes; + } + + public boolean isEmpty() { + return isEmpty; + } + + public boolean hasEqualityDeletes() { + return hasEqDeletes; + } + + public boolean hasPositionDeletes() { + return hasPosDeletes; + } + + public Iterable<DeleteFile> referencedDeleteFiles() { + Iterable<DeleteFile> deleteFiles = Collections.emptyList(); + + if (globalDeletes != null) { + deleteFiles = Iterables.concat(deleteFiles, globalDeletes.referencedDeleteFiles()); + } + + if (eqDeletesByPartition != null) { + for (EqualityDeletes deletes : eqDeletesByPartition.values()) { + deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles()); + } + } + + if (posDeletesByPartition != null) { + for (PositionDeletes deletes : posDeletesByPartition.values()) { + deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles()); + } + } + + if (posDeletesByPath != null) { + for (PositionDeletes deletes : posDeletesByPath.values()) { + deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles()); + } + } + + if (dvByPath != null) { + deleteFiles = Iterables.concat(deleteFiles, dvByPath.values()); + } + + return deleteFiles; + } + + DeleteFile[] forEntry(ManifestEntry<DataFile> entry) { + return forDataFile(entry.dataSequenceNumber(), entry.file()); + } + + public DeleteFile[] forDataFile(DataFile file) { + return forDataFile(file.dataSequenceNumber(), file); + } + + public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { + if (isEmpty) { + return EMPTY_DELETES; + } + + DeleteFile[] global = findGlobalDeletes(sequenceNumber, file); + DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file); + DeleteFile dv = findDV(sequenceNumber, file); + if (dv != null && global == null && eqPartition == null) { + return new DeleteFile[] {dv}; + } else if (dv != null) { + return concat(global, eqPartition, new DeleteFile[] {dv}); + } else { + DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file); + DeleteFile[] posPath = findPathDeletes(sequenceNumber, file); + return concat(global, eqPartition, posPartition, posPath); + } + } + + private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) { + return globalDeletes == null ? 
EMPTY_DELETES : globalDeletes.filter(seq, dataFile); + } + + private DeleteFile[] findPosPartitionDeletes(long seq, DataFile dataFile) { + if (posDeletesByPartition == null) { + return EMPTY_DELETES; + } + + PositionDeletes deletes = posDeletesByPartition.get(dataFile.specId(), dataFile.partition()); + return deletes == null ? EMPTY_DELETES : deletes.filter(seq); + } + + private DeleteFile[] findEqPartitionDeletes(long seq, DataFile dataFile) { + if (eqDeletesByPartition == null) { + return EMPTY_DELETES; + } + + EqualityDeletes deletes = eqDeletesByPartition.get(dataFile.specId(), dataFile.partition()); + return deletes == null ? EMPTY_DELETES : deletes.filter(seq, dataFile); + } + + @SuppressWarnings("CollectionUndefinedEquality") + private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) { + if (posDeletesByPath == null) { + return EMPTY_DELETES; + } + + PositionDeletes deletes = posDeletesByPath.get(dataFile.location()); + return deletes == null ? EMPTY_DELETES : deletes.filter(seq); + } + + private DeleteFile findDV(long seq, DataFile dataFile) { + if (dvByPath == null) { + return null; + } + + DeleteFile dv = dvByPath.get(dataFile.location()); + if (dv != null) { + ValidationException.check( + dv.dataSequenceNumber() >= seq, + "DV data sequence number (%s) must be greater than or equal to data file sequence number (%s)", + dv.dataSequenceNumber(), + seq); + } + return dv; + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private static boolean canContainEqDeletesForFile( + DataFile dataFile, EqualityDeleteFile deleteFile) { + Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds(); + Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds(); + + // whether to check data ranges or to assume that the ranges match + // if upper/lower bounds are missing, null counts may still be used to determine delete files + // can be skipped + boolean checkRanges = + dataLowers != null && dataUppers != null && deleteFile.hasLowerAndUpperBounds(); + + Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts(); + Map<Integer, Long> dataValueCounts = dataFile.valueCounts(); + Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts(); + Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts(); + + for (Types.NestedField field : deleteFile.equalityFields()) { + if (!field.type().isPrimitiveType()) { + // stats are not kept for nested types. 
assume that the delete file may match + continue; + } + + if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) { + // the data has null values and null has been deleted, so the deletes must be applied + continue; + } + + if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) { + // the data file contains only null values for this field, but there are no deletes for null + // values + return false; + } + + if (allNull(deleteNullCounts, deleteValueCounts, field) + && allNonNull(dataNullCounts, field)) { + // the delete file removes only null rows with null for this field, but there are no data + // rows with null + return false; + } + + if (!checkRanges) { + // some upper and lower bounds are missing, assume they match + continue; + } + + int id = field.fieldId(); + ByteBuffer dataLower = dataLowers.get(id); + ByteBuffer dataUpper = dataUppers.get(id); + Object deleteLower = deleteFile.lowerBound(id); + Object deleteUpper = deleteFile.upperBound(id); + if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) { + // at least one bound is not known, assume the delete file may match + continue; + } + + if (!rangesOverlap(field, dataLower, dataUpper, deleteLower, deleteUpper)) { + // no values overlap between the data file and the deletes + return false; + } + } + + return true; + } + + private static <T> boolean rangesOverlap( + Types.NestedField field, + ByteBuffer dataLowerBuf, + ByteBuffer dataUpperBuf, + T deleteLower, + T deleteUpper) { + Type.PrimitiveType type = field.type().asPrimitiveType(); + Comparator<T> comparator = Comparators.forType(type); + + T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf); + if (comparator.compare(dataLower, deleteUpper) > 0) { + return false; + } + + T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf); + if (comparator.compare(deleteLower, dataUpper) > 0) { + return false; + } + + return true; + } + + private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) { + if (field.isRequired()) { + return true; + } + + if (nullValueCounts == null) { + return false; + } + + Long nullValueCount = nullValueCounts.get(field.fieldId()); + if (nullValueCount == null) { + return false; + } + + return nullValueCount <= 0; + } + + private static boolean allNull( + Map<Integer, Long> nullValueCounts, Map<Integer, Long> valueCounts, Types.NestedField field) { + if (field.isRequired()) { + return false; + } + + if (nullValueCounts == null || valueCounts == null) { + return false; + } + + Long nullValueCount = nullValueCounts.get(field.fieldId()); + Long valueCount = valueCounts.get(field.fieldId()); + if (nullValueCount == null || valueCount == null) { + return false; + } + + return nullValueCount.equals(valueCount); + } + + private static boolean containsNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) { + if (field.isRequired()) { + return false; + } + + if (nullValueCounts == null) { + return true; + } + + Long nullValueCount = nullValueCounts.get(field.fieldId()); + if (nullValueCount == null) { + return true; + } + + return nullValueCount > 0; + } + + static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) { + return new Builder(io, Sets.newHashSet(deleteManifests)); + } + + // changed to public method. + public static Builder builderFor(Iterable<DeleteFile> deleteFiles) { + return new Builder(deleteFiles); + } + + // changed to public class. 
+ public static class Builder { + private final FileIO io; + private final Set<ManifestFile> deleteManifests; + private final Iterable<DeleteFile> deleteFiles; + private long minSequenceNumber = 0L; + private Map<Integer, PartitionSpec> specsById = null; + private Expression dataFilter = Expressions.alwaysTrue(); + private Expression partitionFilter = Expressions.alwaysTrue(); + private PartitionSet partitionSet = null; + private boolean caseSensitive = true; + private ExecutorService executorService = null; + private ScanMetrics scanMetrics = ScanMetrics.noop(); + private boolean ignoreResiduals = false; + + Builder(FileIO io, Set<ManifestFile> deleteManifests) { + this.io = io; + this.deleteManifests = Sets.newHashSet(deleteManifests); + this.deleteFiles = null; + } + + Builder(Iterable<DeleteFile> deleteFiles) { + this.io = null; + this.deleteManifests = null; + this.deleteFiles = deleteFiles; + } + + Builder afterSequenceNumber(long seq) { + this.minSequenceNumber = seq; + return this; + } + + public Builder specsById(Map<Integer, PartitionSpec> newSpecsById) { + this.specsById = newSpecsById; + return this; + } + + Builder filterData(Expression newDataFilter) { + Preconditions.checkArgument( + deleteFiles == null, "Index constructed from files does not support data filters"); + this.dataFilter = Expressions.and(dataFilter, newDataFilter); + return this; + } + + Builder filterPartitions(Expression newPartitionFilter) { + Preconditions.checkArgument( + deleteFiles == null, "Index constructed from files does not support partition filters"); + this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter); + return this; + } + + Builder filterPartitions(PartitionSet newPartitionSet) { + Preconditions.checkArgument( + deleteFiles == null, "Index constructed from files does not support partition filters"); + this.partitionSet = newPartitionSet; + return this; + } + + public Builder caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return this; + } + + Builder planWith(ExecutorService newExecutorService) { + this.executorService = newExecutorService; + return this; + } + + Builder scanMetrics(ScanMetrics newScanMetrics) { + this.scanMetrics = newScanMetrics; + return this; + } + + Builder ignoreResiduals() { + this.ignoreResiduals = true; + return this; + } + + private Iterable<DeleteFile> filterDeleteFiles() { + return Iterables.filter(deleteFiles, file -> file.dataSequenceNumber() > minSequenceNumber); + } + + private Collection<DeleteFile> loadDeleteFiles() { + // read all of the matching delete manifests in parallel and accumulate the matching files in + // a queue + Queue<DeleteFile> files = new ConcurrentLinkedQueue<>(); + Tasks.foreach(deleteManifestReaders()) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(executorService) + .run( + deleteFile -> { + try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) { + for (ManifestEntry<DeleteFile> entry : reader) { + if (entry.dataSequenceNumber() > minSequenceNumber) { + // copy with stats for better filtering against data file stats + files.add(entry.file().copy()); + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close"); + } + }); + return files; + } + + public DeleteFileIndex build() { + Iterable<DeleteFile> files = deleteFiles != null ? 
filterDeleteFiles() : loadDeleteFiles(); + + EqualityDeletes globalDeletes = new EqualityDeletes(); + PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById); + PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById); + Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap(); + Map<String, DeleteFile> dvByPath = Maps.newHashMap(); + + for (DeleteFile file : files) { + switch (file.content()) { + case POSITION_DELETES: + if (ContentFileUtil.isDV(file)) { + add(dvByPath, file); + } else { + add(posDeletesByPath, posDeletesByPartition, file); + } + break; + case EQUALITY_DELETES: + add(globalDeletes, eqDeletesByPartition, file); + break; + default: + throw new UnsupportedOperationException("Unsupported content: " + file.content()); + } + ScanMetricsUtil.indexedDeleteFile(scanMetrics, file); + } + + return new DeleteFileIndex( + globalDeletes.isEmpty() ? null : globalDeletes, + eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition, + posDeletesByPartition.isEmpty() ? null : posDeletesByPartition, + posDeletesByPath.isEmpty() ? null : posDeletesByPath, + dvByPath.isEmpty() ? null : dvByPath); + } + + private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) { + String path = dv.referencedDataFile(); + DeleteFile existingDV = dvByPath.putIfAbsent(path, dv); + if (existingDV != null) { + throw new ValidationException( + "Can't index multiple DVs for %s: %s and %s", + path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV)); + } + } + + private void add( + Map<String, PositionDeletes> deletesByPath, + PartitionMap<PositionDeletes> deletesByPartition, + DeleteFile file) { + String path = ContentFileUtil.referencedDataFileLocation(file); + + PositionDeletes deletes; + if (path != null) { + deletes = deletesByPath.computeIfAbsent(path, ignored -> new PositionDeletes()); + } else { + int specId = file.specId(); + StructLike partition = file.partition(); + deletes = deletesByPartition.computeIfAbsent(specId, partition, PositionDeletes::new); + } + + deletes.add(file); + } + + private void add( + EqualityDeletes globalDeletes, + PartitionMap<EqualityDeletes> deletesByPartition, + DeleteFile file) { + PartitionSpec spec = specsById.get(file.specId()); + + EqualityDeletes deletes; + if (spec.isUnpartitioned()) { + deletes = globalDeletes; + } else { + int specId = spec.specId(); + StructLike partition = file.partition(); + deletes = deletesByPartition.computeIfAbsent(specId, partition, EqualityDeletes::new); + } + + deletes.add(spec, file); + } + + private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() { + Expression entryFilter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter; + + LoadingCache<Integer, Expression> partExprCache = + specsById == null + ? null + : Caffeine.newBuilder() + .build( + specId -> { + PartitionSpec spec = specsById.get(specId); + return Projections.inclusive(spec, caseSensitive).project(dataFilter); + }); + + LoadingCache<Integer, ManifestEvaluator> evalCache = + specsById == null + ? null + : Caffeine.newBuilder() + .build( + specId -> { + PartitionSpec spec = specsById.get(specId); + return ManifestEvaluator.forPartitionFilter( + Expressions.and(partitionFilter, partExprCache.get(specId)), + spec, + caseSensitive); + }); + + CloseableIterable<ManifestFile> closeableDeleteManifests = + CloseableIterable.withNoopClose(deleteManifests); + CloseableIterable<ManifestFile> matchingManifests = + evalCache == null + ? 
closeableDeleteManifests + : CloseableIterable.filter( + scanMetrics.skippedDeleteManifests(), + closeableDeleteManifests, + manifest -> + manifest.content() == ManifestContent.DELETES + && (manifest.hasAddedFiles() || manifest.hasExistingFiles()) + && evalCache.get(manifest.partitionSpecId()).eval(manifest)); + + matchingManifests = + CloseableIterable.count(scanMetrics.scannedDeleteManifests(), matchingManifests); + return Iterables.transform( + matchingManifests, + manifest -> + ManifestFiles.readDeleteManifest(manifest, io, specsById) + .filterRows(entryFilter) + .filterPartitions( + Expressions.and( + partitionFilter, partExprCache.get(manifest.partitionSpecId()))) + .filterPartitions(partitionSet) + .caseSensitive(caseSensitive) + .scanMetrics(scanMetrics) + .liveEntries()); + } + } + + /** + * Finds an index in the sorted array of sequence numbers where the given sequence number should + * be inserted or is found. + * + * <p>If the sequence number is present in the array, this method returns the index of the first + * occurrence of the sequence number. If the sequence number is not present, the method returns + * the index where the sequence number would be inserted while maintaining the sorted order of the + * array. This returned index ranges from 0 (inclusive) to the length of the array (inclusive). + * + * <p>This method is used to determine the subset of delete files that apply to a given data file. + * + * @param seqs an array of sequence numbers sorted in ascending order + * @param seq the sequence number to search for + * @return the index of the first occurrence or the insertion point + */ + private static int findStartIndex(long[] seqs, long seq) { + int pos = Arrays.binarySearch(seqs, seq); + int start; + if (pos < 0) { + // the sequence number was not found, where it would be inserted is -(pos + 1) + start = -(pos + 1); + } else { + // the sequence number was found, but may not be the first + // find the first delete file with the given sequence number by decrementing the position + start = pos; + while (start > 0 && seqs[start - 1] >= seq) { + start -= 1; + } + } + + return start; + } + + private static DeleteFile[] concat(DeleteFile[]... 
deletes) { + return ArrayUtil.concat(DeleteFile.class, deletes); + } + + // a group of position delete files sorted by the sequence number they apply to + static class PositionDeletes { + private static final Comparator<DeleteFile> SEQ_COMPARATOR = + Comparator.comparingLong(DeleteFile::dataSequenceNumber); + + // indexed state + private long[] seqs = null; + private DeleteFile[] files = null; + + // a buffer that is used to hold files before indexing + private volatile List<DeleteFile> buffer = Lists.newArrayList(); + + public void add(DeleteFile file) { + Preconditions.checkState(buffer != null, "Can't add files upon indexing"); + buffer.add(file); + } + + public DeleteFile[] filter(long seq) { + indexIfNeeded(); + + int start = findStartIndex(seqs, seq); + + if (start >= files.length) { + return EMPTY_DELETES; + } + + if (start == 0) { + return files; + } + + int matchingFilesCount = files.length - start; + DeleteFile[] matchingFiles = new DeleteFile[matchingFilesCount]; + System.arraycopy(files, start, matchingFiles, 0, matchingFilesCount); + return matchingFiles; + } + + public Iterable<DeleteFile> referencedDeleteFiles() { + indexIfNeeded(); + return Arrays.asList(files); + } + + public boolean isEmpty() { + indexIfNeeded(); + return files.length == 0; + } + + private void indexIfNeeded() { + if (buffer != null) { + synchronized (this) { + if (buffer != null) { + this.files = indexFiles(buffer); + this.seqs = indexSeqs(files); + this.buffer = null; + } + } + } + } + + private static DeleteFile[] indexFiles(List<DeleteFile> list) { + DeleteFile[] array = list.toArray(EMPTY_DELETES); + Arrays.sort(array, SEQ_COMPARATOR); + return array; + } + + private static long[] indexSeqs(DeleteFile[] files) { + long[] seqs = new long[files.length]; + + for (int index = 0; index < files.length; index++) { + seqs[index] = files[index].dataSequenceNumber(); + } + + return seqs; + } + } + + // a group of equality delete files sorted by the sequence number they apply to + static class EqualityDeletes { + private static final Comparator<EqualityDeleteFile> SEQ_COMPARATOR = + Comparator.comparingLong(EqualityDeleteFile::applySequenceNumber); + private static final EqualityDeleteFile[] EMPTY_EQUALITY_DELETES = new EqualityDeleteFile[0]; + + // indexed state + private long[] seqs = null; + private EqualityDeleteFile[] files = null; + + // a buffer that is used to hold files before indexing + private volatile List<EqualityDeleteFile> buffer = Lists.newArrayList(); + + public void add(PartitionSpec spec, DeleteFile file) { + Preconditions.checkState(buffer != null, "Can't add files upon indexing"); + buffer.add(new EqualityDeleteFile(spec, file)); + } + + public DeleteFile[] filter(long seq, DataFile dataFile) { + indexIfNeeded(); + + int start = findStartIndex(seqs, seq); + + if (start >= files.length) { + return EMPTY_DELETES; + } + + List<DeleteFile> matchingFiles = Lists.newArrayList(); + + for (int index = start; index < files.length; index++) { + EqualityDeleteFile file = files[index]; + if (canContainEqDeletesForFile(dataFile, file)) { + matchingFiles.add(file.wrapped()); + } + } + + return matchingFiles.toArray(EMPTY_DELETES); + } + + public Iterable<DeleteFile> referencedDeleteFiles() { + indexIfNeeded(); + return Iterables.transform(Arrays.asList(files), EqualityDeleteFile::wrapped); + } + + public boolean isEmpty() { + indexIfNeeded(); + return files.length == 0; + } + + private void indexIfNeeded() { + if (buffer != null) { + synchronized (this) { + if (buffer != null) { + this.files = 
indexFiles(buffer); + this.seqs = indexSeqs(files); + this.buffer = null; + } + } + } + } + + private static EqualityDeleteFile[] indexFiles(List<EqualityDeleteFile> list) { + EqualityDeleteFile[] array = list.toArray(EMPTY_EQUALITY_DELETES); + Arrays.sort(array, SEQ_COMPARATOR); + return array; + } + + private static long[] indexSeqs(EqualityDeleteFile[] files) { + long[] seqs = new long[files.length]; + + for (int index = 0; index < files.length; index++) { + seqs[index] = files[index].applySequenceNumber(); + } + + return seqs; + } + } + + // an equality delete file wrapper that caches the converted boundaries for faster boundary checks + // this class is not meant to be exposed beyond the delete file index + private static class EqualityDeleteFile { + private final PartitionSpec spec; + private final DeleteFile wrapped; + private final long applySequenceNumber; + private volatile List<Types.NestedField> equalityFields = null; + private volatile Map<Integer, Object> convertedLowerBounds = null; + private volatile Map<Integer, Object> convertedUpperBounds = null; + + EqualityDeleteFile(PartitionSpec spec, DeleteFile file) { + this.spec = spec; + this.wrapped = file; + this.applySequenceNumber = wrapped.dataSequenceNumber() - 1; + } + + public DeleteFile wrapped() { + return wrapped; + } + + public long applySequenceNumber() { + return applySequenceNumber; + } + + public List<Types.NestedField> equalityFields() { + if (equalityFields == null) { + synchronized (this) { + if (equalityFields == null) { + List<Types.NestedField> fields = Lists.newArrayList(); + for (int id : wrapped.equalityFieldIds()) { + Types.NestedField field = spec.schema().findField(id); + fields.add(field); + } + this.equalityFields = fields; + } + } + } + + return equalityFields; + } + + public Map<Integer, Long> valueCounts() { + return wrapped.valueCounts(); + } + + public Map<Integer, Long> nullValueCounts() { + return wrapped.nullValueCounts(); + } + + public boolean hasLowerAndUpperBounds() { + return wrapped.lowerBounds() != null && wrapped.upperBounds() != null; + } + + @SuppressWarnings("unchecked") + public <T> T lowerBound(int id) { + return (T) lowerBounds().get(id); + } + + private Map<Integer, Object> lowerBounds() { + if (convertedLowerBounds == null) { + synchronized (this) { + if (convertedLowerBounds == null) { + this.convertedLowerBounds = convertBounds(wrapped.lowerBounds()); + } + } + } + + return convertedLowerBounds; + } + + @SuppressWarnings("unchecked") + public <T> T upperBound(int id) { + return (T) upperBounds().get(id); + } + + private Map<Integer, Object> upperBounds() { + if (convertedUpperBounds == null) { + synchronized (this) { + if (convertedUpperBounds == null) { + this.convertedUpperBounds = convertBounds(wrapped.upperBounds()); + } + } + } + + return convertedUpperBounds; + } + + private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> bounds) { + Map<Integer, Object> converted = Maps.newHashMap(); + + if (bounds != null) { + for (Types.NestedField field : equalityFields()) { + int id = field.fieldId(); + Type type = spec.schema().findField(id).type(); + if (type.isPrimitiveType()) { + ByteBuffer bound = bounds.get(id); + if (bound != null) { + converted.put(id, Conversions.fromByteBuffer(type, bound)); + } + } + } + } + + return converted; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

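Similarly, the PositionDeletes and EqualityDeletes groups in the copied class buffer incoming delete files and only sort them into an index on first read, using double-checked locking on a volatile buffer. A simplified sketch of that buffer-then-index pattern follows; the LazySortedIndex name and its plain long values are illustrative only, standing in for the DeleteFile arrays used in the real class.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Writers append to a volatile buffer; the first read sorts the buffer into an array and
// discards it, with double-checked locking so indexing happens exactly once.
public class LazySortedIndex {
    private long[] sorted = null;                            // indexed state, built on first read
    private volatile List<Long> buffer = new ArrayList<>();  // write-side buffer

    public void add(long value) {
        List<Long> buf = buffer;
        if (buf == null) {
            throw new IllegalStateException("Can't add values after indexing");
        }
        buf.add(value);
    }

    public long[] values() {
        indexIfNeeded();
        return sorted;
    }

    private void indexIfNeeded() {
        if (buffer != null) {                 // first check without locking
            synchronized (this) {
                if (buffer != null) {         // second check under the lock
                    long[] array = buffer.stream().mapToLong(Long::longValue).toArray();
                    Arrays.sort(array);
                    this.sorted = array;
                    this.buffer = null;       // release the buffer; further adds fail fast
                }
            }
        }
    }

    public static void main(String[] args) {
        LazySortedIndex index = new LazySortedIndex();
        index.add(7L);
        index.add(3L);
        index.add(5L);
        System.out.println(Arrays.toString(index.values())); // [3, 5, 7]
    }
}

Keeping only the buffer field volatile is sufficient here: the read path touches the sorted array only after observing a null buffer, and that null is published by the same synchronized block that built the index.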