gty404 commented on code in PR #682: URL: https://github.com/apache/iceberg-cpp/pull/682#discussion_r3308444383
########## src/iceberg/update/merging_snapshot_update.cc: ########## @@ -0,0 +1,870 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/merging_snapshot_update.h" + +#include <algorithm> +#include <span> +#include <unordered_map> +#include <vector> + +#include "iceberg/constants.h" +#include "iceberg/delete_file_index.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_util_internal.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name, + std::shared_ptr<TransactionContext> ctx) + : SnapshotUpdate(std::move(ctx)), + table_name_(std::move(table_name)), + 
delete_expression_(Expressions::AlwaysFalse()), + data_filter_manager_(ManifestContent::kData, ctx_->table->io()), + delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()), + data_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)), + delete_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)) {} + +// ------------------------------------------------------------------------- +// Primitive API +// ------------------------------------------------------------------------- + +Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null data file"); + } + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Data file must have a partition spec ID"); + } + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + + // Suppress first_row_id — it will be assigned by the commit, not inherited from the + // source file. 
+ file->first_row_id = std::nullopt; + + auto& data_files = new_data_files_by_spec_[spec_id]; + auto [it, inserted] = data_files.insert(file); + if (inserted) { + has_new_data_files_ = true; + ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, *file)); + } + return {}; +} + +Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) { + if (file.content == DataFile::Content::kData) { + return InvalidArgument("Expected a delete file but got a data file: {}", + file.file_path); + } + const int8_t format_version = base().format_version; + const bool is_dv = file.referenced_data_file.has_value(); + switch (format_version) { + case 1: + return InvalidArgument("Deletes are supported in V2 and above"); + case 2: + // Position deletes must NOT be DVs in v2. + if (file.content == DataFile::Content::kPositionDeletes && is_dv) { + return InvalidArgument("Must not use DVs for position deletes in V2: {}", + file.file_path); + } + break; + default: + if (format_version >= 3) { + // Position deletes MUST be DVs in v3+. 
+ if (file.content == DataFile::Content::kPositionDeletes && !is_dv) { + return InvalidArgument("Must use DVs for position deletes in V{}: {}", + format_version, file.file_path); + } + } else { + return InvalidArgument("Unsupported format version: {}", format_version); + } + break; + } + return {}; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null delete file"); + } + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Delete file must have a partition spec ID"); + } + ICEBERG_ASSIGN_OR_RAISE(auto spec, + base().PartitionSpecById(file->partition_spec_id.value())); + ICEBERG_RETURN_UNEXPECTED(added_delete_files_summary_.AddedFile(*spec, *file)); + has_new_delete_files_ = true; + new_delete_files_.push_back(std::move(file)); + return {}; +} + +Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null data file"); + } + return data_filter_manager_.DeleteFile(std::move(file)); +} + +Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null delete file"); + } + return delete_filter_manager_.DeleteFile(std::move(file)); +} + +void MergingSnapshotUpdate::DeleteByPath(std::string_view path) { + data_filter_manager_.DeleteFile(path); +} + +Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> expr) { + // If a delete file matches the row filter, it can also be removed because the rows + // it references will also be deleted. Both filter managers receive the expression. 
+ delete_expression_ = expr; + ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr)); + return delete_filter_manager_.DeleteByRowFilter(std::move(expr)); +} + +void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues partition) { + // Dropping data in a partition also drops all delete files in that partition. + data_filter_manager_.DropPartition(spec_id, partition); + delete_filter_manager_.DropPartition(spec_id, std::move(partition)); +} + +void MergingSnapshotUpdate::FailMissingDeletePaths() { + data_filter_manager_.FailMissingDeletePaths(); + delete_filter_manager_.FailMissingDeletePaths(); +} + +void MergingSnapshotUpdate::FailAnyDelete() { + data_filter_manager_.FailAnyDelete(); + delete_filter_manager_.FailAnyDelete(); +} + +void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t sequence_number) { + new_data_files_data_seq_number_ = sequence_number; +} + +void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + data_filter_manager_.CaseSensitive(case_sensitive); + delete_filter_manager_.CaseSensitive(case_sensitive); +} + +void MergingSnapshotUpdate::Set(const std::string& property, const std::string& value) { + summary_builder().Set(property, value); +} + +Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const { + if (new_data_files_by_spec_.empty()) { + return InvalidArgument("DataSpec() called before any data file was added"); + } + if (new_data_files_by_spec_.size() > 1) { + return InvalidArgument( + "DataSpec() requires exactly one partition spec; got {} different specs", + new_data_files_by_spec_.size()); + } + return base().PartitionSpecById(new_data_files_by_spec_.begin()->first); +} + +std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() const { + std::vector<std::shared_ptr<DataFile>> result; + for (const auto& [spec_id, files] : new_data_files_by_spec_) { + for (const auto& file : files) { + 
result.push_back(file); + } + } + return result; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> /*file*/, + int64_t /*data_sequence_number*/) { + return NotImplemented( + "AddDeleteFile with explicit data sequence number is not yet implemented"); +} + +Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) { + if (manifest.content != ManifestContent::kData) { + return InvalidArgument("Cannot append delete manifest: {}", manifest.manifest_path); + } + if (manifest.has_existing_files()) { + return InvalidArgument("Cannot append manifest with existing files: {}", + manifest.manifest_path); + } + if (manifest.has_deleted_files()) { + return InvalidArgument("Cannot append manifest with deleted files: {}", + manifest.manifest_path); + } + if (manifest.added_snapshot_id != kInvalidSnapshotId) { Review Comment: Good catch. I'll stop rejecting manifests that already carry a snapshot id or first_row_id and route them through CopyManifest so the behavior matches the Java implementation. ########## src/iceberg/manifest/manifest_filter_manager.cc: ########## @@ -117,12 +117,24 @@ bool ManifestFilterManager::ContainsDeletes() const { !drop_partitions_.empty(); } +void ManifestFilterManager::DropDeleteFilesOlderThan(int64_t sequence_number) { + min_sequence_number_ = sequence_number; +} + +void ManifestFilterManager::RemoveDanglingDeletesFor(const DataFileSet& deleted_files) { + for (const auto& file : deleted_files) { + removed_data_file_paths_.insert(file->file_path); + } +} + Result<bool> ManifestFilterManager::CanContainDroppedFiles(const ManifestFile&) const { // TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete // manifests once object-delete partitions are tracked separately. // Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based delete, // which forces scanning all manifests. - return !delete_paths_.empty(); + // Also open delete manifests when a minimum sequence number is set for cleanup.
+ return !delete_paths_.empty() || !removed_data_file_paths_.empty() || + (manifest_content_ == ManifestContent::kDeletes && min_sequence_number_ > 0); Review Comment: You're right. I'll remove the eager scan triggered solely by minSequenceNumber and only drop old deletes when a delete manifest is already being rewritten for filtering. ########## src/iceberg/manifest/manifest_merge_manager.cc: ########## @@ -140,6 +142,8 @@ Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup( } else { ICEBERG_ASSIGN_OR_RAISE( auto merged, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory)); + // Each manifest consumed into the merged output (beyond the 1 output) is replaced. + replaced_manifests_count_ += static_cast<int32_t>(bin.size()) - 1; Review Comment: Good point. I'll update the replaced-manifest accounting to count the previous-snapshot manifests consumed by the merge instead of using bin.size() - 1. ########## src/iceberg/update/merging_snapshot_update.cc: ########## @@ -0,0 +1,870 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/merging_snapshot_update.h" + +#include <algorithm> +#include <span> +#include <unordered_map> +#include <vector> + +#include "iceberg/constants.h" +#include "iceberg/delete_file_index.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_util_internal.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name, + std::shared_ptr<TransactionContext> ctx) + : SnapshotUpdate(std::move(ctx)), + table_name_(std::move(table_name)), + delete_expression_(Expressions::AlwaysFalse()), + data_filter_manager_(ManifestContent::kData, ctx_->table->io()), + delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()), + data_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)), + delete_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)) {} + +// ------------------------------------------------------------------------- +// Primitive API +// ------------------------------------------------------------------------- + +Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) { + if 
(!file) { + return InvalidArgument("Cannot add a null data file"); + } + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Data file must have a partition spec ID"); + } + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + + // Suppress first_row_id — it will be assigned by the commit, not inherited from the + // source file. + file->first_row_id = std::nullopt; + + auto& data_files = new_data_files_by_spec_[spec_id]; + auto [it, inserted] = data_files.insert(file); + if (inserted) { + has_new_data_files_ = true; + ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, *file)); + } + return {}; +} + +Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) { + if (file.content == DataFile::Content::kData) { + return InvalidArgument("Expected a delete file but got a data file: {}", + file.file_path); + } + const int8_t format_version = base().format_version; + const bool is_dv = file.referenced_data_file.has_value(); + switch (format_version) { + case 1: + return InvalidArgument("Deletes are supported in V2 and above"); + case 2: + // Position deletes must NOT be DVs in v2. + if (file.content == DataFile::Content::kPositionDeletes && is_dv) { + return InvalidArgument("Must not use DVs for position deletes in V2: {}", + file.file_path); + } + break; + default: + if (format_version >= 3) { + // Position deletes MUST be DVs in v3+. 
+ if (file.content == DataFile::Content::kPositionDeletes && !is_dv) { + return InvalidArgument("Must use DVs for position deletes in V{}: {}", + format_version, file.file_path); + } + } else { + return InvalidArgument("Unsupported format version: {}", format_version); + } + break; + } + return {}; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null delete file"); + } + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Delete file must have a partition spec ID"); + } + ICEBERG_ASSIGN_OR_RAISE(auto spec, + base().PartitionSpecById(file->partition_spec_id.value())); + ICEBERG_RETURN_UNEXPECTED(added_delete_files_summary_.AddedFile(*spec, *file)); + has_new_delete_files_ = true; + new_delete_files_.push_back(std::move(file)); + return {}; +} + +Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null data file"); + } + return data_filter_manager_.DeleteFile(std::move(file)); +} + +Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null delete file"); + } + return delete_filter_manager_.DeleteFile(std::move(file)); +} + +void MergingSnapshotUpdate::DeleteByPath(std::string_view path) { + data_filter_manager_.DeleteFile(path); +} + +Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> expr) { + // If a delete file matches the row filter, it can also be removed because the rows + // it references will also be deleted. Both filter managers receive the expression. 
+ delete_expression_ = expr; + ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr)); + return delete_filter_manager_.DeleteByRowFilter(std::move(expr)); +} + +void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues partition) { + // Dropping data in a partition also drops all delete files in that partition. + data_filter_manager_.DropPartition(spec_id, partition); + delete_filter_manager_.DropPartition(spec_id, std::move(partition)); +} + +void MergingSnapshotUpdate::FailMissingDeletePaths() { + data_filter_manager_.FailMissingDeletePaths(); + delete_filter_manager_.FailMissingDeletePaths(); +} + +void MergingSnapshotUpdate::FailAnyDelete() { + data_filter_manager_.FailAnyDelete(); + delete_filter_manager_.FailAnyDelete(); +} + +void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t sequence_number) { + new_data_files_data_seq_number_ = sequence_number; +} + +void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + data_filter_manager_.CaseSensitive(case_sensitive); + delete_filter_manager_.CaseSensitive(case_sensitive); +} + +void MergingSnapshotUpdate::Set(const std::string& property, const std::string& value) { + summary_builder().Set(property, value); +} + +Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const { + if (new_data_files_by_spec_.empty()) { + return InvalidArgument("DataSpec() called before any data file was added"); + } + if (new_data_files_by_spec_.size() > 1) { + return InvalidArgument( + "DataSpec() requires exactly one partition spec; got {} different specs", + new_data_files_by_spec_.size()); + } + return base().PartitionSpecById(new_data_files_by_spec_.begin()->first); +} + +std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() const { + std::vector<std::shared_ptr<DataFile>> result; + for (const auto& [spec_id, files] : new_data_files_by_spec_) { + for (const auto& file : files) { + 
result.push_back(file); + } + } + return result; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> /*file*/, + int64_t /*data_sequence_number*/) { + return NotImplemented( + "AddDeleteFile with explicit data sequence number is not yet implemented"); +} + +Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) { + if (manifest.content != ManifestContent::kData) { + return InvalidArgument("Cannot append delete manifest: {}", manifest.manifest_path); + } + if (manifest.has_existing_files()) { + return InvalidArgument("Cannot append manifest with existing files: {}", + manifest.manifest_path); + } + if (manifest.has_deleted_files()) { + return InvalidArgument("Cannot append manifest with deleted files: {}", + manifest.manifest_path); + } + if (manifest.added_snapshot_id != kInvalidSnapshotId) { + return InvalidArgument("Snapshot id must be assigned during commit: {}", + manifest.manifest_path); + } + if (manifest.first_row_id.has_value()) { + return InvalidArgument("Cannot append manifest with assigned first_row_id: {}", + manifest.manifest_path); + } + + if (can_inherit_snapshot_id()) { + appended_manifests_summary_.AddedManifest(manifest); + append_manifests_.push_back(std::move(manifest)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest)); + rewritten_append_manifests_.push_back(std::move(copied)); + } + return {}; +} + +Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest) { + const TableMetadata& current = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + current.PartitionSpecById(manifest.partition_spec_id)); + std::string path = ManifestPath(); + all_written_manifests_.insert(path); + return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, SnapshotId(), path, + current.format_version, &appended_manifests_summary_); +} + +// ------------------------------------------------------------------------- +// 
State queries +// ------------------------------------------------------------------------- + +bool MergingSnapshotUpdate::AddsDataFiles() const { + return !new_data_files_by_spec_.empty(); +} + +bool MergingSnapshotUpdate::AddsDeleteFiles() const { return !new_delete_files_.empty(); } + +bool MergingSnapshotUpdate::DeletesDataFiles() const { + return data_filter_manager_.ContainsDeletes(); +} + +bool MergingSnapshotUpdate::DeletesDeleteFiles() const { + return delete_filter_manager_.ContainsDeletes(); +} + +// ------------------------------------------------------------------------- +// Apply pipeline +// ------------------------------------------------------------------------- + +ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory() { + return [this](int32_t spec_id, + ManifestContent content) -> Result<std::unique_ptr<ManifestWriter>> { + const TableMetadata& meta = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, meta.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id)); + std::string path = ManifestPath(); + all_written_manifests_.insert(path); + return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(), std::move(path), + ctx_->table->io(), std::move(spec), + std::move(schema), content); + }; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::WriteNewDataManifests() { + // If new files were staged after the cache was populated (commit retry), invalidate. 
+ if (has_new_data_files_ && cached_new_data_manifests_.has_value()) { + for (const auto& m : *cached_new_data_manifests_) { + std::ignore = DeleteFile(m.manifest_path); + } + cached_new_data_manifests_.reset(); + } + + if (cached_new_data_manifests_.has_value()) { + return *cached_new_data_manifests_; + } + + std::vector<ManifestFile> result; + for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto written, + WriteDataManifests(data_files.as_span(), spec, new_data_files_data_seq_number_)); + for (const auto& m : written) { + all_written_manifests_.insert(m.manifest_path); + } + result.insert(result.end(), std::make_move_iterator(written.begin()), + std::make_move_iterator(written.end())); + } + + cached_new_data_manifests_ = result; + has_new_data_files_ = false; + return result; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::WriteNewDeleteManifests() { + // If new files were staged after the cache was populated (commit retry), invalidate. + if (has_new_delete_files_ && cached_new_delete_manifests_.has_value()) { + for (const auto& m : *cached_new_delete_manifests_) { + std::ignore = DeleteFile(m.manifest_path); + } + cached_new_delete_manifests_.reset(); + } + + if (cached_new_delete_manifests_.has_value()) { + return *cached_new_delete_manifests_; + } + + // Group delete files by partition spec ID, mirroring WriteNewDataManifests(). 
+ std::unordered_map<int32_t, std::vector<std::shared_ptr<DataFile>>> + delete_files_by_spec; + for (const auto& file : new_delete_files_) { + delete_files_by_spec[file->partition_spec_id.value()].push_back(file); + } + + std::vector<ManifestFile> result; + for (const auto& [spec_id, delete_files] : delete_files_by_spec) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto written, + WriteDeleteManifests(std::span(delete_files), spec)); + for (const auto& m : written) { + all_written_manifests_.insert(m.manifest_path); + } + result.insert(result.end(), std::make_move_iterator(written.begin()), + std::make_move_iterator(written.end())); + } + + cached_new_delete_manifests_ = result; + has_new_delete_files_ = false; + return result; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply( + const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) { + // Re-validate buffered delete files against the current format version. A format + // upgrade between staging and commit could make previously-valid files invalid. + for (const auto& file : new_delete_files_) { + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + } + + // Rebuild summary from stable sub-builders so that commit retries don't double-count. + summary_builder().Clear(); + summary_builder().Merge(added_data_files_summary_); + summary_builder().Merge(added_delete_files_summary_); + summary_builder().Merge(appended_manifests_summary_); + + auto tracked_factory = MakeTrackedWriterFactory(); + + // Step 1: Filter data manifests. + ICEBERG_ASSIGN_OR_RAISE(auto filtered_data, + data_filter_manager_.FilterManifests( Review Comment: Agreed. I'll switch this path to use the schema for the target branch instead of the current table schema so filter evaluation stays correct on non-current branches. 
########## src/iceberg/update/merging_snapshot_update.cc: ########## @@ -0,0 +1,870 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/merging_snapshot_update.h" + +#include <algorithm> +#include <span> +#include <unordered_map> +#include <vector> + +#include "iceberg/constants.h" +#include "iceberg/delete_file_index.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_util_internal.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name, + std::shared_ptr<TransactionContext> ctx) + : SnapshotUpdate(std::move(ctx)), + table_name_(std::move(table_name)), + 
delete_expression_(Expressions::AlwaysFalse()), + data_filter_manager_(ManifestContent::kData, ctx_->table->io()), + delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()), + data_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)), + delete_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)) {} + +// ------------------------------------------------------------------------- +// Primitive API +// ------------------------------------------------------------------------- + +Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null data file"); + } + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Data file must have a partition spec ID"); + } + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + + // Suppress first_row_id — it will be assigned by the commit, not inherited from the + // source file. 
+ file->first_row_id = std::nullopt; + + auto& data_files = new_data_files_by_spec_[spec_id]; + auto [it, inserted] = data_files.insert(file); + if (inserted) { + has_new_data_files_ = true; + ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, *file)); + } + return {}; +} + +Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) { + if (file.content == DataFile::Content::kData) { + return InvalidArgument("Expected a delete file but got a data file: {}", + file.file_path); + } + const int8_t format_version = base().format_version; + const bool is_dv = file.referenced_data_file.has_value(); + switch (format_version) { + case 1: + return InvalidArgument("Deletes are supported in V2 and above"); + case 2: + // Position deletes must NOT be DVs in v2. + if (file.content == DataFile::Content::kPositionDeletes && is_dv) { + return InvalidArgument("Must not use DVs for position deletes in V2: {}", + file.file_path); + } + break; + default: + if (format_version >= 3) { + // Position deletes MUST be DVs in v3+. 
+ if (file.content == DataFile::Content::kPositionDeletes && !is_dv) { + return InvalidArgument("Must use DVs for position deletes in V{}: {}", + format_version, file.file_path); + } + } else { + return InvalidArgument("Unsupported format version: {}", format_version); + } + break; + } + return {}; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null delete file"); + } + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Delete file must have a partition spec ID"); + } + ICEBERG_ASSIGN_OR_RAISE(auto spec, + base().PartitionSpecById(file->partition_spec_id.value())); + ICEBERG_RETURN_UNEXPECTED(added_delete_files_summary_.AddedFile(*spec, *file)); + has_new_delete_files_ = true; + new_delete_files_.push_back(std::move(file)); + return {}; +} + +Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null data file"); + } + return data_filter_manager_.DeleteFile(std::move(file)); +} + +Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null delete file"); + } + return delete_filter_manager_.DeleteFile(std::move(file)); +} + +void MergingSnapshotUpdate::DeleteByPath(std::string_view path) { + data_filter_manager_.DeleteFile(path); +} + +Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> expr) { + // If a delete file matches the row filter, it can also be removed because the rows + // it references will also be deleted. Both filter managers receive the expression. 
+ delete_expression_ = expr; + ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr)); + return delete_filter_manager_.DeleteByRowFilter(std::move(expr)); +} + +void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues partition) { + // Dropping data in a partition also drops all delete files in that partition. + data_filter_manager_.DropPartition(spec_id, partition); + delete_filter_manager_.DropPartition(spec_id, std::move(partition)); +} + +void MergingSnapshotUpdate::FailMissingDeletePaths() { + data_filter_manager_.FailMissingDeletePaths(); + delete_filter_manager_.FailMissingDeletePaths(); +} + +void MergingSnapshotUpdate::FailAnyDelete() { + data_filter_manager_.FailAnyDelete(); + delete_filter_manager_.FailAnyDelete(); +} + +void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t sequence_number) { + new_data_files_data_seq_number_ = sequence_number; +} + +void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + data_filter_manager_.CaseSensitive(case_sensitive); + delete_filter_manager_.CaseSensitive(case_sensitive); +} + +void MergingSnapshotUpdate::Set(const std::string& property, const std::string& value) { + summary_builder().Set(property, value); +} + +Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const { + if (new_data_files_by_spec_.empty()) { + return InvalidArgument("DataSpec() called before any data file was added"); + } + if (new_data_files_by_spec_.size() > 1) { + return InvalidArgument( + "DataSpec() requires exactly one partition spec; got {} different specs", + new_data_files_by_spec_.size()); + } + return base().PartitionSpecById(new_data_files_by_spec_.begin()->first); +} + +std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() const { + std::vector<std::shared_ptr<DataFile>> result; + for (const auto& [spec_id, files] : new_data_files_by_spec_) { + for (const auto& file : files) { + 
result.push_back(file); + } + } + return result; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> /*file*/, + int64_t /*data_sequence_number*/) { + return NotImplemented( + "AddDeleteFile with explicit data sequence number is not yet implemented"); +} + +Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) { + if (manifest.content != ManifestContent::kData) { + return InvalidArgument("Cannot append delete manifest: {}", manifest.manifest_path); + } + if (manifest.has_existing_files()) { + return InvalidArgument("Cannot append manifest with existing files: {}", + manifest.manifest_path); + } + if (manifest.has_deleted_files()) { + return InvalidArgument("Cannot append manifest with deleted files: {}", + manifest.manifest_path); + } + if (manifest.added_snapshot_id != kInvalidSnapshotId) { + return InvalidArgument("Snapshot id must be assigned during commit: {}", + manifest.manifest_path); + } + if (manifest.first_row_id.has_value()) { + return InvalidArgument("Cannot append manifest with assigned first_row_id: {}", + manifest.manifest_path); + } + + if (can_inherit_snapshot_id()) { + appended_manifests_summary_.AddedManifest(manifest); + append_manifests_.push_back(std::move(manifest)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest)); + rewritten_append_manifests_.push_back(std::move(copied)); + } + return {}; +} + +Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest) { + const TableMetadata& current = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + current.PartitionSpecById(manifest.partition_spec_id)); + std::string path = ManifestPath(); + all_written_manifests_.insert(path); + return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, SnapshotId(), path, + current.format_version, &appended_manifests_summary_); +} + +// ------------------------------------------------------------------------- +// 
State queries +// ------------------------------------------------------------------------- + +bool MergingSnapshotUpdate::AddsDataFiles() const { + return !new_data_files_by_spec_.empty(); +} + +bool MergingSnapshotUpdate::AddsDeleteFiles() const { return !new_delete_files_.empty(); } + +bool MergingSnapshotUpdate::DeletesDataFiles() const { + return data_filter_manager_.ContainsDeletes(); +} + +bool MergingSnapshotUpdate::DeletesDeleteFiles() const { + return delete_filter_manager_.ContainsDeletes(); +} + +// ------------------------------------------------------------------------- +// Apply pipeline +// ------------------------------------------------------------------------- + +ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory() { + return [this](int32_t spec_id, + ManifestContent content) -> Result<std::unique_ptr<ManifestWriter>> { + const TableMetadata& meta = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, meta.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id)); + std::string path = ManifestPath(); + all_written_manifests_.insert(path); + return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(), std::move(path), + ctx_->table->io(), std::move(spec), + std::move(schema), content); + }; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::WriteNewDataManifests() { + // If new files were staged after the cache was populated (commit retry), invalidate. 
+ if (has_new_data_files_ && cached_new_data_manifests_.has_value()) { + for (const auto& m : *cached_new_data_manifests_) { + std::ignore = DeleteFile(m.manifest_path); + } + cached_new_data_manifests_.reset(); + } + + if (cached_new_data_manifests_.has_value()) { + return *cached_new_data_manifests_; + } + + std::vector<ManifestFile> result; + for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto written, + WriteDataManifests(data_files.as_span(), spec, new_data_files_data_seq_number_)); + for (const auto& m : written) { + all_written_manifests_.insert(m.manifest_path); + } + result.insert(result.end(), std::make_move_iterator(written.begin()), + std::make_move_iterator(written.end())); + } + + cached_new_data_manifests_ = result; + has_new_data_files_ = false; + return result; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::WriteNewDeleteManifests() { + // If new files were staged after the cache was populated (commit retry), invalidate. + if (has_new_delete_files_ && cached_new_delete_manifests_.has_value()) { + for (const auto& m : *cached_new_delete_manifests_) { + std::ignore = DeleteFile(m.manifest_path); + } + cached_new_delete_manifests_.reset(); + } + + if (cached_new_delete_manifests_.has_value()) { + return *cached_new_delete_manifests_; + } + + // Group delete files by partition spec ID, mirroring WriteNewDataManifests(). 
+ std::unordered_map<int32_t, std::vector<std::shared_ptr<DataFile>>> + delete_files_by_spec; + for (const auto& file : new_delete_files_) { + delete_files_by_spec[file->partition_spec_id.value()].push_back(file); + } + + std::vector<ManifestFile> result; + for (const auto& [spec_id, delete_files] : delete_files_by_spec) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto written, + WriteDeleteManifests(std::span(delete_files), spec)); + for (const auto& m : written) { + all_written_manifests_.insert(m.manifest_path); + } + result.insert(result.end(), std::make_move_iterator(written.begin()), + std::make_move_iterator(written.end())); + } + + cached_new_delete_manifests_ = result; + has_new_delete_files_ = false; + return result; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply( + const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) { + // Re-validate buffered delete files against the current format version. A format + // upgrade between staging and commit could make previously-valid files invalid. + for (const auto& file : new_delete_files_) { + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + } + + // Rebuild summary from stable sub-builders so that commit retries don't double-count. + summary_builder().Clear(); + summary_builder().Merge(added_data_files_summary_); + summary_builder().Merge(added_delete_files_summary_); + summary_builder().Merge(appended_manifests_summary_); + + auto tracked_factory = MakeTrackedWriterFactory(); + + // Step 1: Filter data manifests. + ICEBERG_ASSIGN_OR_RAISE(auto filtered_data, + data_filter_manager_.FilterManifests( + metadata_to_update, snapshot, tracked_factory)); + + // Track deleted data files in the summary builder. 
+ for (const auto& file : data_filter_manager_.FilesToBeDeleted()) { + if (!file->partition_spec_id.has_value()) { + continue; + } + ICEBERG_ASSIGN_OR_RAISE( + auto spec, metadata_to_update.PartitionSpecById(*file->partition_spec_id)); + ICEBERG_RETURN_UNEXPECTED(summary_builder().DeletedFile(*spec, *file)); + } + + // Step 2: Compute min data sequence number; set up delete filter cleanup. + // Use last_sequence_number as the initial value so that an empty filtered list + // produces a sensible minimum. Skip manifests with kUnassignedSequenceNumber — + // those are manifests written in the current Apply() call whose sequence number + // hasn't been assigned yet. If all filtered manifests are unassigned (e.g. the + // table has no pre-existing data manifests), the fallback to last_sequence_number + // is safe: any delete file with seq > 0 and seq <= last_sequence_number can no + // longer match live data rows, so cleaning them up is correct. + int64_t min_data_seq = metadata_to_update.last_sequence_number; + for (const auto& manifest : filtered_data) { + if (manifest.min_sequence_number != kUnassignedSequenceNumber) { + min_data_seq = std::min(min_data_seq, manifest.min_sequence_number); + } + } + delete_filter_manager_.DropDeleteFilesOlderThan(min_data_seq); + delete_filter_manager_.RemoveDanglingDeletesFor( + data_filter_manager_.FilesToBeDeleted()); + + // Step 3: Filter delete manifests. + ICEBERG_ASSIGN_OR_RAISE(auto filtered_deletes, + delete_filter_manager_.FilterManifests( + metadata_to_update, snapshot, tracked_factory)); + + // Track deleted delete files in the summary builder. 
+ for (const auto& file : delete_filter_manager_.FilesToBeDeleted()) { + if (!file->partition_spec_id.has_value()) { + continue; + } + ICEBERG_ASSIGN_OR_RAISE( + auto spec, metadata_to_update.PartitionSpecById(*file->partition_spec_id)); + ICEBERG_RETURN_UNEXPECTED(summary_builder().DeletedFile(*spec, *file)); + } + + // Drop manifests with no live files — they carry no data and should not be merged + // into the new snapshot. Manifests written by the current snapshot are always kept + // regardless of live-file counts; the merge stage handles any that are empty. + int64_t snapshot_id = SnapshotId(); + auto should_keep = [snapshot_id](const ManifestFile& m) { + return m.has_added_files() || m.has_existing_files() || + m.added_snapshot_id == snapshot_id; + }; + std::erase_if(filtered_data, [&](const ManifestFile& m) { return !should_keep(m); }); + std::erase_if(filtered_deletes, [&](const ManifestFile& m) { return !should_keep(m); }); + + // Step 4: Write (or retrieve cached) new data manifests. + ICEBERG_ASSIGN_OR_RAISE(auto written_data_manifests, WriteNewDataManifests()); + + // Incorporate append manifests (from AddManifest), stamping each with the + // current snapshot ID. append_manifests_ are used directly (inherit path); + // rewritten_append_manifests_ were already copied with the snapshot ID. + std::vector<ManifestFile> new_data_manifests = std::move(written_data_manifests); + for (const auto& src : append_manifests_) { + ManifestFile m = src; + m.added_snapshot_id = snapshot_id; + new_data_manifests.push_back(std::move(m)); + } + for (const auto& src : rewritten_append_manifests_) { + ManifestFile m = src; + m.added_snapshot_id = snapshot_id; + new_data_manifests.push_back(std::move(m)); + } + + // Step 5: Write (or retrieve cached) new delete manifests. + ICEBERG_ASSIGN_OR_RAISE(auto new_delete_manifests, WriteNewDeleteManifests()); + + // Step 6: Merge data manifests. 
+ ICEBERG_ASSIGN_OR_RAISE(auto merged_data, + data_merge_manager_.MergeManifests( + filtered_data, new_data_manifests, SnapshotId(), + metadata_to_update, ctx_->table->io(), tracked_factory)); + + // Step 7: Merge delete manifests. + ICEBERG_ASSIGN_OR_RAISE(auto merged_deletes, + delete_merge_manager_.MergeManifests( + filtered_deletes, new_delete_manifests, SnapshotId(), + metadata_to_update, ctx_->table->io(), tracked_factory)); + + std::vector<ManifestFile> result; + result.reserve(merged_data.size() + merged_deletes.size()); + result.insert(result.end(), std::make_move_iterator(merged_data.begin()), + std::make_move_iterator(merged_data.end())); + result.insert(result.end(), std::make_move_iterator(merged_deletes.begin()), + std::make_move_iterator(merged_deletes.end())); + + // Manifest count summary. + int32_t manifests_created = 0; + int32_t manifests_kept = 0; + for (const auto& m : result) { + if (m.added_snapshot_id == snapshot_id) { + ++manifests_created; + } else { + ++manifests_kept; + } + } + int32_t replaced_manifests_count = data_filter_manager_.ReplacedManifestsCount() + + delete_filter_manager_.ReplacedManifestsCount() + + data_merge_manager_.ReplacedManifestsCount() + + delete_merge_manager_.ReplacedManifestsCount(); + summary_builder().SetManifestCounts(manifests_created, manifests_kept, + replaced_manifests_count); + + return result; +} + +void MergingSnapshotUpdate::CleanUncommitted( + const std::unordered_set<std::string>& committed) { + for (const auto& path : all_written_manifests_) { + if (!committed.contains(path)) { + std::ignore = DeleteFile(path); + } + } + all_written_manifests_.clear(); + cached_new_data_manifests_.reset(); + cached_new_delete_manifests_.reset(); + has_new_data_files_ = false; + has_new_delete_files_ = false; + + // rewritten_append_manifests_ are always owned by the table (copied by us), + // so delete any that were not committed. 
+ for (const auto& m : rewritten_append_manifests_) { + if (!committed.contains(m.manifest_path)) { + std::ignore = DeleteFile(m.manifest_path); + } + } + + // append_manifests_ are only owned by the table if the commit succeeded + // (i.e., at least one manifest was committed). + if (!committed.empty()) { + for (const auto& m : append_manifests_) { + if (!committed.contains(m.manifest_path)) { + std::ignore = DeleteFile(m.manifest_path); + } + } + } +} + +std::unordered_map<std::string, std::string> MergingSnapshotUpdate::Summary() { + summary_builder().SetPartitionSummaryLimit( + base().properties.Get(TableProperties::kWritePartitionSummaryLimit)); + return summary_builder().Build(); +} + +// ------------------------------------------------------------------------- +// Conflict-detection helpers +// ------------------------------------------------------------------------- + +Status MergingSnapshotUpdate::ValidateAddedDataFiles( + const TableMetadata& metadata, int64_t starting_snapshot_id, + std::shared_ptr<Expression> filter, const std::shared_ptr<Snapshot>& parent, + std::shared_ptr<FileIO> io, bool case_sensitive) { + if (parent == nullptr) { + return {}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, Review Comment: Agreed. I'll make these validation paths fail when the starting snapshot is not reachable instead of silently validating a truncated ancestry range. ########## src/iceberg/update/merging_snapshot_update.cc: ########## @@ -0,0 +1,870 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/merging_snapshot_update.h" + +#include <algorithm> +#include <span> +#include <unordered_map> +#include <vector> + +#include "iceberg/constants.h" +#include "iceberg/delete_file_index.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/inclusive_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_util_internal.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name, + std::shared_ptr<TransactionContext> ctx) + : SnapshotUpdate(std::move(ctx)), + table_name_(std::move(table_name)), + delete_expression_(Expressions::AlwaysFalse()), + data_filter_manager_(ManifestContent::kData, ctx_->table->io()), + delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()), + data_merge_manager_( + base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)), + delete_merge_manager_( + 
base().properties.Get(TableProperties::kManifestTargetSizeBytes), + base().properties.Get(TableProperties::kManifestMinMergeCount), + base().properties.Get(TableProperties::kManifestMergeEnabled)) {} + +// ------------------------------------------------------------------------- +// Primitive API +// ------------------------------------------------------------------------- + +Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null data file"); + } + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Data file must have a partition spec ID"); + } + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + + // Suppress first_row_id — it will be assigned by the commit, not inherited from the + // source file. + file->first_row_id = std::nullopt; + + auto& data_files = new_data_files_by_spec_[spec_id]; + auto [it, inserted] = data_files.insert(file); + if (inserted) { + has_new_data_files_ = true; + ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, *file)); + } + return {}; +} + +Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) { + if (file.content == DataFile::Content::kData) { + return InvalidArgument("Expected a delete file but got a data file: {}", + file.file_path); + } + const int8_t format_version = base().format_version; + const bool is_dv = file.referenced_data_file.has_value(); + switch (format_version) { + case 1: + return InvalidArgument("Deletes are supported in V2 and above"); + case 2: + // Position deletes must NOT be DVs in v2. + if (file.content == DataFile::Content::kPositionDeletes && is_dv) { + return InvalidArgument("Must not use DVs for position deletes in V2: {}", + file.file_path); + } + break; + default: + if (format_version >= 3) { + // Position deletes MUST be DVs in v3+. 
+ if (file.content == DataFile::Content::kPositionDeletes && !is_dv) { + return InvalidArgument("Must use DVs for position deletes in V{}: {}", + format_version, file.file_path); + } + } else { + return InvalidArgument("Unsupported format version: {}", format_version); + } + break; + } + return {}; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot add a null delete file"); + } + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + if (!file->partition_spec_id.has_value()) { + return InvalidArgument("Delete file must have a partition spec ID"); + } + ICEBERG_ASSIGN_OR_RAISE(auto spec, + base().PartitionSpecById(file->partition_spec_id.value())); + ICEBERG_RETURN_UNEXPECTED(added_delete_files_summary_.AddedFile(*spec, *file)); + has_new_delete_files_ = true; + new_delete_files_.push_back(std::move(file)); + return {}; +} + +Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null data file"); + } + return data_filter_manager_.DeleteFile(std::move(file)); +} + +Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) { + if (!file) { + return InvalidArgument("Cannot delete a null delete file"); + } + return delete_filter_manager_.DeleteFile(std::move(file)); +} + +void MergingSnapshotUpdate::DeleteByPath(std::string_view path) { + data_filter_manager_.DeleteFile(path); +} + +Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> expr) { + // If a delete file matches the row filter, it can also be removed because the rows + // it references will also be deleted. Both filter managers receive the expression. 
+ delete_expression_ = expr; + ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr)); + return delete_filter_manager_.DeleteByRowFilter(std::move(expr)); +} + +void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues partition) { + // Dropping data in a partition also drops all delete files in that partition. + data_filter_manager_.DropPartition(spec_id, partition); + delete_filter_manager_.DropPartition(spec_id, std::move(partition)); +} + +void MergingSnapshotUpdate::FailMissingDeletePaths() { + data_filter_manager_.FailMissingDeletePaths(); + delete_filter_manager_.FailMissingDeletePaths(); +} + +void MergingSnapshotUpdate::FailAnyDelete() { + data_filter_manager_.FailAnyDelete(); + delete_filter_manager_.FailAnyDelete(); +} + +void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t sequence_number) { + new_data_files_data_seq_number_ = sequence_number; +} + +void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + data_filter_manager_.CaseSensitive(case_sensitive); + delete_filter_manager_.CaseSensitive(case_sensitive); +} + +void MergingSnapshotUpdate::Set(const std::string& property, const std::string& value) { + summary_builder().Set(property, value); +} + +Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const { + if (new_data_files_by_spec_.empty()) { + return InvalidArgument("DataSpec() called before any data file was added"); + } + if (new_data_files_by_spec_.size() > 1) { + return InvalidArgument( + "DataSpec() requires exactly one partition spec; got {} different specs", + new_data_files_by_spec_.size()); + } + return base().PartitionSpecById(new_data_files_by_spec_.begin()->first); +} + +std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() const { + std::vector<std::shared_ptr<DataFile>> result; + for (const auto& [spec_id, files] : new_data_files_by_spec_) { + for (const auto& file : files) { + 
result.push_back(file); + } + } + return result; +} + +Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> /*file*/, + int64_t /*data_sequence_number*/) { + return NotImplemented( + "AddDeleteFile with explicit data sequence number is not yet implemented"); +} + +Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) { + if (manifest.content != ManifestContent::kData) { + return InvalidArgument("Cannot append delete manifest: {}", manifest.manifest_path); + } + if (manifest.has_existing_files()) { + return InvalidArgument("Cannot append manifest with existing files: {}", + manifest.manifest_path); + } + if (manifest.has_deleted_files()) { + return InvalidArgument("Cannot append manifest with deleted files: {}", + manifest.manifest_path); + } + if (manifest.added_snapshot_id != kInvalidSnapshotId) { + return InvalidArgument("Snapshot id must be assigned during commit: {}", + manifest.manifest_path); + } + if (manifest.first_row_id.has_value()) { + return InvalidArgument("Cannot append manifest with assigned first_row_id: {}", + manifest.manifest_path); + } + + if (can_inherit_snapshot_id()) { + appended_manifests_summary_.AddedManifest(manifest); + append_manifests_.push_back(std::move(manifest)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest)); + rewritten_append_manifests_.push_back(std::move(copied)); + } + return {}; +} + +Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest) { + const TableMetadata& current = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, + current.PartitionSpecById(manifest.partition_spec_id)); + std::string path = ManifestPath(); + all_written_manifests_.insert(path); + return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, SnapshotId(), path, + current.format_version, &appended_manifests_summary_); +} + +// ------------------------------------------------------------------------- +// 
State queries +// ------------------------------------------------------------------------- + +bool MergingSnapshotUpdate::AddsDataFiles() const { + return !new_data_files_by_spec_.empty(); +} + +bool MergingSnapshotUpdate::AddsDeleteFiles() const { return !new_delete_files_.empty(); } + +bool MergingSnapshotUpdate::DeletesDataFiles() const { + return data_filter_manager_.ContainsDeletes(); +} + +bool MergingSnapshotUpdate::DeletesDeleteFiles() const { + return delete_filter_manager_.ContainsDeletes(); +} + +// ------------------------------------------------------------------------- +// Apply pipeline +// ------------------------------------------------------------------------- + +ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory() { + return [this](int32_t spec_id, + ManifestContent content) -> Result<std::unique_ptr<ManifestWriter>> { + const TableMetadata& meta = base(); + ICEBERG_ASSIGN_OR_RAISE(auto schema, meta.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id)); + std::string path = ManifestPath(); + all_written_manifests_.insert(path); + return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(), std::move(path), + ctx_->table->io(), std::move(spec), + std::move(schema), content); + }; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::WriteNewDataManifests() { + // If new files were staged after the cache was populated (commit retry), invalidate. 
+ if (has_new_data_files_ && cached_new_data_manifests_.has_value()) { + for (const auto& m : *cached_new_data_manifests_) { + std::ignore = DeleteFile(m.manifest_path); + } + cached_new_data_manifests_.reset(); + } + + if (cached_new_data_manifests_.has_value()) { + return *cached_new_data_manifests_; + } + + std::vector<ManifestFile> result; + for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE( + auto written, + WriteDataManifests(data_files.as_span(), spec, new_data_files_data_seq_number_)); + for (const auto& m : written) { + all_written_manifests_.insert(m.manifest_path); + } + result.insert(result.end(), std::make_move_iterator(written.begin()), + std::make_move_iterator(written.end())); + } + + cached_new_data_manifests_ = result; + has_new_data_files_ = false; + return result; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::WriteNewDeleteManifests() { + // If new files were staged after the cache was populated (commit retry), invalidate. + if (has_new_delete_files_ && cached_new_delete_manifests_.has_value()) { + for (const auto& m : *cached_new_delete_manifests_) { + std::ignore = DeleteFile(m.manifest_path); + } + cached_new_delete_manifests_.reset(); + } + + if (cached_new_delete_manifests_.has_value()) { + return *cached_new_delete_manifests_; + } + + // Group delete files by partition spec ID, mirroring WriteNewDataManifests(). 
+ std::unordered_map<int32_t, std::vector<std::shared_ptr<DataFile>>> + delete_files_by_spec; + for (const auto& file : new_delete_files_) { + delete_files_by_spec[file->partition_spec_id.value()].push_back(file); + } + + std::vector<ManifestFile> result; + for (const auto& [spec_id, delete_files] : delete_files_by_spec) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto written, + WriteDeleteManifests(std::span(delete_files), spec)); + for (const auto& m : written) { + all_written_manifests_.insert(m.manifest_path); + } + result.insert(result.end(), std::make_move_iterator(written.begin()), + std::make_move_iterator(written.end())); + } + + cached_new_delete_manifests_ = result; + has_new_delete_files_ = false; + return result; +} + +Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply( + const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) { + // Re-validate buffered delete files against the current format version. A format + // upgrade between staging and commit could make previously-valid files invalid. + for (const auto& file : new_delete_files_) { + ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file)); + } + + // Rebuild summary from stable sub-builders so that commit retries don't double-count. + summary_builder().Clear(); + summary_builder().Merge(added_data_files_summary_); + summary_builder().Merge(added_delete_files_summary_); + summary_builder().Merge(appended_manifests_summary_); + + auto tracked_factory = MakeTrackedWriterFactory(); + + // Step 1: Filter data manifests. + ICEBERG_ASSIGN_OR_RAISE(auto filtered_data, + data_filter_manager_.FilterManifests( + metadata_to_update, snapshot, tracked_factory)); + + // Track deleted data files in the summary builder. + for (const auto& file : data_filter_manager_.FilesToBeDeleted()) { Review Comment: That makes sense. 
I'll change the summary accounting to use the files actually deleted in the filtered manifests rather than the requested delete set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
