zhjwpku commented on code in PR #682:
URL: https://github.com/apache/iceberg-cpp/pull/682#discussion_r3318696424


##########
src/iceberg/manifest/manifest_filter_manager.h:
##########
@@ -116,9 +122,36 @@ class ICEBERG_EXPORT ManifestFilterManager {
   /// manifest entry matches a delete condition.
   void FailAnyDelete();
 
+  /// \brief Returns the number of manifests rewritten (replaced) by the last
+  /// FilterManifests() call. A manifest is replaced when it contained deleted 
entries
+  /// and was rewritten with those entries marked DELETED.
+  int32_t ReplacedManifestsCount() const { return replaced_manifests_count_; }
+
   /// \brief Returns true if any delete condition has been registered.
   bool ContainsDeletes() const;
 
+  /// \brief Set the minimum data sequence number for delete files to retain.
+  ///
+  /// Only valid for ManifestContent::kDeletes managers.  Delete entries whose
+  /// data_sequence_number is positive and less than \p sequence_number will be

Review Comment:
   nit: we don't use `\p` :) same for other places.



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <algorithm>
+#include <span>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/delete_file_index.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+namespace {
+
+bool MatchesOperation(std::optional<std::string_view> operation,
+                      std::initializer_list<std::string_view> expected) {
+  return operation.has_value() &&
+         std::ranges::find(expected, operation.value()) != expected.end();
+}
+
+struct ValidationHistoryResult {
+  std::vector<ManifestFile> manifests;
+  std::unordered_set<int64_t> snapshot_ids;
+};
+
+struct DeleteFileObjectKey {
+  std::string path;
+  std::optional<int64_t> content_offset;
+  std::optional<int64_t> content_size_in_bytes;
+
+  bool operator==(const DeleteFileObjectKey& other) const = default;
+};
+
+struct DeleteFileObjectKeyHash {
+  size_t operator()(const DeleteFileObjectKey& key) const {
+    size_t hash = std::hash<std::string>{}(key.path);
+    auto combine = [&hash](const auto& value) {
+      size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
+      hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
+    };
+    combine(key.content_offset);
+    combine(key.content_size_in_bytes);
+    return hash;
+  }
+};
+
+DeleteFileObjectKey MakeDeleteFileObjectKey(const DataFile& file) {
+  return DeleteFileObjectKey{.path = file.file_path,
+                             .content_offset = file.content_offset,
+                             .content_size_in_bytes = 
file.content_size_in_bytes};
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> ValidationAncestorsBetween(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      SnapshotUtil::AncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+  if (latest_snapshot_id == starting_snapshot_id) {
+    return ancestors;
+  }
+  if (ancestors.empty()) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+
+  const auto& oldest_checked = ancestors.back();
+  if (oldest_checked == nullptr || 
!oldest_checked->parent_snapshot_id.has_value() ||
+      oldest_checked->parent_snapshot_id.value() != starting_snapshot_id) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+  return ancestors;
+}
+
+Result<ValidationHistoryResult> ValidationHistory(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id,
+    std::initializer_list<std::string_view> matching_operations, 
ManifestContent content,
+    const std::shared_ptr<FileIO>& io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+
+  ValidationHistoryResult result;
+  for (const auto& snapshot : ancestors) {
+    if (!MatchesOperation(snapshot->Operation(), matching_operations)) {
+      continue;
+    }
+
+    result.snapshot_ids.insert(snapshot->snapshot_id);
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, content == ManifestContent::kData
+                                                ? cached.DataManifests(io)
+                                                : cached.DeleteManifests(io));
+    for (const auto& manifest : manifests) {
+      if (manifest.added_snapshot_id == snapshot->snapshot_id) {
+        result.manifests.push_back(manifest);
+      }
+    }
+  }
+
+  return result;
+}
+
+Result<std::optional<std::string>> FindMatchingDataFile(
+    const TableMetadata& metadata, const std::vector<ManifestFile>& manifests,
+    ManifestStatus status, std::shared_ptr<Expression> filter,
+    const PartitionSet* partition_set, const std::shared_ptr<FileIO>& io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  auto partition_filter = partition_set != nullptr
+                              ? std::make_shared<PartitionSet>(*partition_set)
+                              : std::shared_ptr<PartitionSet>{};
+
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                            
metadata.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io, schema, spec));
+    reader->CaseSensitive(case_sensitive);
+    if (filter != nullptr) {
+      reader->FilterRows(filter);
+    }
+    if (partition_filter != nullptr) {
+      reader->FilterPartitions(partition_filter);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      if (entry.status == status && entry.data_file != nullptr) {
+        return entry.data_file->file_path;
+      }
+    }
+  }
+
+  return std::optional<std::string>{};
+}
+
+}  // namespace
+
+MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name,
+                                             
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)),
+      table_name_(std::move(table_name)),
+      delete_expression_(Expressions::AlwaysFalse()),
+      data_filter_manager_(ManifestContent::kData, ctx_->table->io()),
+      delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()),
+      data_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)),
+      delete_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)) {}
+
+// -------------------------------------------------------------------------
+// Primitive API
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null data file");
+  }
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Data file must have a partition spec ID");
+  }
+
+  int32_t spec_id = file->partition_spec_id.value();
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+
+  // Suppress first_row_id — it will be assigned by the commit, not inherited 
from the
+  // source file.
+  file->first_row_id = std::nullopt;
+
+  auto& data_files = new_data_files_by_spec_[spec_id];
+  auto [it, inserted] = data_files.insert(file);
+  if (inserted) {
+    has_new_data_files_ = true;
+    ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, 
*file));
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) {
+  if (file.content == DataFile::Content::kData) {
+    return InvalidArgument("Expected a delete file but got a data file: {}",
+                           file.file_path);
+  }
+  const int8_t format_version = base().format_version;
+  const bool is_dv = ContentFileUtil::IsDV(file);
+  switch (format_version) {
+    case 1:
+      return InvalidArgument("Deletes are supported in V2 and above");
+    case 2:
+      // Position deletes must NOT be DVs in v2.
+      if (file.content == DataFile::Content::kPositionDeletes && is_dv) {
+        return InvalidArgument("Must not use DVs for position deletes in V2: 
{}",
+                               file.file_path);
+      }
+      break;
+    default:
+      if (format_version >= 3) {
+        // Position deletes MUST be DVs in v3+.
+        if (file.content == DataFile::Content::kPositionDeletes && !is_dv) {
+          return InvalidArgument("Must use DVs for position deletes in V{}: 
{}",
+                                 format_version, file.file_path);
+        }
+      } else {
+        return InvalidArgument("Unsupported format version: {}", 
format_version);
+      }
+      break;
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) {
+  return AddDeleteFile(std::move(file), std::nullopt);
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            int64_t data_sequence_number) {
+  return AddDeleteFile(std::move(file), 
std::optional<int64_t>(data_sequence_number));
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            std::optional<int64_t> 
data_sequence_number) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null delete file");
+  }
+  ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file));
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Delete file must have a partition spec ID");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
base().PartitionSpecById(file->partition_spec_id.value()));
+  (void)spec;

Review Comment:
   Use `ICEBERG_RETURN_UNEXPECTED` if we don't use the returned value?



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <algorithm>
+#include <span>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/delete_file_index.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+namespace {
+
+bool MatchesOperation(std::optional<std::string_view> operation,
+                      std::initializer_list<std::string_view> expected) {
+  return operation.has_value() &&
+         std::ranges::find(expected, operation.value()) != expected.end();
+}
+
+struct ValidationHistoryResult {
+  std::vector<ManifestFile> manifests;
+  std::unordered_set<int64_t> snapshot_ids;
+};
+
+struct DeleteFileObjectKey {
+  std::string path;
+  std::optional<int64_t> content_offset;
+  std::optional<int64_t> content_size_in_bytes;
+
+  bool operator==(const DeleteFileObjectKey& other) const = default;
+};
+
+struct DeleteFileObjectKeyHash {
+  size_t operator()(const DeleteFileObjectKey& key) const {
+    size_t hash = std::hash<std::string>{}(key.path);
+    auto combine = [&hash](const auto& value) {
+      size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
+      hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
+    };
+    combine(key.content_offset);
+    combine(key.content_size_in_bytes);
+    return hash;
+  }
+};
+
+DeleteFileObjectKey MakeDeleteFileObjectKey(const DataFile& file) {
+  return DeleteFileObjectKey{.path = file.file_path,
+                             .content_offset = file.content_offset,
+                             .content_size_in_bytes = 
file.content_size_in_bytes};
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> ValidationAncestorsBetween(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      SnapshotUtil::AncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+  if (latest_snapshot_id == starting_snapshot_id) {
+    return ancestors;
+  }
+  if (ancestors.empty()) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+
+  const auto& oldest_checked = ancestors.back();
+  if (oldest_checked == nullptr || 
!oldest_checked->parent_snapshot_id.has_value() ||
+      oldest_checked->parent_snapshot_id.value() != starting_snapshot_id) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+  return ancestors;
+}
+
+Result<ValidationHistoryResult> ValidationHistory(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id,
+    std::initializer_list<std::string_view> matching_operations, 
ManifestContent content,
+    const std::shared_ptr<FileIO>& io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+
+  ValidationHistoryResult result;
+  for (const auto& snapshot : ancestors) {
+    if (!MatchesOperation(snapshot->Operation(), matching_operations)) {
+      continue;
+    }
+
+    result.snapshot_ids.insert(snapshot->snapshot_id);
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, content == ManifestContent::kData
+                                                ? cached.DataManifests(io)
+                                                : cached.DeleteManifests(io));
+    for (const auto& manifest : manifests) {
+      if (manifest.added_snapshot_id == snapshot->snapshot_id) {
+        result.manifests.push_back(manifest);
+      }
+    }
+  }
+
+  return result;
+}
+
+Result<std::optional<std::string>> FindMatchingDataFile(
+    const TableMetadata& metadata, const std::vector<ManifestFile>& manifests,
+    ManifestStatus status, std::shared_ptr<Expression> filter,
+    const PartitionSet* partition_set, const std::shared_ptr<FileIO>& io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  auto partition_filter = partition_set != nullptr
+                              ? std::make_shared<PartitionSet>(*partition_set)
+                              : std::shared_ptr<PartitionSet>{};
+
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                            
metadata.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io, schema, spec));
+    reader->CaseSensitive(case_sensitive);
+    if (filter != nullptr) {
+      reader->FilterRows(filter);
+    }
+    if (partition_filter != nullptr) {
+      reader->FilterPartitions(partition_filter);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      if (entry.status == status && entry.data_file != nullptr) {
+        return entry.data_file->file_path;
+      }
+    }
+  }
+
+  return std::optional<std::string>{};
+}
+
+}  // namespace
+
+MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name,
+                                             
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)),
+      table_name_(std::move(table_name)),
+      delete_expression_(Expressions::AlwaysFalse()),
+      data_filter_manager_(ManifestContent::kData, ctx_->table->io()),
+      delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()),
+      data_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)),
+      delete_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)) {}
+
+// -------------------------------------------------------------------------
+// Primitive API
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null data file");
+  }
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Data file must have a partition spec ID");
+  }
+
+  int32_t spec_id = file->partition_spec_id.value();
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+
+  // Suppress first_row_id — it will be assigned by the commit, not inherited 
from the
+  // source file.
+  file->first_row_id = std::nullopt;
+
+  auto& data_files = new_data_files_by_spec_[spec_id];
+  auto [it, inserted] = data_files.insert(file);
+  if (inserted) {
+    has_new_data_files_ = true;
+    ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, 
*file));
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) {
+  if (file.content == DataFile::Content::kData) {
+    return InvalidArgument("Expected a delete file but got a data file: {}",
+                           file.file_path);
+  }
+  const int8_t format_version = base().format_version;
+  const bool is_dv = ContentFileUtil::IsDV(file);
+  switch (format_version) {
+    case 1:
+      return InvalidArgument("Deletes are supported in V2 and above");
+    case 2:
+      // Position deletes must NOT be DVs in v2.
+      if (file.content == DataFile::Content::kPositionDeletes && is_dv) {
+        return InvalidArgument("Must not use DVs for position deletes in V2: 
{}",
+                               file.file_path);
+      }
+      break;
+    default:
+      if (format_version >= 3) {
+        // Position deletes MUST be DVs in v3+.
+        if (file.content == DataFile::Content::kPositionDeletes && !is_dv) {
+          return InvalidArgument("Must use DVs for position deletes in V{}: 
{}",
+                                 format_version, file.file_path);
+        }
+      } else {
+        return InvalidArgument("Unsupported format version: {}", 
format_version);
+      }
+      break;
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) {
+  return AddDeleteFile(std::move(file), std::nullopt);
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            int64_t data_sequence_number) {
+  return AddDeleteFile(std::move(file), 
std::optional<int64_t>(data_sequence_number));
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            std::optional<int64_t> 
data_sequence_number) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null delete file");
+  }
+  ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file));
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Delete file must have a partition spec ID");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
base().PartitionSpecById(file->partition_spec_id.value()));
+  (void)spec;
+  has_new_delete_files_ = true;
+  new_delete_files_.push_back(PendingDeleteFile{
+      .file = std::move(file), .data_sequence_number = 
std::move(data_sequence_number)});
+  return {};
+}
+
+Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot delete a null data file");
+  }
+  return data_filter_manager_.DeleteFile(std::move(file));
+}
+
+Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) 
{
+  if (!file) {
+    return InvalidArgument("Cannot delete a null delete file");
+  }
+  return delete_filter_manager_.DeleteFile(std::move(file));
+}
+
+void MergingSnapshotUpdate::DeleteByPath(std::string_view path) {
+  data_filter_manager_.DeleteFile(path);
+}
+
+Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> 
expr) {
+  // If a delete file matches the row filter, it can also be removed because 
the rows
+  // it references will also be deleted. Both filter managers receive the 
expression.
+  delete_expression_ = expr;
+  ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr));
+  return delete_filter_manager_.DeleteByRowFilter(std::move(expr));
+}
+
+void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  // Dropping data in a partition also drops all delete files in that 
partition.
+  data_filter_manager_.DropPartition(spec_id, partition);
+  delete_filter_manager_.DropPartition(spec_id, std::move(partition));
+}
+
+void MergingSnapshotUpdate::FailMissingDeletePaths() {
+  data_filter_manager_.FailMissingDeletePaths();
+  delete_filter_manager_.FailMissingDeletePaths();
+}
+
+void MergingSnapshotUpdate::FailAnyDelete() {
+  data_filter_manager_.FailAnyDelete();
+  delete_filter_manager_.FailAnyDelete();
+}
+
+void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t 
sequence_number) {
+  new_data_files_data_seq_number_ = sequence_number;
+}
+
+void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  data_filter_manager_.CaseSensitive(case_sensitive);
+  delete_filter_manager_.CaseSensitive(case_sensitive);
+}
+
+void MergingSnapshotUpdate::Set(const std::string& property, const 
std::string& value) {
+  summary_builder().Set(property, value);
+}
+
+Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const 
{
+  if (new_data_files_by_spec_.empty()) {
+    return InvalidArgument("DataSpec() called before any data file was added");
+  }
+  if (new_data_files_by_spec_.size() > 1) {
+    return InvalidArgument(
+        "DataSpec() requires exactly one partition spec; got {} different 
specs",
+        new_data_files_by_spec_.size());
+  }
+  return base().PartitionSpecById(new_data_files_by_spec_.begin()->first);
+}
+
+std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() 
const {
+  std::vector<std::shared_ptr<DataFile>> result;
+  for (const auto& [spec_id, files] : new_data_files_by_spec_) {
+    for (const auto& file : files) {
+      result.push_back(file);
+    }
+  }
+  return result;
+}
+
+Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) {
+  if (manifest.content != ManifestContent::kData) {
+    return InvalidArgument("Cannot append delete manifest: {}", 
manifest.manifest_path);
+  }
+  if (can_inherit_snapshot_id() && manifest.added_snapshot_id == 
kInvalidSnapshotId) {
+    if (manifest.first_row_id.has_value()) {
+      return InvalidArgument("Cannot append manifest with assigned first row 
ID: {}",
+                             manifest.manifest_path);
+    }
+    appended_manifests_summary_.AddedManifest(manifest);
+    append_manifests_.push_back(std::move(manifest));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest));
+    rewritten_append_manifests_.push_back(std::move(copied));
+  }
+  return {};
+}
+
+Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& 
manifest) {
+  const TableMetadata& current = base();
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, SnapshotUtil::SchemaFor(current, 
target_branch()));
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
current.PartitionSpecById(manifest.partition_spec_id));
+  std::string path = ManifestPath();
+  all_written_manifests_.insert(path);
+  return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, 
SnapshotId(), path,
+                            current.format_version, 
&appended_manifests_summary_);
+}
+
+// -------------------------------------------------------------------------
+// State queries
+// -------------------------------------------------------------------------
+
+bool MergingSnapshotUpdate::AddsDataFiles() const {
+  return !new_data_files_by_spec_.empty();
+}
+
+bool MergingSnapshotUpdate::AddsDeleteFiles() const { return 
!new_delete_files_.empty(); }
+
+bool MergingSnapshotUpdate::DeletesDataFiles() const {
+  return data_filter_manager_.ContainsDeletes();
+}
+
+bool MergingSnapshotUpdate::DeletesDeleteFiles() const {
+  return delete_filter_manager_.ContainsDeletes();
+}
+
+// -------------------------------------------------------------------------
+// Apply pipeline
+// -------------------------------------------------------------------------
+
+ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory(
+    const std::shared_ptr<Schema>& schema) {
+  return
+      [this, schema](int32_t spec_id,
+                     ManifestContent content) -> 
Result<std::unique_ptr<ManifestWriter>> {
+        const TableMetadata& meta = base();
+        ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id));
+        std::string path = ManifestPath();
+        all_written_manifests_.insert(path);
+        return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(),
+                                          std::move(path), ctx_->table->io(),
+                                          std::move(spec), schema, content);
+      };
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDataManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_data_files_ && cached_new_data_manifests_.has_value()) {
+    for (const auto& m : *cached_new_data_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_data_manifests_.reset();
+  }
+
+  if (cached_new_data_manifests_.has_value()) {
+    return *cached_new_data_manifests_;
+  }
+
+  std::vector<ManifestFile> result;
+  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto written,
+        WriteDataManifests(data_files.as_span(), spec, 
new_data_files_data_seq_number_));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_data_manifests_ = result;
+  has_new_data_files_ = false;
+  return result;
+}
+
+Result<std::vector<MergingSnapshotUpdate::PendingDeleteFile>>
+MergingSnapshotUpdate::NormalizeNewDeleteFiles() const {
+  std::vector<PendingDeleteFile> result;
+  result.reserve(new_delete_files_.size());
+
+  std::unordered_set<DeleteFileObjectKey, DeleteFileObjectKeyHash> 
seen_delete_files;
+  std::unordered_map<std::string, DeleteFileObjectKey> 
dv_by_referenced_data_file;
+
+  for (const auto& pending_file : new_delete_files_) {
+    const auto& file = pending_file.file;
+    ICEBERG_PRECHECK(file != nullptr, "Cannot add a null delete file");
+
+    auto key = MakeDeleteFileObjectKey(*file);
+    if (!seen_delete_files.insert(key).second) {
+      continue;
+    }
+
+    if (ContentFileUtil::IsDV(*file)) {
+      ICEBERG_PRECHECK(file->referenced_data_file.has_value(),
+                       "DV must have a referenced data file: {}", 
file->file_path);
+      auto [it, inserted] =
+          dv_by_referenced_data_file.emplace(*file->referenced_data_file, key);
+      if (!inserted && it->second != key) {
+        return NotImplemented(
+            "Cannot merge multiple deletion vectors for referenced data file: 
{}",
+            *file->referenced_data_file);
+      }
+    }
+
+    result.push_back(pending_file);
+  }
+
+  return result;
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDeleteManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_delete_files_ && cached_new_delete_manifests_.has_value()) {
+    for (const auto& m : *cached_new_delete_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_delete_manifests_.reset();
+  }
+
+  if (cached_new_delete_manifests_.has_value()) {
+    return *cached_new_delete_manifests_;
+  }
+
+  // Group delete files by partition spec ID, mirroring 
WriteNewDataManifests().
+  std::unordered_map<int32_t, std::vector<PendingDeleteFile>> 
delete_files_by_spec;
+  for (const auto& pending_file : new_delete_files_) {
+    
delete_files_by_spec[pending_file.file->partition_spec_id.value()].push_back(
+        pending_file);
+  }
+
+  std::vector<ManifestFile> result;
+  for (auto& [spec_id, delete_files] : delete_files_by_spec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    std::vector<DeleteManifestEntry> delete_entries;
+    delete_entries.reserve(delete_files.size());
+    for (const auto& pending_file : delete_files) {
+      delete_entries.push_back(DeleteManifestEntry{
+          .file = pending_file.file,
+          .data_sequence_number = pending_file.data_sequence_number,
+      });
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto written, WriteDeleteManifests(delete_entries, 
spec));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_delete_manifests_ = result;
+  has_new_delete_files_ = false;
+  return result;
+}
+
+Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply(
+    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& 
snapshot) {
+  // Re-validate buffered delete files against the current format version. A 
format
+  // upgrade between staging and commit could make previously-valid files 
invalid.
+  for (const auto& pending_file : new_delete_files_) {
+    ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*pending_file.file));
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto normalized_delete_files, 
NormalizeNewDeleteFiles());
+  new_delete_files_ = std::move(normalized_delete_files);
+
+  added_delete_files_summary_.Clear();
+  for (const auto& pending_file : new_delete_files_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata_to_update.PartitionSpecById(
+                                           
*pending_file.file->partition_spec_id));
+    ICEBERG_RETURN_UNEXPECTED(
+        added_delete_files_summary_.AddedFile(*spec, *pending_file.file));
+  }
+
+  // Rebuild summary from stable sub-builders so that commit retries don't 
double-count.
+  summary_builder().Clear();

Review Comment:
   Will this clear custom property set by user?



##########
src/iceberg/test/merging_snapshot_update_test.cc:
##########
@@ -0,0 +1,1227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/constants.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/update_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// \brief Concrete subclass of MergingSnapshotUpdate for testing.
+class TestMergeAppend : public MergingSnapshotUpdate {
+ public:
+  static Result<std::unique_ptr<TestMergeAppend>> Make(std::string table_name,
+                                                       std::shared_ptr<Table> 
table) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto ctx, TransactionContext::Make(std::move(table), 
TransactionKind::kUpdate));
+    return std::unique_ptr<TestMergeAppend>(
+        new TestMergeAppend(std::move(table_name), std::move(ctx)));
+  }
+
+  std::string operation() override { return "append"; }
+
+  // Expose protected API for test access
+  Status AddFile(std::shared_ptr<DataFile> file) { return 
AddDataFile(std::move(file)); }
+  Status AddDelete(std::shared_ptr<DataFile> file) {
+    return AddDeleteFile(std::move(file));
+  }
+  Status AddDelete(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number) {
+    return AddDeleteFile(std::move(file), data_sequence_number);
+  }
+  Status RemoveDataFile(std::shared_ptr<DataFile> file) {
+    return DeleteDataFile(std::move(file));
+  }
+  Status RemoveDeleteFile(std::shared_ptr<DataFile> file) {
+    return DeleteDeleteFile(std::move(file));
+  }
+  Status AppendManifest(ManifestFile manifest) {
+    return AddManifest(std::move(manifest));
+  }
+  Result<std::shared_ptr<PartitionSpec>> DataSpec() const {
+    return MergingSnapshotUpdate::DataSpec();
+  }
+  int64_t GeneratedSnapshotId() { return SnapshotId(); }
+  void SetDataSeqNumber(int64_t seq) { SetNewDataFilesDataSequenceNumber(seq); 
}
+  static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
+                                              int64_t starting_snapshot_id,
+                                              const std::shared_ptr<Snapshot>& 
parent,
+                                              std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDataFiles(metadata, 
starting_snapshot_id,
+                                                         nullptr, parent, 
std::move(io));
+  }
+  static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
+                                              int64_t starting_snapshot_id,
+                                              const PartitionSet& 
partition_set,
+                                              const std::shared_ptr<Snapshot>& 
parent,
+                                              std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDataFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateNoNewDeletesForDataFilesForTest(
+      const TableMetadata& metadata, int64_t starting_snapshot_id,
+      std::shared_ptr<Expression> data_filter, const DataFileSet& 
replaced_files,
+      const std::shared_ptr<Snapshot>& parent, std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), 
replaced_files, parent,
+        std::move(io));
+  }
+  static Status ValidateNoNewDeleteFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                std::shared_ptr<Expression> 
data_filter,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), parent, 
std::move(io));
+  }
+  static Status ValidateNoNewDeleteFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                const PartitionSet& 
partition_set,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateDeletedDataFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                std::shared_ptr<Expression> 
data_filter,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateDeletedDataFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), parent, 
std::move(io));
+  }
+  static Status ValidateDeletedDataFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                const PartitionSet& 
partition_set,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateDeletedDataFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateAddedDVsForTest(const TableMetadata& metadata,
+                                        int64_t starting_snapshot_id,
+                                        std::shared_ptr<Expression> 
conflict_filter,
+                                        const std::shared_ptr<Snapshot>& 
parent,
+                                        std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDVs(metadata, 
starting_snapshot_id,
+                                                   std::move(conflict_filter), 
parent,
+                                                   std::move(io));
+  }
+
+  bool HasDataFiles() const { return AddsDataFiles(); }
+  bool HasDeleteFiles() const { return AddsDeleteFiles(); }
+  bool HasDataDeletes() const { return DeletesDataFiles(); }
+
+ private:
+  TestMergeAppend(std::string table_name, std::shared_ptr<TransactionContext> 
ctx)
+      : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {}
+};
+
+class TestOverwriteUpdate : public MergingSnapshotUpdate {
+ public:
+  static Result<std::unique_ptr<TestOverwriteUpdate>> Make(std::string 
table_name,
+                                                           
std::shared_ptr<Table> table) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto ctx, TransactionContext::Make(std::move(table), 
TransactionKind::kUpdate));
+    return std::unique_ptr<TestOverwriteUpdate>(
+        new TestOverwriteUpdate(std::move(table_name), std::move(ctx)));
+  }
+
+  std::string operation() override { return DataOperation::kOverwrite; }
+  int64_t GeneratedSnapshotId() { return SnapshotId(); }
+
+  Status AddDelete(std::shared_ptr<DataFile> file) {
+    return AddDeleteFile(std::move(file));
+  }
+  Status AddDelete(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number) {
+    return AddDeleteFile(std::move(file), data_sequence_number);
+  }
+  Status RemoveDataFile(std::shared_ptr<DataFile> file) {
+    return DeleteDataFile(std::move(file));
+  }
+
+ private:
+  TestOverwriteUpdate(std::string table_name, 
std::shared_ptr<TransactionContext> ctx)
+      : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {}
+};
+
+class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    MinimalUpdateTestBase::SetUp();
+
+    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
+    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());
+
+    file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L);
+    file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L);
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path, int64_t 
partition_x) {
+    auto f = std::make_shared<DataFile>();
+    f->content = DataFile::Content::kData;
+    f->file_path = table_location_ + path;
+    f->file_format = FileFormatType::kParquet;
+    f->partition = 
PartitionValues(std::vector<Literal>{Literal::Long(partition_x)});
+    f->file_size_in_bytes = 1024;
+    f->record_count = 100;
+    f->partition_spec_id = spec_->spec_id();
+    return f;
+  }
+
+  std::shared_ptr<DataFile> MakeDeleteFile(const std::string& path, int64_t 
partition_x) {
+    auto f = MakeDataFile(path, partition_x);
+    f->content = DataFile::Content::kPositionDeletes;
+    return f;
+  }
+
+  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+                                                   int64_t partition_x) {
+    auto f = MakeDeleteFile(path, partition_x);
+    f->content = DataFile::Content::kEqualityDeletes;
+    f->equality_ids = {1};
+    return f;
+  }
+
+  Result<std::unique_ptr<TestMergeAppend>> NewMergeAppend() {
+    return TestMergeAppend::Make(TableName(), table_);
+  }
+
+  Result<std::unique_ptr<TestOverwriteUpdate>> NewOverwriteUpdate() {
+    return TestOverwriteUpdate::Make(TableName(), table_);
+  }
+
+  // Commit file_a_ with FastAppend and refresh the table.
+  void CommitFileA() {
+    ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend());
+    fa->AppendFile(file_a_);
+    EXPECT_THAT(fa->Commit(), IsOk());
+    EXPECT_THAT(table_->Refresh(), IsOk());
+  }
+
+  // Read all entries from a list of ManifestFiles.
+  Result<std::vector<ManifestEntry>> ReadAllEntries(
+      const std::vector<ManifestFile>& manifests, const TableMetadata& 
metadata) {
+    std::vector<ManifestEntry> result;
+    for (const auto& m : manifests) {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(m.partition_spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(m, file_io_, schema, spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+      result.insert(result.end(), entries.begin(), entries.end());
+    }
+    return result;
+  }
+
+  // Write a manifest file containing the given data files.
+  // Returns a ManifestFile with added_snapshot_id = kInvalidSnapshotId so it
+  // is eligible for snapshot ID inheritance.
+  Result<ManifestFile> WriteManifest(
+      const std::string& path, const std::vector<std::shared_ptr<DataFile>>& 
files) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestWriter::MakeWriter(/*format_version=*/2, kInvalidSnapshotId, 
path,
+                                   file_io_, spec_, schema_, 
ManifestContent::kData));
+    for (const auto& f : files) {
+      ManifestEntry entry;
+      entry.status = ManifestStatus::kAdded;
+      entry.snapshot_id = std::nullopt;
+      entry.data_file = f;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->ToManifestFile();
+  }
+
+  Result<ManifestFile> WriteDeleteManifest(
+      const TableMetadata& metadata, const std::string& path,
+      const std::vector<std::shared_ptr<DataFile>>& files, int64_t 
sequence_number) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(spec_->spec_id()));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestWriter::MakeWriter(metadata.format_version, 
/*snapshot_id=*/1L, path,
+                                   file_io_, spec, schema, 
ManifestContent::kDeletes));
+    for (const auto& f : files) {
+      ManifestEntry entry;
+      entry.status = ManifestStatus::kAdded;
+      entry.snapshot_id = 1L;
+      entry.sequence_number = sequence_number;
+      entry.data_file = f;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->ToManifestFile();
+  }
+
+  Result<std::shared_ptr<Snapshot>> MakeSyntheticSnapshot(
+      std::string operation, int64_t snapshot_id,
+      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
+      const std::vector<ManifestFile>& manifests) {
+    auto manifest_list_path = table_location_ + "/metadata/manifest-list-" +
+                              std::to_string(snapshot_id) + ".avro";
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestListWriter::MakeWriter(table_->metadata()->format_version, 
snapshot_id,
+                                       parent_snapshot_id, manifest_list_path, 
file_io_,
+                                       sequence_number));
+    ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto snapshot,
+        Snapshot::Make(sequence_number, snapshot_id, parent_snapshot_id, 
TimePointMs{},
+                       std::move(operation), {}, 
table_->metadata()->current_schema_id,
+                       manifest_list_path));
+    return std::shared_ptr<Snapshot>(std::move(snapshot));
+  }
+
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<DataFile> file_a_;
+  std::shared_ptr<DataFile> file_b_;
+};
+
+// -------------------------------------------------------------------------
+// State query tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, AddsDataFilesInitiallyFalse) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_FALSE(op->HasDataFiles());
+  EXPECT_FALSE(op->HasDeleteFiles());
+  EXPECT_FALSE(op->HasDataDeletes());
+}
+
+TEST_F(MergingSnapshotUpdateTest, AddsDataFilesTrueAfterAdd) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_TRUE(op->HasDataFiles());
+  EXPECT_FALSE(op->HasDeleteFiles());
+}
+
+TEST_F(MergingSnapshotUpdateTest, AddsDeleteFilesTrueAfterAdd) {
+  auto del_file = MakeEqualityDeleteFile("/delete/del_a.parquet", 1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_FALSE(op->HasDataFiles());
+  EXPECT_TRUE(op->HasDeleteFiles());
+}
+
+TEST_F(MergingSnapshotUpdateTest, DeletesDataFilesTrueAfterRegisterDelete) {
+  CommitFileA();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->RemoveDataFile(file_a_), IsOk());
+  EXPECT_TRUE(op->HasDataDeletes());
+}
+
+// -------------------------------------------------------------------------
+// Apply / Commit tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, CommitNewDataFile) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at("added-data-files"), "1");

Review Comment:
   nit: use the metrics const string listed in snapshot.h? not a strong opinion 
but it seems you are using them later in this test file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to