c-thiel commented on code in PR #908: URL: https://github.com/apache/iceberg-rust/pull/908#discussion_r1967484771
########## crates/iceberg/src/spec/view_metadata_builder.rs: ########## @@ -0,0 +1,1566 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use chrono::Utc; +use itertools::Itertools; +use uuid::Uuid; + +use super::{ + Schema, SchemaId, TableMetadataBuilder, ViewFormatVersion, ViewMetadata, ViewRepresentation, + ViewVersion, ViewVersionLog, ViewVersionRef, DEFAULT_SCHEMA_ID, INITIAL_VIEW_VERSION_ID, + ONE_MINUTE_MS, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED, + VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VIEW_PROPERTY_VERSION_HISTORY_SIZE, + VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT, +}; +use crate::catalog::ViewUpdate; +use crate::error::{Error, ErrorKind, Result}; +use crate::ViewCreation; + +/// Manipulating view metadata. +/// +/// For this builder the order of called functions matters. +/// All operations applied to the `ViewMetadata` are tracked in `changes` as a chronologically +/// ordered vec of `ViewUpdate`. +/// If an operation does not lead to a change of the `ViewMetadata`, the corresponding update +/// is omitted from `changes`. 
+#[derive(Debug, Clone)] +pub struct ViewMetadataBuilder { + metadata: ViewMetadata, + changes: Vec<ViewUpdate>, + last_added_schema_id: Option<i32>, + last_added_version_id: Option<i32>, + history_entry: Option<ViewVersionLog>, + // Previous view version is only used during build to check + // whether dialects are dropped or not. + previous_view_version: Option<ViewVersionRef>, +} + +#[derive(Debug, Clone, PartialEq)] +/// Result of modifying or creating a `ViewMetadata`. +pub struct ViewMetadataBuildResult { + /// The new `ViewMetadata`. + pub metadata: ViewMetadata, + /// The changes that were applied to the metadata. + pub changes: Vec<ViewUpdate>, +} + +impl ViewMetadataBuilder { + const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED; + + /// Creates a new view metadata builder. + pub fn new( + location: String, + schema: Schema, + view_version: ViewVersion, + format_version: ViewFormatVersion, + properties: HashMap<String, String>, + ) -> Result<Self> { + let builder = Self { + metadata: ViewMetadata { + format_version, + view_uuid: Uuid::now_v7(), + location: "".to_string(), // Overwritten immediately by set_location + current_version_id: -1, // Overwritten immediately by set_current_version, + versions: HashMap::new(), // Overwritten immediately by set_current_version + version_log: Vec::new(), + schemas: HashMap::new(), // Overwritten immediately by set_current_version + properties: HashMap::new(), // Overwritten immediately by set_properties + }, + changes: vec![], + last_added_schema_id: None, // Overwritten immediately by set_current_version + last_added_version_id: None, // Overwritten immediately by set_current_version + history_entry: None, + previous_view_version: None, // This is a new view + }; + + builder + .set_location(location) + .set_current_version(view_version, schema)? + .set_properties(properties) + } + + /// Creates a new view metadata builder from the given metadata to modify it. 
+ #[must_use] + pub fn new_from_metadata(previous: ViewMetadata) -> Self { + let previous_view_version = previous.current_version().clone(); + Self { + metadata: previous, + changes: Vec::default(), + last_added_schema_id: None, + last_added_version_id: None, + history_entry: None, + previous_view_version: Some(previous_view_version), + } + } + + /// Creates a new view metadata builder from the given view creation. + pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> { + let ViewCreation { + location, + schema, + properties, + name: _, + representations, + default_catalog, + default_namespace, + summary, + } = view_creation; + let version = ViewVersion::builder() + .with_default_catalog(default_catalog) + .with_default_namespace(default_namespace) + .with_representations(representations) + .with_schema_id(schema.schema_id()) + .with_summary(summary) + .with_timestamp_ms(Utc::now().timestamp_millis()) + .with_version_id(INITIAL_VIEW_VERSION_ID) + .build(); + + Self::new(location, schema, version, ViewFormatVersion::V1, properties) + } + + /// Upgrade `FormatVersion`. Downgrades are not allowed. + /// + /// # Errors + /// - Cannot downgrade to older format versions. + pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> { + if format_version < self.metadata.format_version { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade ViewFormatVersion from {} to {}", + self.metadata.format_version, format_version + ), + )); + } + + if format_version != self.metadata.format_version { + match format_version { + ViewFormatVersion::V1 => { + // No changes needed for V1 + } + } + } + + Ok(self) + } + + /// Set the location of the view, stripping any trailing slashes. 
+ pub fn set_location(mut self, location: String) -> Self { + let location = location.trim_end_matches('/').to_string(); + if self.metadata.location != location { + self.changes.push(ViewUpdate::SetLocation { + location: location.clone(), + }); + self.metadata.location = location; + } + + self + } + + /// Set an existing view version as the current version. + /// + /// # Errors + /// - The specified `version_id` does not exist. + /// - The specified `version_id` is `-1` but no version has been added. + pub fn set_current_version_id(mut self, mut version_id: i32) -> Result<Self> { + if version_id == Self::LAST_ADDED && self.last_added_version_id.is_none() { + version_id = self.last_added_version_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set current version id to last added version: no version has been added.", + ) + })?; + } + let version_id = version_id; // make immutable + + if version_id == self.metadata.current_version_id { + return Ok(self); + } + + let version = self.metadata.versions.get(&version_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot set current version to unknown version with id: {}", + version_id + ), + ) + })?; + + self.metadata.current_version_id = version_id; + + if self.last_added_version_id == Some(version_id) { + self.changes.push(ViewUpdate::SetCurrentViewVersion { + view_version_id: Self::LAST_ADDED, + }); + } else { + self.changes.push(ViewUpdate::SetCurrentViewVersion { + view_version_id: version_id, + }); + } + + self.history_entry = Some(version.log()); + + Ok(self) + } + + /// Add a new view version and set it as current. 
+ pub fn set_current_version( + mut self, + view_version: ViewVersion, + schema: Schema, + ) -> Result<Self> { + let schema_id = self.add_schema_internal(schema); + let view_version = view_version.with_schema_id(schema_id); + let view_version_id = self.add_version_internal(view_version)?; + self.set_current_version_id(view_version_id) + } + + /// Add a new version to the view. + /// + /// # Errors + /// - The schema ID of the version is set to `-1`, but no schema has been added. + /// - The schema ID of the specified version is unknown. + /// - Multiple queries for the same dialect are added. + pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> { + self.add_version_internal(view_version)?; + + Ok(self) + } + + fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> { + let version_id = self.reuse_or_create_new_view_version_id(&view_version); + let view_version = view_version.with_version_id(version_id); + + if self.metadata.versions.contains_key(&version_id) { + // ToDo Discuss: Similar to TableMetadata sort-order, Java does not add changes + // in this case. I prefer to add changes as the state of the builder is + // potentially mutated (`last_added_version_id`), thus we should record the change. 
+ if self.last_added_version_id != Some(version_id) { + self.changes.push(ViewUpdate::AddViewVersion { + view_version: view_version.with_version_id(version_id), + }); + self.last_added_version_id = Some(version_id); + } + return Ok(version_id); + } + + let view_version = if view_version.schema_id() == Self::LAST_ADDED { + let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set last added schema: no schema has been added", + ) + })?; + view_version.with_schema_id(last_added_schema_id) + } else { + view_version + }; + + if !self + .metadata + .schemas + .contains_key(&view_version.schema_id()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add version with unknown schema: {}", + view_version.schema_id() + ), + )); + } + + require_unique_dialects(&view_version)?; + + // ToDo Discuss: This check is not present in Java. + // The `TableMetadataBuilder` uses these checks in multiple places - also in Java. + // If we think delayed requests are a problem, I think we should also add it here. + if let Some(last) = self.metadata.version_log.last() { + // commits can happen concurrently from different machines. 
+ // A tolerance helps us avoid failure for small clock skew + if view_version.timestamp_ms() - last.timestamp_ms() < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last snapshot timestamp {}", + view_version.timestamp_ms(), + last.timestamp_ms() + ), + )); + } + } + + self.metadata + .versions + .insert(version_id, Arc::new(view_version.clone())); + + let view_version = if let Some(last_added_schema_id) = self.last_added_schema_id { + if view_version.schema_id() == last_added_schema_id { + view_version.with_schema_id(Self::LAST_ADDED) + } else { + view_version + } + } else { + view_version + }; + self.changes + .push(ViewUpdate::AddViewVersion { view_version }); + + self.last_added_version_id = Some(version_id); + + Ok(version_id) + } + + fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 { + self.metadata + .versions + .iter() + .find_map(|(id, other_version)| { + new_view_version + .behaves_identical_to(other_version) + .then_some(*id) + }) + .unwrap_or_else(|| { + self.get_highest_view_version_id() + .map(|id| id + 1) + // ToDo Discuss: In Java this uses `new_view_version.version_id()` instead. + // I believe 0 is more appropriate here, as the first version should always be 0. + // The TableMetadataBuilder also uses 0 for partition specs (and other `reuse_or_create_*` functions). + // Consistent behaviour between the two builders is desirable. + .unwrap_or(INITIAL_VIEW_VERSION_ID) Review Comment: Changed initial view version id to 1 now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org