shangxinli commented on code in PR #648:
URL: https://github.com/apache/iceberg-cpp/pull/648#discussion_r3266810698


##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +332,309 @@ class ReachableFileCleanup : public FileCleanupStrategy {
   }
 };
 
+/// \brief Incremental file cleanup strategy for simple linear-ancestry 
expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
+///   * No snapshot IDs were explicitly listed for expiration.
+///   * No removed snapshots lived outside the current main ancestry.
+///   * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, 
so
+/// two snapshot passes are enough -- one over retained snapshots to learn 
which
+/// manifests are still live, one over expired snapshots to learn which 
manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion 
support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata_before_expiration,
+                    const TableMetadata& metadata_after_expiration,
+                    const std::unordered_set<int64_t>& expired_snapshot_ids,

Review Comment:
   Done in b57207a — both strategies now derive the expired snapshot IDs from the before/after metadata.



##########
src/iceberg/test/expire_snapshots_test.cc:
##########
@@ -573,4 +580,75 @@ TEST_F(ExpireSnapshotsCleanupTest, 
KeepsReusedPartitionStats) {
   EXPECT_THAT(deleted_files, 
testing::Not(testing::Contains(reused_statistics_path)));
 }
 
+// Linear-ancestry, no specified ID: dispatch must pick IncrementalFileCleanup.

Review Comment:
   Trimmed in b57207a.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -613,11 +917,23 @@ Status ExpireSnapshots::Finalize(Result<const 
TableMetadata*> commit_result) {
   apply_result_.reset();
 
   // File cleanup is best-effort: log and continue on individual file deletion 
failures
-  ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
-  return strategy.CleanFiles(metadata_before_expiration, 
metadata_after_expiration,
-                             expired_ids, cleanup_level_);
+  // Pick incremental cleanup when the expiration is a simple linear-ancestry 
walk:
+  // no explicit snapshot IDs, no removed snapshots outside main ancestry, and 
no
+  // retained snapshots outside main ancestry. Mirrors Java RemoveSnapshots's
+  // dispatch in cleanExpiredSnapshots().
+  bool can_use_incremental = !specified_snapshot_id_ &&
+                             
!HasRemovedNonMainAncestors(metadata_before_expiration,
+                                                         
metadata_after_expiration) &&
+                             !HasNonMainSnapshots(metadata_after_expiration);
+
+  std::unique_ptr<FileCleanupStrategy> strategy;
+  if (can_use_incremental) {
+    strategy = std::make_unique<IncrementalFileCleanup>(ctx_->table->io(), 
delete_func_);
+  } else {
+    strategy = std::make_unique<ReachableFileCleanup>(ctx_->table->io(), 
delete_func_);
+  }
+  return strategy->CleanFiles(metadata_before_expiration, 
metadata_after_expiration,

Review Comment:
   Done in b57207a — PendingUpdate::Commit() now returns Finalize's status. Added the CommitPropagatesMalformedSourceSnapshotId test to cover it.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +332,309 @@ class ReachableFileCleanup : public FileCleanupStrategy {
   }
 };
 
+/// \brief Incremental file cleanup strategy for simple linear-ancestry 
expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:

Review Comment:
   Dropped in b57207a.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +331,303 @@ class ReachableFileCleanup : public FileCleanupStrategy {
   }
 };
 
+/// \brief Incremental file cleanup strategy for simple linear-ancestry 
expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
+///   * No snapshot IDs were explicitly listed for expiration.
+///   * No removed snapshots lived outside the current main ancestry.
+///   * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, 
so
+/// two snapshot passes are enough -- one over retained snapshots to learn 
which
+/// manifests are still live, one over expired snapshots to learn which 
manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion 
support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata_before_expiration,
+                    const TableMetadata& metadata_after_expiration,
+                    const std::unordered_set<int64_t>& expired_snapshot_ids,
+                    CleanupLevel level) override {
+    if (expired_snapshot_ids.empty()) {
+      return {};
+    }
+
+    std::unordered_set<int64_t> valid_ids;
+    valid_ids.reserve(metadata_after_expiration.snapshots.size());
+    for (const auto& snapshot : metadata_after_expiration.snapshots) {
+      if (snapshot) {
+        valid_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    auto current_result = metadata_before_expiration.SnapshotById(
+        metadata_before_expiration.current_snapshot_id);
+    if (!current_result.has_value() || current_result.value() == nullptr) {
+      return {};
+    }
+
+    // Ancestors of the current table state. Files deleted in a non-ancestor
+    // snapshot may still belong to the current state (rolled-back commits),
+    // so we only physically delete files removed by ancestor snapshots.
+    auto ancestors_result = SnapshotUtil::AncestorsOf(
+        current_result.value()->snapshot_id, 
[&metadata_before_expiration](int64_t id) {
+          return metadata_before_expiration.SnapshotById(id);
+        });
+    if (!ancestors_result.has_value()) {
+      return {};
+    }
+    std::unordered_set<int64_t> ancestor_ids;
+    ancestor_ids.reserve(ancestors_result.value().size());
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (ancestor) ancestor_ids.insert(ancestor->snapshot_id);
+    }
+
+    // Cherry-pick protection: snapshots whose changes were picked into the
+    // current ancestry under a different snapshot id should not be cleaned up.
+    // Iterate the ancestor pointers we already have rather than re-looking-up
+    // each snapshot by id.
+    std::unordered_set<int64_t> picked_ancestor_snapshot_ids;
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (!ancestor) continue;
+      const auto& summary = ancestor->summary;
+      auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+      if (it == summary.end()) continue;
+      try {
+        picked_ancestor_snapshot_ids.insert(std::stoll(it->second));
+      } catch (...) {
+        // Malformed source-snapshot-id; skip rather than fail cleanup.

Review Comment:
   Fixed in b57207a — both parse sites now propagate parse errors via ICEBERG_ASSIGN_OR_RAISE.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -613,11 +917,23 @@ Status ExpireSnapshots::Finalize(Result<const 
TableMetadata*> commit_result) {
   apply_result_.reset();
 
   // File cleanup is best-effort: log and continue on individual file deletion 
failures
-  ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
-  return strategy.CleanFiles(metadata_before_expiration, 
metadata_after_expiration,
-                             expired_ids, cleanup_level_);
+  // Pick incremental cleanup when the expiration is a simple linear-ancestry 
walk:
+  // no explicit snapshot IDs, no removed snapshots outside main ancestry, and 
no
+  // retained snapshots outside main ancestry. Mirrors Java RemoveSnapshots's
+  // dispatch in cleanExpiredSnapshots().
+  bool can_use_incremental = !specified_snapshot_id_ &&
+                             
!HasRemovedNonMainAncestors(metadata_before_expiration,
+                                                         
metadata_after_expiration) &&
+                             !HasNonMainSnapshots(metadata_after_expiration);
+
+  std::unique_ptr<FileCleanupStrategy> strategy;

Review Comment:
   Done in b57207a.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +332,309 @@ class ReachableFileCleanup : public FileCleanupStrategy {
   }
 };
 
+/// \brief Incremental file cleanup strategy for simple linear-ancestry 
expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
+///   * No snapshot IDs were explicitly listed for expiration.
+///   * No removed snapshots lived outside the current main ancestry.
+///   * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, 
so
+/// two snapshot passes are enough -- one over retained snapshots to learn 
which
+/// manifests are still live, one over expired snapshots to learn which 
manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion 
support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+  using FileCleanupStrategy::FileCleanupStrategy;
+
+  Status CleanFiles(const TableMetadata& metadata_before_expiration,
+                    const TableMetadata& metadata_after_expiration,
+                    const std::unordered_set<int64_t>& expired_snapshot_ids,
+                    CleanupLevel level) override {
+    if (expired_snapshot_ids.empty()) {
+      return {};
+    }
+
+    std::unordered_set<int64_t> valid_ids;
+    valid_ids.reserve(metadata_after_expiration.snapshots.size());
+    for (const auto& snapshot : metadata_after_expiration.snapshots) {
+      if (snapshot) {
+        valid_ids.insert(snapshot->snapshot_id);
+      }
+    }
+
+    auto current_result = metadata_before_expiration.SnapshotById(
+        metadata_before_expiration.current_snapshot_id);
+    if (!current_result.has_value() || current_result.value() == nullptr) {
+      return {};
+    }
+
+    // Ancestors of the current table state. Files deleted in a non-ancestor
+    // snapshot may still belong to the current state (rolled-back commits),
+    // so we only physically delete files removed by ancestor snapshots.
+    auto ancestors_result = SnapshotUtil::AncestorsOf(
+        current_result.value()->snapshot_id, 
[&metadata_before_expiration](int64_t id) {
+          return metadata_before_expiration.SnapshotById(id);
+        });
+    if (!ancestors_result.has_value()) {
+      return {};
+    }
+    std::unordered_set<int64_t> ancestor_ids;
+    ancestor_ids.reserve(ancestors_result.value().size());
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (ancestor) ancestor_ids.insert(ancestor->snapshot_id);
+    }
+
+    // Cherry-pick protection: snapshots whose changes were picked into the
+    // current ancestry under a different snapshot id should not be cleaned up.
+    // Iterate the ancestor pointers we already have rather than re-looking-up
+    // each snapshot by id.
+    std::unordered_set<int64_t> picked_ancestor_snapshot_ids;
+    for (const auto& ancestor : ancestors_result.value()) {
+      if (!ancestor) continue;
+      const auto& summary = ancestor->summary;
+      auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+      if (it == summary.end()) continue;
+      try {
+        picked_ancestor_snapshot_ids.insert(std::stoll(it->second));

Review Comment:
   Done in b57207a.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to