wgtmac commented on code in PR #592:
URL: https://github.com/apache/iceberg-cpp/pull/592#discussion_r3020581589


##########
src/iceberg/update/expire_snapshots.h:
##########
@@ -22,10 +22,14 @@
 #include <cstdint>
 #include <functional>
 #include <memory>
+#include <string>
+#include <unordered_map>
 #include <unordered_set>
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"

Review Comment:
   Can we remove these includes by using forward declarations? 
`iceberg/type_fwd.h` is already included.
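
   A minimal sketch of the idea, assuming a single file that stands in for both 
the header and the source. `ExpireSnapshotsSketch`, `Remember`, and 
`RememberedPath` are hypothetical names used only for illustration, and the 
one-field `ManifestFile` is a stand-in for the real type. Note that a by-value 
member such as `std::unordered_map<std::string, ManifestFile>` generally needs 
the complete type, so the forward declaration alone is enough only once such 
members move out of the header.

```cpp
#include <cassert>
#include <memory>
#include <string>

// --- header part (stand-in for expire_snapshots.h) ---
// Forward declaration only; in the PR this would come from iceberg/type_fwd.h.
struct ManifestFile;

class ExpireSnapshotsSketch {
 public:
  ExpireSnapshotsSketch();
  ~ExpireSnapshotsSketch();  // defined below, where ManifestFile is complete

  // References and smart pointers to an incomplete type are fine in
  // declarations, so the header does not need the full definition.
  void Remember(const ManifestFile& manifest);
  std::string RememberedPath() const;

 private:
  std::unique_ptr<ManifestFile> last_manifest_;
};

// --- source part (stand-in for expire_snapshots.cc) ---
// The full definition is only needed here.
struct ManifestFile {
  std::string manifest_path;
};

ExpireSnapshotsSketch::ExpireSnapshotsSketch() = default;
ExpireSnapshotsSketch::~ExpireSnapshotsSketch() = default;

void ExpireSnapshotsSketch::Remember(const ManifestFile& manifest) {
  last_manifest_ = std::make_unique<ManifestFile>(manifest);
}

std::string ExpireSnapshotsSketch::RememberedPath() const {
  return last_manifest_ ? last_manifest_->manifest_path : std::string{};
}
```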



##########
src/iceberg/update/expire_snapshots.h:
##########
@@ -169,6 +224,14 @@ class ICEBERG_EXPORT ExpireSnapshots : public 
PendingUpdate {
   enum CleanupLevel cleanup_level_ { CleanupLevel::kAll };
   bool clean_expired_metadata_{false};
   bool specified_snapshot_id_{false};
+
+  /// Cached result from Apply(), used during Finalize() for file cleanup
+  std::optional<ApplyResult> apply_result_;
+
+  /// Cache of manifest path -> ManifestFile, built during 
ReadManifestsForSnapshot
+  /// to avoid O(M*S) repeated I/O from re-reading manifest lists in
+  /// FindDataFilesToDelete.
+  std::unordered_map<std::string, ManifestFile> manifest_cache_;

Review Comment:
   This is internal, ephemeral state used only while cleaning up files. Can we 
remove it from the header and move it to the source file (or to a concrete 
`FileCleanupStrategy` class)?



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;

Review Comment:
   ```suggestion
         std::ignore = ctx_->table->io()->DeleteFile(path);
   ```
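
   For context, a small sketch of why the one-line form works. `Status`, 
`DeleteFile`, `DeleteBestEffort`, and the call counter are hypothetical 
stand-ins, not the project's definitions; the only real facility used is 
`std::ignore` from `<tuple>`.

```cpp
#include <cassert>
#include <string>
#include <tuple>  // std::ignore

// Hypothetical stand-in for the project's Status type.
struct Status {
  bool ok = true;
};

int g_delete_calls = 0;  // counts calls so the effect is observable

// Hypothetical stand-in for FileIO::DeleteFile. [[nodiscard]] makes the
// compiler warn when the returned Status is silently dropped.
[[nodiscard]] Status DeleteFile(const std::string& path) {
  ++g_delete_calls;
  return Status{!path.empty()};
}

void DeleteBestEffort(const std::string& path) {
  // Assigning to std::ignore discards the value explicitly, so no named
  // temporary is needed.
  std::ignore = DeleteFile(path);
}
```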



##########
src/iceberg/update/expire_snapshots.h:
##########
@@ -159,6 +173,47 @@ class ICEBERG_EXPORT ExpireSnapshots : public 
PendingUpdate {
   Result<std::unordered_set<int64_t>> UnreferencedSnapshotIdsToRetain(
       const SnapshotToRef& refs) const;
 
+  /// \brief Clean up files no longer referenced after snapshot expiration.
+  ///
+  /// Implements the "reachable file cleanup" strategy from Java's 
ReachableFileCleanup:
+  /// 1. Collect manifests from expired and retained snapshots
+  /// 2. Prune manifests still referenced by retained snapshots
+  /// 3. Find data files only in manifests being deleted (if kAll)
+  /// 4. Remove data files still reachable from retained manifests
+  /// 5. Delete orphaned manifests, manifest lists, and statistics files
+  ///
+  /// All deletions are best-effort: failures are suppressed to avoid blocking
+  /// metadata updates (matching Java's suppressFailureWhenFinished behavior).
+  ///
+  /// Branch/tag awareness: retained_snapshot_ids includes all snapshots 
referenced
+  /// by any branch or tag, as computed by Apply(). This prevents deleting 
files
+  /// that are still reachable from any ref.
+  ///
+  /// TODO(shangxinli): Add multi-threaded file deletion support.
+  /// TODO(shangxinli): Add IncrementalFileCleanup strategy for linear 
ancestry.
+  Status CleanExpiredFiles(const std::vector<int64_t>& expired_snapshot_ids);
+
+  /// \brief Read manifest paths from a single snapshot.
+  /// Best-effort: returns OK even if the snapshot or its manifests can't be 
read.
+  Status ReadManifestsForSnapshot(int64_t snapshot_id,
+                                  std::unordered_set<std::string>& 
manifest_paths);
+
+  /// \brief Find data files to delete by reading live entries from manifests 
being
+  /// deleted, then subtracting files still reachable from retained manifests.
+  /// If a retained manifest cannot be read, returns an empty set to prevent
+  /// accidental data loss.
+  Result<std::unordered_set<std::string>> FindDataFilesToDelete(

Review Comment:
   This function seems to be specific to `ReachableFileCleanup`.



##########
src/iceberg/update/expire_snapshots.h:
##########
@@ -159,6 +173,47 @@ class ICEBERG_EXPORT ExpireSnapshots : public 
PendingUpdate {
   Result<std::unordered_set<int64_t>> UnreferencedSnapshotIdsToRetain(
       const SnapshotToRef& refs) const;
 
+  /// \brief Clean up files no longer referenced after snapshot expiration.
+  ///
+  /// Implements the "reachable file cleanup" strategy from Java's 
ReachableFileCleanup:
+  /// 1. Collect manifests from expired and retained snapshots
+  /// 2. Prune manifests still referenced by retained snapshots
+  /// 3. Find data files only in manifests being deleted (if kAll)
+  /// 4. Remove data files still reachable from retained manifests
+  /// 5. Delete orphaned manifests, manifest lists, and statistics files
+  ///
+  /// All deletions are best-effort: failures are suppressed to avoid blocking
+  /// metadata updates (matching Java's suppressFailureWhenFinished behavior).
+  ///
+  /// Branch/tag awareness: retained_snapshot_ids includes all snapshots 
referenced
+  /// by any branch or tag, as computed by Apply(). This prevents deleting 
files
+  /// that are still reachable from any ref.
+  ///
+  /// TODO(shangxinli): Add multi-threaded file deletion support.
+  /// TODO(shangxinli): Add IncrementalFileCleanup strategy for linear 
ancestry.
+  Status CleanExpiredFiles(const std::vector<int64_t>& expired_snapshot_ids);
+
+  /// \brief Read manifest paths from a single snapshot.
+  /// Best-effort: returns OK even if the snapshot or its manifests can't be 
read.
+  Status ReadManifestsForSnapshot(int64_t snapshot_id,
+                                  std::unordered_set<std::string>& 
manifest_paths);
+
+  /// \brief Find data files to delete by reading live entries from manifests 
being
+  /// deleted, then subtracting files still reachable from retained manifests.
+  /// If a retained manifest cannot be read, returns an empty set to prevent
+  /// accidental data loss.
+  Result<std::unordered_set<std::string>> FindDataFilesToDelete(
+      const std::unordered_set<std::string>& manifests_to_delete,
+      const std::unordered_set<std::string>& retained_manifests);
+
+  /// \brief Create a ManifestReader for the given ManifestFile.
+  Result<std::shared_ptr<ManifestReader>> MakeManifestReader(

Review Comment:
   This function doesn't need to be a member function.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader(

Review Comment:
   Make it a local function in the anonymous namespace?
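
   A rough sketch of what that could look like, assuming simplified stand-in 
types (the real `ManifestFile`, `TableMetadata`, and `ManifestReader` are 
richer, and the real helper would return a `Result`). The point is only that 
the helper receives everything it needs as arguments and has internal linkage. 
`CanReadManifest` is a hypothetical caller standing in for a member function.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

// Hypothetical, simplified stand-ins for the real Iceberg types.
struct ManifestFile {
  std::string manifest_path;
  int partition_spec_id = 0;
};
struct TableMetadata {
  std::map<int, std::string> partition_specs;  // spec id -> spec name
};
struct ManifestReader {
  std::string path;
  std::string spec;
};

namespace {

// File-local helper: internal linkage, no access to class internals, and
// every dependency is passed in explicitly.
std::optional<ManifestReader> MakeManifestReader(const ManifestFile& manifest,
                                                 const TableMetadata& metadata) {
  auto it = metadata.partition_specs.find(manifest.partition_spec_id);
  if (it == metadata.partition_specs.end()) return std::nullopt;
  return ManifestReader{manifest.manifest_path, it->second};
}

}  // namespace

// Stand-in for a member function that calls the helper.
bool CanReadManifest(const ManifestFile& manifest, const TableMetadata& metadata) {
  return MakeManifestReader(manifest, metadata).has_value();
}
```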



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader(
+    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io) {
+  const TableMetadata& metadata = base();
+  auto schema_result = metadata.Schema();
+  if (!schema_result.has_value()) return 
std::unexpected<Error>(schema_result.error());
+  auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
+  if (!spec_result.has_value()) return 
std::unexpected<Error>(spec_result.error());
+  return ManifestReader::Make(manifest, file_io, schema_result.value(),
+                              spec_result.value());
+}
+
+Status ExpireSnapshots::ReadManifestsForSnapshot(
+    int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  auto snapshot_result = metadata.SnapshotById(snapshot_id);
+  if (!snapshot_result.has_value()) {
+    return {};
+  }
+  auto& snapshot = snapshot_result.value();
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  auto manifests_result = snapshot_cache.Manifests(file_io);
+  if (!manifests_result.has_value()) {
+    // Best-effort: skip this snapshot if we can't read its manifests
+    return {};
+  }
+
+  for (const auto& manifest : manifests_result.value()) {
+    manifest_paths.insert(manifest.manifest_path);
+    // Cache manifest metadata for later use in FindDataFilesToDelete,
+    // avoiding O(M*S) repeated I/O from re-reading manifest lists.
+    manifest_cache_.emplace(manifest.manifest_path, manifest);
+  }
+
+  return {};
+}
+
+Result<std::unordered_set<std::string>> ExpireSnapshots::FindDataFilesToDelete(
+    const std::unordered_set<std::string>& manifests_to_delete,
+    const std::unordered_set<std::string>& retained_manifests) {
+  auto file_io = ctx_->table->io();
+  std::unordered_set<std::string> data_files_to_delete;
+
+  // Step 1: Collect live file paths from manifests being deleted.
+  // Use LiveEntries() (ADDED/EXISTING only) to match Java's 
ManifestFiles.readPaths
+  // which delegates to liveEntries(). Using Entries() would include DELETED 
entries
+  // and could cause storage leaks.
+  for (const auto& [path, manifest] : manifest_cache_) {
+    if (!manifests_to_delete.contains(path)) continue;
+
+    auto reader_result = MakeManifestReader(manifest, file_io);
+    if (!reader_result.has_value()) continue;
+
+    auto entries_result = reader_result.value()->LiveEntries();
+    if (!entries_result.has_value()) continue;
+
+    for (const auto& entry : entries_result.value()) {
+      if (entry.data_file) {
+        data_files_to_delete.insert(entry.data_file->file_path);
+      }
+    }
+  }
+
+  if (data_files_to_delete.empty()) {
+    return data_files_to_delete;
+  }
+
+  // Step 2: Remove any files that are still referenced by retained manifests.
+  // If reading a retained manifest fails, we must NOT delete its data files
+  // to avoid accidental data loss (matching Java's retry + 
throwFailureWhenFinished).
+  for (const auto& manifest_path : retained_manifests) {
+    if (data_files_to_delete.empty()) break;
+
+    auto it = manifest_cache_.find(manifest_path);
+    if (it == manifest_cache_.end()) continue;
+
+    auto reader_result = MakeManifestReader(it->second, file_io);
+    if (!reader_result.has_value()) {
+      // Cannot read a retained manifest — abort data file deletion to prevent
+      // accidental data loss. Java retries and throws on failure here.
+      return std::unordered_set<std::string>{};
+    }
+
+    auto entries_result = reader_result.value()->LiveEntries();
+    if (!entries_result.has_value()) {
+      return std::unordered_set<std::string>{};
+    }
+
+    for (const auto& entry : entries_result.value()) {
+      if (entry.data_file) {
+        data_files_to_delete.erase(entry.data_file->file_path);
+      }
+    }
+  }
+
+  return data_files_to_delete;
+}
+
+Status ExpireSnapshots::CleanExpiredFiles(
+    const std::vector<int64_t>& expired_snapshot_ids) {
+  const TableMetadata& metadata = base();
+
+  // Build expired and retained snapshot ID sets.
+  // The retained set includes ALL snapshots referenced by any branch or tag,
+  // since Apply() already computed retention across all refs.
+  std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(),
+                                             expired_snapshot_ids.end());
+  std::unordered_set<int64_t> retained_snapshot_ids;
+  for (const auto& snapshot : metadata.snapshots) {
+    if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) {
+      retained_snapshot_ids.insert(snapshot->snapshot_id);
+    }
+  }
+
+  // Phase 1: Collect manifest paths from expired and retained snapshots.
+  // TODO(shangxinli): Parallelize manifest collection with a thread pool.
+  std::unordered_set<std::string> expired_manifest_paths;
+  for (int64_t snapshot_id : expired_snapshot_ids) {
+    std::ignore = ReadManifestsForSnapshot(snapshot_id, 
expired_manifest_paths);

Review Comment:
   Why can we ignore the error? Isn't it safe to abort the entire operation?
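
   One way to read this suggestion, sketched with hypothetical stand-ins 
(`Status`, `ManifestLister`, and `CollectManifests` are not from the PR): 
propagate the first read failure so the caller can skip cleanup rather than 
continue from a partial set of manifests.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the project's Status type.
struct Status {
  bool ok = true;
  std::string message;
};

// Hypothetical callback: adds the manifest paths of one snapshot to `out`.
using ManifestLister =
    std::function<Status(int64_t snapshot_id, std::unordered_set<std::string>& out)>;

// Collects manifest paths for all snapshots and stops at the first failure,
// so the caller never works from a partial view.
Status CollectManifests(const std::vector<int64_t>& snapshot_ids,
                        const ManifestLister& list_manifests,
                        std::unordered_set<std::string>& out) {
  for (int64_t id : snapshot_ids) {
    Status status = list_manifests(id, out);
    if (!status.ok) return status;  // propagate instead of std::ignore
  }
  return Status{};
}
```

   Whether aborting is preferable to best-effort cleanup is a design decision 
for the PR; the sketch only shows the mechanics of propagating the error.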



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Result<std::shared_ptr<ManifestReader>> ExpireSnapshots::MakeManifestReader(
+    const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io) {
+  const TableMetadata& metadata = base();
+  auto schema_result = metadata.Schema();
+  if (!schema_result.has_value()) return 
std::unexpected<Error>(schema_result.error());
+  auto spec_result = metadata.PartitionSpecById(manifest.partition_spec_id);
+  if (!spec_result.has_value()) return 
std::unexpected<Error>(spec_result.error());
+  return ManifestReader::Make(manifest, file_io, schema_result.value(),
+                              spec_result.value());

Review Comment:
   ```suggestion
     ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
     ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(manifest.partition_spec_id));
     return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
   ```
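
   For readers unfamiliar with this macro style, here is an illustrative 
sketch of how an assign-or-return macro can work. It is not the definition of 
`ICEBERG_ASSIGN_OR_RAISE`; the `Result` type, the `SKETCH_*` macros, and the 
lookup functions are all hypothetical and kept minimal.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Hypothetical minimal Result type; the project has its own Result<T>.
template <typename T>
struct Result {
  std::optional<T> value;
  std::string error;
  bool ok() const { return value.has_value(); }
};

#define SKETCH_CONCAT_INNER(a, b) a##b
#define SKETCH_CONCAT(a, b) SKETCH_CONCAT_INNER(a, b)

// Evaluate `expr`; on failure return its error from the enclosing function,
// otherwise move the value into `lhs`.
#define SKETCH_ASSIGN_OR_RAISE_IMPL(tmp, lhs, expr)           \
  auto tmp = (expr);                                          \
  if (!tmp.ok()) return {std::nullopt, std::move(tmp.error)}; \
  lhs = std::move(*tmp.value)
#define SKETCH_ASSIGN_OR_RAISE(lhs, expr) \
  SKETCH_ASSIGN_OR_RAISE_IMPL(SKETCH_CONCAT(result_, __LINE__), lhs, expr)

Result<std::string> LookupSchema(bool present) {
  if (!present) return {std::nullopt, "schema not found"};
  return {std::string("schema"), ""};
}

Result<std::string> LookupSpec(bool present) {
  if (!present) return {std::nullopt, "spec not found"};
  return {std::string("spec"), ""};
}

// Each step either binds a value or returns early with the error.
Result<std::string> MakeReader(bool has_schema, bool has_spec) {
  SKETCH_ASSIGN_OR_RAISE(auto schema, LookupSchema(has_schema));
  SKETCH_ASSIGN_OR_RAISE(auto spec, LookupSpec(has_spec));
  return {schema + "/" + spec, ""};
}
```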



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);

Review Comment:
   Should we clear `apply_result_` if `CleanExpiredFiles` succeeds?
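
   A possible shape for this, as a sketch with trimmed-down hypothetical types 
(`ExpireSnapshotsSketch` and `HasCachedResult` are illustration-only names). 
This variant takes the cached value out of the member with `std::exchange`, 
which clears it unconditionally; clearing only on success would instead call 
`apply_result_.reset()` after checking the cleanup status.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical, trimmed-down stand-in for the PR's ApplyResult.
struct ApplyResult {
  std::vector<int64_t> snapshot_ids_to_remove;
};

class ExpireSnapshotsSketch {
 public:
  void Apply(std::vector<int64_t> ids) {
    apply_result_ = ApplyResult{std::move(ids)};
  }

  // Returns how many snapshots were handed to cleanup. The cached result is
  // moved out of the member first, so a second Finalize() finds nothing.
  std::size_t Finalize() {
    std::optional<ApplyResult> result = std::exchange(apply_result_, std::nullopt);
    if (!result.has_value()) return 0;
    return result->snapshot_ids_to_remove.size();
  }

  bool HasCachedResult() const { return apply_result_.has_value(); }

 private:
  std::optional<ApplyResult> apply_result_;
};
```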



##########
src/iceberg/update/expire_snapshots.h:
##########
@@ -159,6 +173,47 @@ class ICEBERG_EXPORT ExpireSnapshots : public 
PendingUpdate {
   Result<std::unordered_set<int64_t>> UnreferencedSnapshotIdsToRetain(
       const SnapshotToRef& refs) const;
 
+  /// \brief Clean up files no longer referenced after snapshot expiration.
+  ///
+  /// Implements the "reachable file cleanup" strategy from Java's 
ReachableFileCleanup:
+  /// 1. Collect manifests from expired and retained snapshots
+  /// 2. Prune manifests still referenced by retained snapshots
+  /// 3. Find data files only in manifests being deleted (if kAll)
+  /// 4. Remove data files still reachable from retained manifests
+  /// 5. Delete orphaned manifests, manifest lists, and statistics files
+  ///
+  /// All deletions are best-effort: failures are suppressed to avoid blocking
+  /// metadata updates (matching Java's suppressFailureWhenFinished behavior).
+  ///
+  /// Branch/tag awareness: retained_snapshot_ids includes all snapshots 
referenced
+  /// by any branch or tag, as computed by Apply(). This prevents deleting 
files
+  /// that are still reachable from any ref.
+  ///
+  /// TODO(shangxinli): Add multi-threaded file deletion support.
+  /// TODO(shangxinli): Add IncrementalFileCleanup strategy for linear 
ancestry.
+  Status CleanExpiredFiles(const std::vector<int64_t>& expired_snapshot_ids);
+
+  /// \brief Read manifest paths from a single snapshot.
+  /// Best-effort: returns OK even if the snapshot or its manifests can't be 
read.
+  Status ReadManifestsForSnapshot(int64_t snapshot_id,

Review Comment:
   It seems worth adding an abstraction similar to Java's 
`FileCleanupStrategy` if we are going to add `IncrementalFileCleanup` or other 
strategies in the future. For now we can define them only in 
expire_snapshots.cc, as they are unlikely to be used elsewhere.
   
   I'm also wondering whether it would be valuable to let users customize the 
strategy by registering their own implementations, but that's a separate topic.
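
   A very rough sketch of such an abstraction, assuming a simplified input 
(a map from snapshot id to the files it references) instead of real manifest 
reads. The class names follow the Java ones, but the signatures, 
`SnapshotFiles`, and `DeleteFunc` are hypothetical. In the PR these classes 
would presumably live in an anonymous namespace inside expire_snapshots.cc.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

using DeleteFunc = std::function<void(const std::string&)>;
// Hypothetical input: snapshot id -> files that snapshot references.
using SnapshotFiles = std::unordered_map<int64_t, std::vector<std::string>>;

// Interface modeled loosely on Java's FileCleanupStrategy.
class FileCleanupStrategy {
 public:
  explicit FileCleanupStrategy(DeleteFunc delete_func)
      : delete_func_(std::move(delete_func)) {}
  virtual ~FileCleanupStrategy() = default;

  virtual void CleanFiles(const SnapshotFiles& files,
                          const std::unordered_set<int64_t>& expired) = 0;

 protected:
  DeleteFunc delete_func_;
};

// Deletes files referenced by expired snapshots unless a retained snapshot
// still reaches them. Ephemeral state is local to the strategy.
class ReachableFileCleanup : public FileCleanupStrategy {
 public:
  using FileCleanupStrategy::FileCleanupStrategy;

  void CleanFiles(const SnapshotFiles& files,
                  const std::unordered_set<int64_t>& expired) override {
    std::unordered_set<std::string> candidates;
    std::unordered_set<std::string> reachable;
    for (const auto& [id, paths] : files) {
      auto& target = expired.count(id) ? candidates : reachable;
      target.insert(paths.begin(), paths.end());
    }
    for (const auto& path : candidates) {
      if (!reachable.count(path)) delete_func_(path);
    }
  }
};
```

   With this shape, an `IncrementalFileCleanup` would be a second subclass, and 
`ExpireSnapshots` would only pick a strategy and call `CleanFiles`.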



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,250 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {

Review Comment:
   ```suggestion
   void ExpireSnapshots::DeleteFile(const std::string& path) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
