sdd commented on code in PR #373:
URL: https://github.com/apache/iceberg-rust/pull/373#discussion_r1690208650


##########
crates/iceberg/src/scan.rs:
##########
@@ -197,134 +193,103 @@ impl<'a> TableScanBuilder<'a> {
             field_ids.push(field_id);
         }
 
-        Ok(TableScan {
+        let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
+            Some(predicates.bind(schema.clone(), true)?)
+        } else {
+            None
+        };
+
+        let plan_context = PlanContext {
             snapshot,
-            file_io: self.table.file_io().clone(),
             table_metadata: self.table.metadata_ref(),
-            column_names: self.column_names,
-            field_ids,
-            bound_predicates,
-            schema,
-            batch_size: self.batch_size,
+            snapshot_schema: schema,
             case_sensitive: self.case_sensitive,
-            filter: self.filter.map(Arc::new),
+            predicate: self.filter.map(Arc::new),
+            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
+            file_io: self.table.file_io().clone(),
+            field_ids: Arc::new(field_ids),
+            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
+            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
+            expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
+        };
+
+        Ok(TableScan {
+            batch_size: self.batch_size,
+            column_names: self.column_names,
+            file_io: self.table.file_io().clone(),
+            plan_context,
         })
     }
 }
 
 /// Table scan.
 #[derive(Debug)]
 pub struct TableScan {
-    snapshot: SnapshotRef,
-    table_metadata: TableMetadataRef,
+    plan_context: PlanContext,
+    batch_size: Option<usize>,
     file_io: FileIO,
     column_names: Vec<String>,
-    field_ids: Vec<i32>,
-    bound_predicates: Option<BoundPredicate>,
-    schema: SchemaRef,
-    batch_size: Option<usize>,
+}
+
+/// PlanContext wraps a [`SnapshotRef`] alongside all the other
+/// objects that are required to perform a scan file plan.
+#[derive(Debug)]
+struct PlanContext {
+    snapshot: SnapshotRef,
+
+    table_metadata: TableMetadataRef,
+    snapshot_schema: SchemaRef,
     case_sensitive: bool,
-    filter: Option<Arc<Predicate>>,
+    predicate: Option<Arc<Predicate>>,
+    snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
+    file_io: FileIO,
+    field_ids: Arc<Vec<i32>>,
+
+    partition_filter_cache: Arc<PartitionFilterCache>,
+    manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
+    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
 }
 
 impl TableScan {
     /// Returns a stream of [`FileScanTask`]s.
     pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
-        let context = FileScanStreamContext::new(
-            self.schema.clone(),
-            self.snapshot.clone(),
-            self.table_metadata.clone(),
-            self.file_io.clone(),
-            self.filter.clone(),
-            self.case_sensitive,
-        )?;
-
-        let mut partition_filter_cache = PartitionFilterCache::new();
-        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
-        let mut expression_evaluator_cache = ExpressionEvaluatorCache::new();
-
-        let field_ids = self.field_ids.clone();
-        let bound_predicates = self.bound_predicates.clone();
-
-        Ok(try_stream! {
-            let manifest_list = context
-                .snapshot
-                .load_manifest_list(&context.file_io, &context.table_metadata)
-                .await?;
-
-            for entry in manifest_list.entries() {
-                if !Self::content_type_is_data(entry) {
-                    continue;
-                }
-
-                let partition_spec_id = entry.partition_spec_id;
-
-                let partition_filter = partition_filter_cache.get(
-                    partition_spec_id,
-                    &context,
-                )?;
-
-                if let Some(partition_filter) = partition_filter {
-                    let manifest_evaluator = manifest_evaluator_cache.get(
-                        partition_spec_id,
-                        partition_filter,
-                    );
-
-                    if !manifest_evaluator.eval(entry)? {
-                        continue;
-                    }
-                }
+        // used to stream ManifestEntryContexts between stages of the file plan operation
+        let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) =
+            channel(CONCURRENCY_LIMIT_MANIFEST_FILES);

Review Comment:
   Absolutely, I'd considered doing that and I think it's a good idea. Will add

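For context, the trailing comment in the diff describes handing ManifestEntryContexts between stages of the file plan via a bounded channel. Below is a minimal, self-contained sketch of that pattern, not the PR's actual code: the value of CONCURRENCY_LIMIT_MANIFEST_FILES, the fields on ManifestEntryContext, and the producer/consumer plumbing are all illustrative assumptions.

```rust
use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt};

// Assumed buffer size; the real constant lives in the crate.
const CONCURRENCY_LIMIT_MANIFEST_FILES: usize = 8;

// Placeholder stand-in for the PR's ManifestEntryContext.
#[derive(Debug)]
struct ManifestEntryContext {
    manifest_file_path: String,
}

#[tokio::main]
async fn main() {
    // Bounded channel used to stream contexts between pipeline stages.
    let (mut tx, mut rx) =
        channel::<ManifestEntryContext>(CONCURRENCY_LIMIT_MANIFEST_FILES);

    // Producer stage: emit one context per manifest file.
    tokio::spawn(async move {
        for i in 0..3 {
            let ctx = ManifestEntryContext {
                manifest_file_path: format!("manifest-{i}.avro"),
            };
            // `send` applies backpressure once the buffer is full.
            if tx.send(ctx).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    // Consumer stage: drain the channel as a stream.
    while let Some(ctx) = rx.next().await {
        println!("planning {}", ctx.manifest_file_path);
    }
}
```

The bounded buffer is what caps how many manifest files are in flight at once between stages, which is why the constant is named a concurrency limit.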
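For readers less familiar with how this refactored scan is driven, here is a hedged usage sketch. The method names (select, with_filter, build, plan_files) and the expression helpers are assumed to match the crate at this revision; the table handle, column names, and predicate are placeholders.

```rust
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;
use iceberg::Result;

async fn plan(table: &Table) -> Result<()> {
    // Builder configures the scan; build() assembles the TableScan
    // (and, after this PR, its PlanContext).
    let scan = table
        .scan()
        .select(["id", "name"])
        .with_filter(Reference::new("id").greater_than(Datum::long(100)))
        .build()?;

    // plan_files() yields a stream of FileScanTasks; collected here for illustration.
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    println!("planned {} file scan tasks", tasks.len());
    Ok(())
}
```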



