marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584120346


##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = 
HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator 
cache for an existing
-                // PartitionEvaluator that matches this manifest's partition 
spec ID.
-                // Use one from the cache if there is one. If not, create one, 
put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if 
!manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        
manifest_evaluator_cache.insert(entry.partition_spec_id, 
Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), 
table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = 
&manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {

Review Comment:
   should be done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to