Xuanwo commented on code in PR #512:
URL: https://github.com/apache/iceberg-rust/pull/512#discussion_r1711342057


##########
crates/iceberg/src/io/object_cache.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::io::FileIO;
+use crate::spec::{
+    FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
+};
+use crate::{Error, ErrorKind, Result};
+
+const DEFAULT_CACHE_SIZE_BYTES: u64 = 2 ^ 15; // 32MB
+
+#[derive(Clone, Debug)]
+pub(crate) enum CachedItem {
+    ManifestList(Arc<ManifestList>),
+    Manifest(Arc<Manifest>),
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub(crate) enum CachedObjectKey {
+    ManifestList((String, FormatVersion, SchemaId)),
+    Manifest(String),
+}
+
+/// Caches metadata objects deserialized from immutable files
+#[derive(Clone, Debug)]
+pub struct ObjectCache {

Review Comment:
   I have an idea to make the cache more transparent, but the comment section 
is too small to explain it. I will prepare a PR once this one is merged. 
:love_letter: 



##########
crates/iceberg/src/io/object_cache.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::io::FileIO;
+use crate::spec::{
+    FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
+};
+use crate::{Error, ErrorKind, Result};
+
+const DEFAULT_CACHE_SIZE_BYTES: u64 = 2 ^ 15; // 32MB
+
+#[derive(Clone, Debug)]
+pub(crate) enum CachedItem {
+    ManifestList(Arc<ManifestList>),
+    Manifest(Arc<Manifest>),
+}
+
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub(crate) enum CachedObjectKey {
+    ManifestList((String, FormatVersion, SchemaId)),
+    Manifest(String),
+}
+
+/// Caches metadata objects deserialized from immutable files
+#[derive(Clone, Debug)]
+pub struct ObjectCache {
+    cache: moka::future::Cache<CachedObjectKey, CachedItem>,
+    file_io: FileIO,
+    cache_disabled: bool,
+}
+
+impl ObjectCache {
+    /// Creates a new [`ObjectCache`]
+    /// with the default cache size
+    pub(crate) fn new(file_io: FileIO) -> Self {
+        Self::new_with_cache_size(file_io, DEFAULT_CACHE_SIZE_BYTES)
+    }
+
+    /// Creates a new [`ObjectCache`]
+    /// with a specific cache size
+    pub(crate) fn new_with_cache_size(file_io: FileIO, cache_size_bytes: u64) -> Self {

Review Comment:
   We usually use `capacity` for this, for example `ObjectCache::with_capacity`, which matches the std convention (`Vec::with_capacity`, `HashMap::with_capacity`). What do you think?
   
   `ObjectCache::new_with_cache_size` reads awkwardly to me.



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -94,6 +94,12 @@ impl Manifest {
         &self.entries
     }
 
+    /// Consume this Manifest, returning its constituent parts
+    pub fn consume(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {

Review Comment:
   We usually name APIs like this `into_parts`, following the Rust convention of an `into_` prefix for methods that consume `self`. Would that work for you?



##########
crates/iceberg/src/table.rs:
##########
@@ -16,28 +16,156 @@
 // under the License.
 
 //! Table API for Apache Iceberg
-use typed_builder::TypedBuilder;
+
+use std::sync::Arc;
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::scan::TableScanBuilder;
 use crate::spec::{TableMetadata, TableMetadataRef};
-use crate::{Result, TableIdent};
+use crate::{Error, ErrorKind, Result, TableIdent};
+
+/// Builder to create table scan.
+pub struct TableBuilder {
+    file_io: Option<FileIO>,
+    metadata_location: Option<String>,
+    metadata: Option<TableMetadataRef>,
+    identifier: Option<TableIdent>,
+    readonly: bool,
+    disable_cache: bool,
+    cache_size_bytes: Option<u64>,
+}
+
+impl TableBuilder {
+    pub(crate) fn new() -> Self {
+        Self {
+            file_io: None,
+            metadata_location: None,
+            metadata: None,
+            identifier: None,
+            readonly: false,
+            disable_cache: false,
+            cache_size_bytes: None,
+        }
+    }
+
+    /// required - sets the necessary FileIO to use for the table
+    pub fn file_io(mut self, file_io: FileIO) -> Self {
+        self.file_io = Some(file_io);
+        self
+    }
+
+    /// optional - sets the tables metadata location
+    pub fn metadata_location<T: Into<String>>(mut self, metadata_location: T) -> Self {
+        self.metadata_location = Some(metadata_location.into());
+        self
+    }
+
+    /// required - passes in the TableMetadata to use for the Table
+    pub fn metadata<T: Into<TableMetadataRef>>(mut self, metadata: T) -> Self {
+        self.metadata = Some(metadata.into());
+        self
+    }
+
+    /// required - passes in the TableIdent to use for the Table
+    pub fn identifier(mut self, identifier: TableIdent) -> Self {
+        self.identifier = Some(identifier);
+        self
+    }
+
+    /// specifies if the Table is readonly or not (default not)
+    pub fn readonly(mut self, readonly: bool) -> Self {
+        self.readonly = readonly;
+        self
+    }
+
+    /// specifies if the Table's metadata cache will be disabled,
+    /// so that reads of Manifests and ManifestLists will never
+    /// get cached.
+    pub fn disable_cache(mut self) -> Self {
+        self.disable_cache = true;
+        self
+    }
+
+    /// optionally set a non-default metadata cache size
+    pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self {
+        self.cache_size_bytes = Some(cache_size_bytes);
+        self
+    }

Review Comment:
   This is the area I want to improve. If we can incorporate caching into `FileIO`, we might avoid exposing cache-configuration APIs everywhere caching is needed.
   
   However, I don't think it's a blocker for this PR. I'm willing to help improve this later.



##########
Cargo.toml:
##########
@@ -66,6 +66,7 @@ itertools = "0.13"
 log = "^0.4"
 mockito = "^1"
 murmur3 = "0.5.2"
+num_cpus = "1"

Review Comment:
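   We can replace this with [std::thread::available_parallelism](https://doc.rust-lang.org/stable/std/thread/fn.available_parallelism.html). It has been in the standard library since Rust 1.59, so using it lets us drop the extra `num_cpus` dependency.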



##########
crates/iceberg/src/io/object_cache.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::io::FileIO;
+use crate::spec::{
+    FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
+};
+use crate::{Error, ErrorKind, Result};
+
+const DEFAULT_CACHE_SIZE_BYTES: u64 = 2 ^ 15; // 32MB

Review Comment:
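   How about using `32 * 1024 * 1024` to make it clearer? I'm not good at math :laughing:
   Note also that `^` is bitwise XOR in Rust, not exponentiation, so `2 ^ 15` evaluates to 13. And even 2¹⁵ bytes would only be 32 KiB, not the 32 MB the comment claims.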



##########
crates/iceberg/src/table.rs:
##########
@@ -16,28 +16,156 @@
 // under the License.
 
 //! Table API for Apache Iceberg
-use typed_builder::TypedBuilder;
+
+use std::sync::Arc;
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::scan::TableScanBuilder;
 use crate::spec::{TableMetadata, TableMetadataRef};
-use crate::{Result, TableIdent};
+use crate::{Error, ErrorKind, Result, TableIdent};
+
+/// Builder to create table scan.
+pub struct TableBuilder {

Review Comment:
   We frequently use `typed_builder` elsewhere in the crate. Would it work well in this situation too?



##########
crates/iceberg/Cargo.toml:
##########
@@ -60,7 +60,9 @@ derive_builder = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+moka = { version = "0.12.8", features = ["future"] }
 murmur3 = { workspace = true }
+num_cpus = { workspace = true }

Review Comment:
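   Same as the workspace `Cargo.toml` comment: the `num_cpus` dependency can be replaced with `std::thread::available_parallelism`.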



##########
crates/iceberg/src/spec/manifest_list.rs:
##########
@@ -78,6 +78,11 @@ impl ManifestList {
     pub fn entries(&self) -> &[ManifestFile] {
         &self.entries
     }
+
+    /// Take ownership of the entries in the manifest list, consuming it
+    pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {

Review Comment:
   It's better to implement `IntoIterator` for `ManifestList` instead. This 
will allow users to use `ManifestList` more easily with other iterator-based 
APIs.



##########
crates/iceberg/src/scan.rs:
##########
@@ -199,298 +256,588 @@ impl<'a> TableScanBuilder<'a> {
             field_ids.push(field_id);
         }
 
-        Ok(TableScan {
+        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
+            Some(predicates.bind(schema.clone(), true)?)
+        } else {
+            None
+        };
+
+        let plan_context = PlanContext {
             snapshot,
-            file_io: self.table.file_io().clone(),
             table_metadata: self.table.metadata_ref(),
-            column_names: self.column_names,
-            field_ids,
-            bound_predicates,
-            schema,
-            batch_size: self.batch_size,
+            snapshot_schema: schema,
             case_sensitive: self.case_sensitive,
-            filter: self.filter.map(Arc::new),
+            predicate: self.filter.map(Arc::new),
+            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
+            object_cache: self.table.object_cache(),
+            field_ids: Arc::new(field_ids),
+            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
+            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
+            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
+        };
+
+        Ok(TableScan {
+            batch_size: self.batch_size,
+            column_names: self.column_names,
+            file_io: self.table.file_io().clone(),
+            plan_context,
+            table_scan_config: self.table_scan_config,
         })
     }
 }
 
 /// Table scan.
 #[derive(Debug)]
 pub struct TableScan {
-    snapshot: SnapshotRef,
-    table_metadata: TableMetadataRef,
+    plan_context: PlanContext,
+    batch_size: Option<usize>,
     file_io: FileIO,
     column_names: Vec<String>,
-    field_ids: Vec<i32>,
-    bound_predicates: Option<BoundPredicate>,
-    schema: SchemaRef,
-    batch_size: Option<usize>,
+    table_scan_config: TableScanConfig,
+}
+
+/// PlanContext wraps a [`SnapshotRef`] alongside all the other
+/// objects that are required to perform a scan file plan.
+#[derive(Debug)]
+struct PlanContext {

Review Comment:
   Do you think it's a good idea to use `Arc<PlanContext>` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to