CTTY commented on code in PR #1455: URL: https://github.com/apache/iceberg-rust/pull/1455#discussion_r2157765709
########## crates/iceberg/src/maintenance/expire_snapshots.rs: ########## @@ -0,0 +1,1144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Expire snapshots maintenance operation +//! +//! This module implements the expire snapshots operation that removes old snapshots +//! and their associated metadata files from the table while keeping the table +//! in a consistent state. + +use std::collections::HashSet; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::runtime::JoinHandle; +use crate::spec::SnapshotRef; +use crate::table::Table; +use crate::transaction::Transaction; +use crate::{Catalog, Error, ErrorKind, TableUpdate}; + +/// Result of the expire snapshots operation. Contains information about how many files were +/// deleted. +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ExpireSnapshotsResult { + /// Number of data files deleted. Data file deletion is not supported by this action yet, this + /// will always be 0. + pub deleted_data_files_count: u64, + /// Number of position delete files deleted. Position delete file deletion is not supported by + /// this action yet, this will always be 0. 
+ pub deleted_position_delete_files_count: u64, + /// Number of equality delete files deleted. Equality delete file deletion is not supported by + /// this action yet, this will always be 0. + pub deleted_equality_delete_files_count: u64, + /// Number of manifest files deleted + pub deleted_manifest_files_count: u64, + /// Number of manifest list files deleted + pub deleted_manifest_lists_count: u64, + /// Number of statistics files deleted. Statistics file deletion is not supported by this action + /// yet, this will always be 0. + pub deleted_statistics_files_count: u64, +} + +/// Configuration for the expire snapshots operation +#[derive(Debug, Clone)] +pub struct ExpireSnapshotsConfig { + /// Timestamp in milliseconds. Snapshots older than this will be expired + pub older_than_ms: Option<i64>, + /// Minimum number of snapshots to retain + pub retain_last: Option<u32>, + /// Maximum number of concurrent file deletions + pub max_concurrent_deletes: Option<u32>, + /// Specific snapshot IDs to expire + pub snapshot_ids: Vec<i64>, + /// Whether to perform a dry run. If true, the operation will not delete any files, but will + /// still identify the files to delete and return the result. + pub dry_run: bool, +} + +impl Default for ExpireSnapshotsConfig { + fn default() -> Self { + Self { + older_than_ms: None, + retain_last: Some(1), // Default to retaining at least 1 snapshot + max_concurrent_deletes: None, + snapshot_ids: vec![], + dry_run: false, + } + } +} + +/// Trait for performing expire snapshots operations +/// +/// This trait provides a low-level API for expiring snapshots that can be +/// extended with different implementations for different environments. 
+#[async_trait] +pub trait ExpireSnapshots: Send + Sync { + /// Execute the expire snapshots operation + async fn execute(&self, catalog: &dyn Catalog) -> Result<ExpireSnapshotsResult>; +} + +/// Implementation of the expire snapshots operation +pub struct ExpireSnapshotsAction { + table: Table, + config: ExpireSnapshotsConfig, +} + +impl ExpireSnapshotsAction { + /// Create a new expire snapshots action + pub fn new(table: Table) -> Self { + Self { + table, + config: ExpireSnapshotsConfig::default(), + } + } + + /// Set the timestamp threshold for expiring snapshots + pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { + self.config.older_than_ms = Some(timestamp_ms); + self + } + + /// Set the dry run flag + pub fn dry_run(mut self, dry_run: bool) -> Self { + self.config.dry_run = dry_run; + self + } + + /// Set the minimum number of snapshots to retain. If the number of snapshots is less than 1, + /// it will be automatically adjusted to 1, following the behavior in Spark. + pub fn retain_last(mut self, num_snapshots: u32) -> Self { + if num_snapshots < 1 { + self.config.retain_last = Some(1); + } else { + self.config.retain_last = Some(num_snapshots); + } + self + } + + /// Set specific snapshot IDs to expire. An empty list is equivalent to the default behavior + /// of expiring all but `retain_last` snapshots! When only expiring specific snapshots, please + /// ensure that the list of snapshot IDs is non-empty before using this method. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec<i64>) -> Self { + self.config.snapshot_ids = snapshot_ids; + self + } + + /// Set the maximum number of concurrent file deletions + pub fn max_concurrent_deletes(mut self, max_deletes: u32) -> Self { + if max_deletes > 0 { + self.config.max_concurrent_deletes = Some(max_deletes); + } + self + } + + /// Determine which snapshots should be expired based on the configuration. 
This will: + /// + /// - Sort snapshots by timestamp (oldest first) + /// - Apply filters if supplied. If multiple filters are supplied, the result will be the + /// intersection of the results of each filter. + /// - If specific snapshot IDs are provided, only expire those + /// - If `older_than_ms` is provided, expire snapshots older than this timestamp + /// - If `retain_last` is provided, retain the last `retain_last` snapshots + /// - Never expire the current snapshot! + /// + /// Returns a Vec of SnapshotRefs that should be expired (references removed, and deleted). + fn identify_snapshots_to_expire(&self) -> Result<Vec<SnapshotRef>> { + let metadata = self.table.metadata(); + let all_snapshots: Vec<SnapshotRef> = metadata.snapshots().cloned().collect(); + + if all_snapshots.is_empty() { + return Ok(vec![]); + } + + if !self.config.snapshot_ids.is_empty() { + let snapshot_id_set: HashSet<i64> = self.config.snapshot_ids.iter().cloned().collect(); + let snapshots_to_expire: Vec<SnapshotRef> = all_snapshots + .into_iter() + .filter(|snapshot| snapshot_id_set.contains(&snapshot.snapshot_id())) + .collect(); + + if let Some(current_snapshot_id) = metadata.current_snapshot_id() { + if snapshot_id_set.contains(&current_snapshot_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot expire the current snapshot", + )); + } + } + + return Ok(snapshots_to_expire); + } + + let mut sorted_snapshots = all_snapshots; + sorted_snapshots.sort_by_key(|snapshot| snapshot.timestamp_ms()); + + let mut snapshots_to_expire = vec![]; + let retain_last = self.config.retain_last.unwrap_or(1) as usize; + + if sorted_snapshots.len() <= retain_last { + return Ok(vec![]); + } + + let mut candidates = sorted_snapshots; + + candidates.truncate(candidates.len().saturating_sub(retain_last)); + + if let Some(older_than_ms) = self.config.older_than_ms { + candidates.retain(|snapshot| snapshot.timestamp_ms() < older_than_ms); + } + + // NEVER expire the current snapshot! 
+ if let Some(current_snapshot_id) = metadata.current_snapshot_id() { + candidates.retain(|snapshot| snapshot.snapshot_id() != current_snapshot_id); + } + + snapshots_to_expire.extend(candidates); + + Ok(snapshots_to_expire) + } + + /// Collect all files that should be deleted along with the expired snapshots + async fn collect_files_to_delete( + &self, + expired_snapshots: &[SnapshotRef], + ) -> Result<Vec<String>> { + let mut files_to_delete = Vec::new(); + let file_io = self.table.file_io(); + let metadata = self.table.metadata(); + + // Collect files from snapshots that are being expired + let mut expired_manifest_lists = HashSet::new(); + let mut expired_manifests = HashSet::new(); + + for snapshot in expired_snapshots { + expired_manifest_lists.insert(snapshot.manifest_list().to_string()); + + match snapshot.load_manifest_list(file_io, metadata).await { + Ok(manifest_list) => { + for manifest_entry in manifest_list.entries() { + expired_manifests.insert(manifest_entry.manifest_path.clone()); + } + } + Err(e) => { + // Log warning but continue - the manifest list file might already be deleted + eprintln!( + "Warning: Failed to load manifest list {}: {}", + snapshot.manifest_list(), + e + ); + } + } + } + + // Collect files that are still referenced by remaining snapshots + let remaining_snapshots: Vec<SnapshotRef> = metadata + .snapshots() + .filter(|snapshot| { + !expired_snapshots + .iter() + .any(|exp| exp.snapshot_id() == snapshot.snapshot_id()) + }) + .cloned() + .collect(); + + let mut still_referenced_manifest_lists = HashSet::new(); + let mut still_referenced_manifests = HashSet::new(); + + for snapshot in &remaining_snapshots { + still_referenced_manifest_lists.insert(snapshot.manifest_list().to_string()); + + match snapshot.load_manifest_list(file_io, metadata).await { + Ok(manifest_list) => { + for manifest_entry in manifest_list.entries() { + still_referenced_manifests.insert(manifest_entry.manifest_path.clone()); + } + } + Err(e) => { + // Log 
warning but continue + eprintln!( + "Warning: Failed to load manifest list {}: {}", + snapshot.manifest_list(), + e + ); + } + } + } + + for manifest_list_path in expired_manifest_lists { + if !still_referenced_manifest_lists.contains(&manifest_list_path) { + files_to_delete.push(manifest_list_path); + } + } + + for manifest_path in expired_manifests { + if !still_referenced_manifests.contains(&manifest_path) { + files_to_delete.push(manifest_path); + } + } + + Ok(files_to_delete) + } + + async fn process_file_deletion(&self, result: &mut ExpireSnapshotsResult, file_path: String) { + if file_path.ends_with(".avro") && file_path.contains("snap-") { + result.deleted_manifest_lists_count += 1; + } else if file_path.ends_with(".avro") { + result.deleted_manifest_files_count += 1; + } + } + + /// Delete files concurrently with respect to max_concurrent_deletes setting + /// Should not be called if dry_run is true, but this is checked for extra safety. + async fn delete_files(&self, files_to_delete: Vec<String>) -> Result<ExpireSnapshotsResult> { + let mut result = ExpireSnapshotsResult::default(); + + if self.config.dry_run { + for file_path in files_to_delete { + self.process_file_deletion(&mut result, file_path).await; + } + return Ok(result); + } + + let file_io = self.table.file_io(); + + if files_to_delete.is_empty() { + return Ok(result); + } + + let num_concurrent_deletes = self.config.max_concurrent_deletes.unwrap_or(1) as usize; + let mut delete_tasks: Vec<JoinHandle<Vec<Result<String>>>> = + Vec::with_capacity(num_concurrent_deletes); + + eprintln!("Num concurrent deletes: {}", num_concurrent_deletes); + + for task_index in 0..num_concurrent_deletes { + // Ideally we'd use a semaphore here to allow each thread to delete as fast as possible. + // However, we can't assume that tokio::sync::Semaphore is available, and AsyncStd + // does not appear to have a usable Semaphore. 
Instead, we'll pre-sort the files into + // `num_concurrent_deletes` equal size chunks and spawn a task for each chunk. + let task_file_paths: Vec<String> = files_to_delete + .iter() + .skip(task_index) + .step_by(num_concurrent_deletes) + .cloned() + .collect(); + let file_io_clone = file_io.clone(); + let task = crate::runtime::spawn(async move { + let mut results: Vec<Result<String>> = Vec::new(); + for file_path in task_file_paths { + match file_io_clone.delete(&file_path).await { + Ok(_) => { + eprintln!("Deleted file: {:?}", file_path); + results.push(Ok(file_path)); + } + Err(e) => { + eprintln!("Error deleting file: {:?}", e); + results.push(Err(e)); + } + } + } + results + }); + + delete_tasks.push(task); + } + + for task in delete_tasks { + let file_delete_results = task.await; + for file_delete_result in file_delete_results { + eprintln!("Deleted file: {:?}", file_delete_result); + match file_delete_result { + Ok(deleted_path) => { + self.process_file_deletion(&mut result, deleted_path).await; + } + Err(e) => { + eprintln!("Warning: File deletion task failed: {}", e); + } + } + } + } + + Ok(result) + } +} + +#[async_trait] +impl ExpireSnapshots for ExpireSnapshotsAction { + /// The main entrypoint for the expire snapshots action. 
This will: + /// + /// - Validate the table state + /// - Identify snapshots to expire + /// - Update the table metadata to remove expired snapshots + /// - Collect files to delete + /// - Delete the files + async fn execute(&self, catalog: &dyn Catalog) -> Result<ExpireSnapshotsResult> { + if self.table.readonly() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Cannot expire snapshots on a readonly table", + )); + } + + let snapshots_to_expire = self.identify_snapshots_to_expire()?; + + if snapshots_to_expire.is_empty() { + return Ok(ExpireSnapshotsResult::default()); + } + + let files_to_delete = self.collect_files_to_delete(&snapshots_to_expire).await?; + + if self.config.dry_run { + let mut result = ExpireSnapshotsResult::default(); + for file_path in files_to_delete { + self.process_file_deletion(&mut result, file_path).await; + } + return Ok(result); + } + + // update the table metadata to remove the expired snapshots _before_ deleting anything! + // TODO: make this retry Review Comment: I'm planning to add retry logic in `tx.commit`, so you don't have to retry here issue: https://github.com/apache/iceberg-rust/issues/1387 ########## crates/iceberg/src/maintenance/expire_snapshots.rs: ########## @@ -0,0 +1,1144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Expire snapshots maintenance operation +//! +//! This module implements the expire snapshots operation that removes old snapshots +//! and their associated metadata files from the table while keeping the table +//! in a consistent state. + +use std::collections::HashSet; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::runtime::JoinHandle; +use crate::spec::SnapshotRef; +use crate::table::Table; +use crate::transaction::Transaction; +use crate::{Catalog, Error, ErrorKind, TableUpdate}; + +/// Result of the expire snapshots operation. Contains information about how many files were +/// deleted. +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ExpireSnapshotsResult { + /// Number of data files deleted. Data file deletion is not supported by this action yet, this + /// will always be 0. + pub deleted_data_files_count: u64, + /// Number of position delete files deleted. Position delete file deletion is not supported by + /// this action yet, this will always be 0. + pub deleted_position_delete_files_count: u64, + /// Number of equality delete files deleted. Equality delete file deletion is not supported by + /// this action yet, this will always be 0. + pub deleted_equality_delete_files_count: u64, + /// Number of manifest files deleted + pub deleted_manifest_files_count: u64, + /// Number of manifest list files deleted + pub deleted_manifest_lists_count: u64, + /// Number of statistics files deleted. Statistics file deletion is not supported by this action + /// yet, this will always be 0. + pub deleted_statistics_files_count: u64, +} + +/// Configuration for the expire snapshots operation +#[derive(Debug, Clone)] +pub struct ExpireSnapshotsConfig { + /// Timestamp in milliseconds. 
Snapshots older than this will be expired + pub older_than_ms: Option<i64>, + /// Minimum number of snapshots to retain + pub retain_last: Option<u32>, + /// Maximum number of concurrent file deletions + pub max_concurrent_deletes: Option<u32>, + /// Specific snapshot IDs to expire + pub snapshot_ids: Vec<i64>, + /// Whether to perform a dry run. If true, the operation will not delete any files, but will + /// still identify the files to delete and return the result. + pub dry_run: bool, +} + +impl Default for ExpireSnapshotsConfig { + fn default() -> Self { + Self { + older_than_ms: None, + retain_last: Some(1), // Default to retaining at least 1 snapshot + max_concurrent_deletes: None, + snapshot_ids: vec![], + dry_run: false, + } + } +} + +/// Trait for performing expire snapshots operations +/// +/// This trait provides a low-level API for expiring snapshots that can be +/// extended with different implementations for different environments. +#[async_trait] +pub trait ExpireSnapshots: Send + Sync { + /// Execute the expire snapshots operation + async fn execute(&self, catalog: &dyn Catalog) -> Result<ExpireSnapshotsResult>; +} + +/// Implementation of the expire snapshots operation +pub struct ExpireSnapshotsAction { + table: Table, + config: ExpireSnapshotsConfig, +} Review Comment: I'm thinking of something like this so it leverages the new Transaction code logic: ``` pub trait ExpireSnapshots: Send + Sync { /// Trigger tx.commit, delete files if tx.commit succeeded, and collect ExpireSnapshotsResult async fn execute(&self, catalog: &dyn Catalog) -> Result<ExpireSnapshotsResult>; fn table(&self) -> Table; // returns the table that this action operates on so ExpireSnapshotsAction doesn't have to hold a Table } impl TransactionAction for ExpireSnapshotsAction { async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> { // does the actual snapshots cleaning // wraps updates and requirements into ActionCommit for Catalog to update metadata } } ``` 
########## crates/iceberg/src/transaction/mod.rs: ########## @@ -76,7 +76,7 @@ impl Transaction { Ok(()) } - fn apply( + pub(crate) fn apply( Review Comment: With the new Transaction API, Transaction should only hold `actions` (pending actions to commit); `updates` and `requirements` should be removed after this PR: https://github.com/apache/iceberg-rust/pull/1451 Based on this, `tx.apply` shouldn't be accessible to anything except `tx.commit` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org