liurenjie1024 commented on code in PR #62: URL: https://github.com/apache/iceberg-rust/pull/62#discussion_r1357880510
########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. + #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { Review Comment: Move this to `error` module? ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. + #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. 
Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) Review Comment: ```suggestion Error::new(ErrorKind::DataInvalid, "Some fields of table metadata not inited") .with_source(ufe) ``` ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. + #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. 
/// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + 
self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { Review Comment: ```suggestion pub fn add_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { ``` 1. The same problems apply here as with the schema id. 2. We need to validate the spec against the schema: * If `source_id` exists in the schema * If the field type supports such a transformation. ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. 
+ #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: 
snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + let max_id = partition_spec + .fields + .iter() + .map(|field| field.field_id) + .max(); + if max_id > self.last_partition_id { + self.last_partition_id = max_id; + } + if let Some(map) = self.partition_specs.as_mut() { + map.insert(partition_spec.spec_id, partition_spec); + } else { + self.partition_specs = 
Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); + } + self + } + + /// Add or replace a partition_spec, update the last_partition_id accordingly + /// and set the default spec id to the partition_spec id + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_default_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + self.default_spec_id = Some(partition_spec.spec_id); + self.with_partition_spec(partition_spec) + } + + /// Add or replace a snapshot to the main branch, update last_sequence_number + /// snapshot : Snapshot to be added or replaced + pub fn with_branch_snapshot(&mut self, branch: String, snapshot: Snapshot) -> &mut Self { + if Some(snapshot.sequence_number()) > self.last_sequence_number { + self.last_sequence_number = Some(snapshot.sequence_number()); + } + if self.last_updated_ms < Some(snapshot.timestamp()) { + self.last_updated_ms = Some(snapshot.timestamp()); + } + self.with_ref( + branch, + SnapshotReference::new( + snapshot.snapshot_id(), + SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + ), + ); + if let Some(Some(map)) = self.snapshots.as_mut() { + map.insert(snapshot.snapshot_id(), Arc::new(snapshot)); + } else { + self.snapshots = Some(Some(HashMap::from([( + snapshot.snapshot_id(), + Arc::new(snapshot), + )]))); + } + self + } + + /// Add or replace a snapshot to the main branch, update last_sequence_number + /// snapshot : Snapshot to be added or replaced + pub fn with_snapshot(&mut self, snapshot: Snapshot) -> &mut Self { Review Comment: ```suggestion pub fn add_snapshot(&mut self, snapshot: Snapshot) -> &mut Self { ``` ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. 
/// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. + #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = 
Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + let max_id = partition_spec + .fields + 
.iter() + .map(|field| field.field_id) + .max(); + if max_id > self.last_partition_id { + self.last_partition_id = max_id; + } + if let Some(map) = self.partition_specs.as_mut() { + map.insert(partition_spec.spec_id, partition_spec); + } else { + self.partition_specs = Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); + } + self + } + + /// Add or replace a partition_spec, update the last_partition_id accordingly + /// and set the default spec id to the partition_spec id + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_default_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + self.default_spec_id = Some(partition_spec.spec_id); + self.with_partition_spec(partition_spec) + } + + /// Add or replace a snapshot to the main branch, update last_sequence_number + /// snapshot : Snapshot to be added or replaced + pub fn with_branch_snapshot(&mut self, branch: String, snapshot: Snapshot) -> &mut Self { + if Some(snapshot.sequence_number()) > self.last_sequence_number { + self.last_sequence_number = Some(snapshot.sequence_number()); + } + if self.last_updated_ms < Some(snapshot.timestamp()) { + self.last_updated_ms = Some(snapshot.timestamp()); + } + self.with_ref( Review Comment: We should copy existing snapshot ref's retention policy if it already exists. ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. + #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. 
Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if 
let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { Review Comment: ```suggestion pub fn add_schema(&mut self, schema: Schema) -> &mut Self { ``` 1. Currently we use 0 as the default schema id, i.e. when the schema id is not set by the user. I think we should check for this and auto-assign a new schema id in that case. 2. I think by default we should fail when the id already exists. We can add another method for the `insert or update` behavior. ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. + #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. 
The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and 
PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + let max_id = partition_spec + .fields + .iter() + .map(|field| field.field_id) + .max(); + if max_id > self.last_partition_id { + self.last_partition_id = max_id; + } + if let Some(map) = self.partition_specs.as_mut() { + map.insert(partition_spec.spec_id, partition_spec); + } else { + self.partition_specs = Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); + } + self + } + + /// Add or replace a partition_spec, update the last_partition_id accordingly + /// and set the default spec id to the partition_spec id + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_default_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + self.default_spec_id = 
Some(partition_spec.spec_id); + self.with_partition_spec(partition_spec) + } + + /// Add or replace a snapshot to the main branch, update last_sequence_number + /// snapshot : Snapshot to be added or replaced + pub fn with_branch_snapshot(&mut self, branch: String, snapshot: Snapshot) -> &mut Self { + if Some(snapshot.sequence_number()) > self.last_sequence_number { + self.last_sequence_number = Some(snapshot.sequence_number()); + } + if self.last_updated_ms < Some(snapshot.timestamp()) { + self.last_updated_ms = Some(snapshot.timestamp()); + } + self.with_ref( + branch, + SnapshotReference::new( + snapshot.snapshot_id(), + SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + ), + ); + if let Some(Some(map)) = self.snapshots.as_mut() { + map.insert(snapshot.snapshot_id(), Arc::new(snapshot)); + } else { + self.snapshots = Some(Some(HashMap::from([( + snapshot.snapshot_id(), + Arc::new(snapshot), + )]))); + } + self + } + + /// Add or replace a snapshot to the main branch, update last_sequence_number + /// snapshot : Snapshot to be added or replaced + pub fn with_snapshot(&mut self, snapshot: Snapshot) -> &mut Self { + self.with_branch_snapshot("main".to_string(), snapshot) + } + + /// Add or replace a sort_order + /// sort_order : SortOrder to be added or replaced + pub fn with_sort_order(&mut self, sort_order: SortOrder) -> &mut Self { Review Comment: Similar problem as partition_spec ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. 
+ #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: 
snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) Review Comment: Same as problem mentioned above, we need to consider the default schema id problem. ########## crates/iceberg/src/spec/table_metadata.rs: ########## @@ -93,22 +116,278 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default, setter(custom))] metadata_log: Vec<MetadataLog>, /// A list of sort orders, stored as full sort order objects. 
+ #[builder(default, setter(custom))] sort_orders: HashMap<i64, SortOrder>, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap<String, SnapshotReference>, } +// We define a from implementation from builder Error to Iceberg Error +impl From<UninitializedFieldError> for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, ufe.to_string()) + } +} + +impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap<String, SnapshotReference> { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: 
snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); + + if let Some(partition_spec) = tc.partition_spec { + self.with_default_partition_spec(partition_spec); + } + self + } + + /// Add or replace a schema + /// schema : Schema to be added or replaced + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { + if let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + } + self + } + + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + let max_id = partition_spec + .fields + .iter() + .map(|field| field.field_id) + .max(); + if max_id > self.last_partition_id { + self.last_partition_id = max_id; + } + if let Some(map) = self.partition_specs.as_mut() { + map.insert(partition_spec.spec_id, partition_spec); + } else { + self.partition_specs = 
Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); + } + self + } + + /// Add or replace a partition_spec, update the last_partition_id accordingly + /// and set the default spec id to the partition_spec id + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_default_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { Review Comment: This has a similar problem to the schema case (see the comment on `with_current_schema`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org