Fokko commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583486316


##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -16,74 +16,49 @@
 // under the License.
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, 
SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::{Error, ErrorKind, Result};
 use fnv::FnvHashSet;
-use std::sync::Arc;
 
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a 
provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of 
[`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
 pub(crate) struct ManifestEvaluator {
-    partition_schema: SchemaRef,
+    partition_schema_id: i32,
     partition_filter: BoundPredicate,
     case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
     pub(crate) fn new(
-        partition_spec: PartitionSpecRef,
-        table_schema: SchemaRef,
-        filter: BoundPredicate,
+        partition_schema_id: i32,

Review Comment:
   To align the naming:
   ```suggestion
           partition_spec_id: i32,
   ```



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -16,74 +16,49 @@
 // under the License.
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, 
SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::{Error, ErrorKind, Result};
 use fnv::FnvHashSet;
-use std::sync::Arc;
 
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a 
provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of 
[`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
 pub(crate) struct ManifestEvaluator {
-    partition_schema: SchemaRef,
+    partition_schema_id: i32,
     partition_filter: BoundPredicate,
     case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
     pub(crate) fn new(
-        partition_spec: PartitionSpecRef,
-        table_schema: SchemaRef,
-        filter: BoundPredicate,
+        partition_schema_id: i32,

Review Comment:
   If it is only for checking the spec-id, I would leave it out and make sure that 
we have proper tests to avoid having to do these runtime checks :)



##########
crates/iceberg/src/scan.rs:
##########
@@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> {
     }
 
     /// Build the table scan.
-    pub fn build(self) -> crate::Result<TableScan> {
+    pub fn build(self) -> Result<TableScan> {

Review Comment:
   Nit: should we have a convention to either keep or remove the `crate::` 
prefix? Preferably also with a checker to enforce it.



##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = 
HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator 
cache for an existing
-                // PartitionEvaluator that matches this manifest's partition 
spec ID.
-                // Use one from the cache if there is one. If not, create one, 
put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if 
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        
manifest_evaluator_cache.insert(entry.partition_spec_id, 
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), 
table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = 
&manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {
+                    let partition_spec_id = entry.partition_spec_id;
+
+                    let (partition_spec, partition_schema) =
+                        
context.create_partition_spec_and_schema(partition_spec_id)?;
+
+                    let partition_filter = partition_filter_cache.get(
+                        partition_spec_id,
+                        partition_spec.clone(),
+                        partition_schema.clone(),
+                        filter,
+                        context.case_sensitive,
+                    )?;
+
+                    let manifest_evaluator = manifest_evaluator_cache.get(
+                        partition_spec_id,
+                        partition_schema.schema_id(),
+                        partition_filter.clone(),
+                        context.case_sensitive,
+                    );
 
-                    // reject any manifest files whose partition values don't 
match the filter.
                     if !manifest_evaluator.eval(entry)? {
                         continue;
                     }
+
+                    // TODO: Create ExpressionEvaluator
                 }
 
-                let manifest = entry.load_manifest(&file_io).await?;
+                let manifest = entry.load_manifest(&context.file_io).await?;

Review Comment:
   In a future PR, you will want to do this in parallel 👍 With FastAppend getting 
more traction, it is likely that there will be many manifests.



##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = 
HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator 
cache for an existing
-                // PartitionEvaluator that matches this manifest's partition 
spec ID.
-                // Use one from the cache if there is one. If not, create one, 
put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if 
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        
manifest_evaluator_cache.insert(entry.partition_spec_id, 
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), 
table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = 
&manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {

Review Comment:
   Can we add a check here to make sure the manifest's `517: content` field equals `data`:
   
   
![image](https://github.com/apache/iceberg-rust/assets/1134248/d687b434-3d91-485d-84a3-d3dea50e9784)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to