liurenjie1024 commented on code in PR #512: URL: https://github.com/apache/iceberg-rust/pull/512#discussion_r1720743099
########## crates/iceberg/src/io/object_cache.rs: ########## @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::io::FileIO; +use crate::spec::{ + FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, +}; +use crate::{Error, ErrorKind, Result}; + +const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB + +#[derive(Clone, Debug)] +pub(crate) enum CachedItem { + ManifestList(Arc<ManifestList>), + Manifest(Arc<Manifest>), +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub(crate) enum CachedObjectKey { + ManifestList((String, FormatVersion, SchemaId)), + Manifest(String), +} + +/// Caches metadata objects deserialized from immutable files +#[derive(Clone, Debug)] +pub struct ObjectCache { + cache: moka::future::Cache<CachedObjectKey, CachedItem>, + file_io: FileIO, + cache_disabled: bool, +} + +impl ObjectCache { + /// Creates a new [`ObjectCache`] + /// with the default cache size + pub(crate) fn new(file_io: FileIO) -> Self { + Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES) + } + + /// Creates a new [`ObjectCache`] + /// with a specific cache size + pub(crate) fn new_with_capacity(file_io: FileIO, 
cache_size_bytes: u64) -> Self { + if cache_size_bytes == 0 { + Self::with_disabled_cache(file_io) + } else { + Self { + cache: moka::future::Cache::new(cache_size_bytes), + file_io, + cache_disabled: false, + } + } + } + + /// Creates a new [`ObjectCache`] + /// with caching disabled + pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self { + Self { + cache: moka::future::Cache::new(0), + file_io, + cache_disabled: true, + } + } + + /// Retrieves an Arc [`Manifest`] from the cache + /// or retrieves one from FileIO and parses it if not present + pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> { + if self.cache_disabled { + return manifest_file + .load_manifest(&self.file_io) + .await + .map(Arc::new); + } + + let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone()); + + let cache_entry = self + .cache + .entry_by_ref(&key) + .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file)) + .await + .map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))? Review Comment: ```suggestion .map_err(|err| Error::new(ErrorKind::Unexpected, format!("Failed load manifest {}", manifest_file.manifest_path)).with_source(err))? ``` ########## crates/iceberg/src/io/object_cache.rs: ########## @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::io::FileIO; +use crate::spec::{ + FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, +}; +use crate::{Error, ErrorKind, Result}; + +const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB + +#[derive(Clone, Debug)] +pub(crate) enum CachedItem { + ManifestList(Arc<ManifestList>), + Manifest(Arc<Manifest>), +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub(crate) enum CachedObjectKey { + ManifestList((String, FormatVersion, SchemaId)), + Manifest(String), +} + +/// Caches metadata objects deserialized from immutable files +#[derive(Clone, Debug)] +pub struct ObjectCache { + cache: moka::future::Cache<CachedObjectKey, CachedItem>, + file_io: FileIO, + cache_disabled: bool, +} + +impl ObjectCache { + /// Creates a new [`ObjectCache`] + /// with the default cache size + pub(crate) fn new(file_io: FileIO) -> Self { + Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES) + } + + /// Creates a new [`ObjectCache`] + /// with a specific cache size + pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self { + if cache_size_bytes == 0 { + Self::with_disabled_cache(file_io) + } else { + Self { + cache: moka::future::Cache::new(cache_size_bytes), + file_io, + cache_disabled: false, + } + } + } + + /// Creates a new [`ObjectCache`] + /// with caching disabled + pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self { + Self { + cache: moka::future::Cache::new(0), + file_io, + cache_disabled: true, + } + } + + /// Retrieves an Arc 
[`Manifest`] from the cache + /// or retrieves one from FileIO and parses it if not present + pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> { + if self.cache_disabled { + return manifest_file + .load_manifest(&self.file_io) + .await + .map(Arc::new); + } + + let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone()); + + let cache_entry = self + .cache + .entry_by_ref(&key) + .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file)) + .await + .map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))? + .into_value(); + + match cache_entry { + CachedItem::Manifest(arc_manifest) => Ok(arc_manifest), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("cached object for key '{:?}' is not a Manifest", key), + )), + } + } + + /// Retrieves an Arc [`ManifestList`] from the cache + /// or retrieves one from FileIO and parses it if not present + pub(crate) async fn get_manifest_list( + &self, + snapshot: &SnapshotRef, + table_metadata: &TableMetadataRef, + ) -> Result<Arc<ManifestList>> { + if self.cache_disabled { + return snapshot + .load_manifest_list(&self.file_io, table_metadata) + .await + .map(Arc::new); + } + + let key = CachedObjectKey::ManifestList(( + snapshot.manifest_list().to_string(), + table_metadata.format_version, + snapshot.schema_id().unwrap(), + )); + let cache_entry = self + .cache + .entry_by_ref(&key) + .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata)) + .await + .map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))? 
+ .into_value(); + + match cache_entry { + CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("cached object for path '{:?}' is not a Manifest", key), Review Comment: ```suggestion format!("cached object for path '{:?}' is not a manifest list", key), ``` ########## crates/iceberg/src/io/object_cache.rs: ########## @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::io::FileIO; +use crate::spec::{ + FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, +}; +use crate::{Error, ErrorKind, Result}; + +const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32MB + +#[derive(Clone, Debug)] +pub(crate) enum CachedItem { + ManifestList(Arc<ManifestList>), + Manifest(Arc<Manifest>), +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +pub(crate) enum CachedObjectKey { + ManifestList((String, FormatVersion, SchemaId)), Review Comment: Why isn't the `ManifestList` key a simple `String`, just like `Manifest`? If both are just `String`, I think we don't need this enum.
########## crates/iceberg/src/io/object_cache.rs: ########## Review Comment: Could we add some unit tests for this? ########## crates/iceberg/src/table.rs: ########## @@ -62,6 +190,11 @@ impl Table { &self.file_io } + /// Returns this table's object cache + pub fn object_cache(&self) -> Arc<ObjectCache> { Review Comment: ```suggestion pub(crate) fn object_cache(&self) -> Arc<ObjectCache> { ``` How about making it crate-private for now? I can't come up with a case where a user would need it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org