liurenjie1024 commented on code in PR #373:
URL: https://github.com/apache/iceberg-rust/pull/373#discussion_r1690704340


##########
crates/iceberg/src/scan.rs:
##########
@@ -197,134 +193,103 @@ impl<'a> TableScanBuilder<'a> {
             field_ids.push(field_id);
         }
 
-        Ok(TableScan {
+        let snapshot_bound_predicate = if let Some(ref predicates) = 
self.filter {
+            Some(predicates.bind(schema.clone(), true)?)
+        } else {
+            None
+        };
+
+        let plan_context = PlanContext {
             snapshot,
-            file_io: self.table.file_io().clone(),
             table_metadata: self.table.metadata_ref(),
-            column_names: self.column_names,
-            field_ids,
-            bound_predicates,
-            schema,
-            batch_size: self.batch_size,
+            snapshot_schema: schema,
             case_sensitive: self.case_sensitive,
-            filter: self.filter.map(Arc::new),
+            predicate: self.filter.map(Arc::new),
+            snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
+            file_io: self.table.file_io().clone(),
+            field_ids: Arc::new(field_ids),
+            partition_filter_cache: Arc::new(PartitionFilterCache::new()),
+            manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
+            expression_evaluator_cache: 
Arc::new(ExpressionEvaluatorCache::new()),
+        };
+
+        Ok(TableScan {
+            batch_size: self.batch_size,
+            column_names: self.column_names,
+            file_io: self.table.file_io().clone(),
+            plan_context,
         })
     }
 }
 
 /// Table scan.
 #[derive(Debug)]
 pub struct TableScan {
-    snapshot: SnapshotRef,
-    table_metadata: TableMetadataRef,
+    plan_context: PlanContext,
+    batch_size: Option<usize>,
     file_io: FileIO,
     column_names: Vec<String>,
-    field_ids: Vec<i32>,
-    bound_predicates: Option<BoundPredicate>,
-    schema: SchemaRef,
-    batch_size: Option<usize>,
+}
+
+/// PlanContext wraps a [`SnapshotRef`] alongside all the other
+/// objects that are required to perform a scan file plan.
+#[derive(Debug)]
+struct PlanContext {
+    snapshot: SnapshotRef,
+
+    table_metadata: TableMetadataRef,
+    snapshot_schema: SchemaRef,
     case_sensitive: bool,
-    filter: Option<Arc<Predicate>>,
+    predicate: Option<Arc<Predicate>>,
+    snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
+    file_io: FileIO,
+    field_ids: Arc<Vec<i32>>,
+
+    partition_filter_cache: Arc<PartitionFilterCache>,
+    manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
+    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
 }
 
 impl TableScan {
     /// Returns a stream of [`FileScanTask`]s.
     pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
-        let context = FileScanStreamContext::new(
-            self.schema.clone(),
-            self.snapshot.clone(),
-            self.table_metadata.clone(),
-            self.file_io.clone(),
-            self.filter.clone(),
-            self.case_sensitive,
-        )?;
-
-        let mut partition_filter_cache = PartitionFilterCache::new();
-        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
-        let mut expression_evaluator_cache = ExpressionEvaluatorCache::new();
-
-        let field_ids = self.field_ids.clone();
-        let bound_predicates = self.bound_predicates.clone();
-
-        Ok(try_stream! {
-            let manifest_list = context
-                .snapshot
-                .load_manifest_list(&context.file_io, &context.table_metadata)
-                .await?;
-
-            for entry in manifest_list.entries() {
-                if !Self::content_type_is_data(entry) {
-                    continue;
-                }
-
-                let partition_spec_id = entry.partition_spec_id;
-
-                let partition_filter = partition_filter_cache.get(
-                    partition_spec_id,
-                    &context,
-                )?;
-
-                if let Some(partition_filter) = partition_filter {
-                    let manifest_evaluator = manifest_evaluator_cache.get(
-                        partition_spec_id,
-                        partition_filter,
-                    );
-
-                    if !manifest_evaluator.eval(entry)? {
-                        continue;
-                    }
-                }
+        // used to stream ManifestEntryContexts between stages of the file 
plan operation
+        let (manifest_entry_ctx_tx, manifest_entry_ctx_rx) =
+            channel(CONCURRENCY_LIMIT_MANIFEST_FILES);
+        // used to stream the results back to the caller
+        let (file_scan_task_tx, file_scan_task_rx) = 
channel(CONCURRENCY_LIMIT_MANIFEST_ENTRIES);
+
+        let manifest_list = self.plan_context.get_manifest_list().await?;
+
+        // get the [`ManifestFile`]s from the [`ManifestList`], filtering out 
any
+        // whose content type is not Data or whose partitions cannot match this
+        // scan's filter
+        let manifest_file_contexts = self
+            .plan_context
+            .build_manifest_file_contexts(manifest_list, 
manifest_entry_ctx_tx)?;
+
+        // Concurrently load all [`Manifest`]s and stream their 
[`ManifestEntry`]s
+        futures::stream::iter(manifest_file_contexts)
+            .try_for_each_concurrent(CONCURRENCY_LIMIT_MANIFEST_FILES, |ctx| 
async move {
+                ctx.fetch_manifest_and_stream_manifest_entries().await
+            })
+            .await?;
+
+        // Process the [`ManifestEntry`] stream in parallel
+        manifest_entry_ctx_rx
+            .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+            .try_for_each_concurrent(
+                CONCURRENCY_LIMIT_MANIFEST_ENTRIES,
+                |(manifest_entry_context, tx)| async move {
+                    crate::runtime::spawn(async move {
+                        Self::process_manifest_entry(manifest_entry_context, 
tx).await

Review Comment:
   I prefer to have concurrency enabled by default and configurable; in fact, 
@Xuanwo, as a database developer, is an advanced user to me.
   
   > In terms of having non-concurrent as the default option, my preference 
would be for the library to be able to choose the optimally performant 
(probably the number of cores?) as the default rather than no concurrency.
   
   This sounds like an interesting idea to me, since the actual parallelism is 
determined by the number of threads of the async runtime the user uses. Given 
that the user always has the option to change it, I think both solutions (a 
fixed default value or the number of cores) are fine with me.
   
   > Regarding the large number of tasks - there are a large number generated 
over the duration of the plan, but only a fixed small number that are in 
existence at one time.
   
   Sounds reasonable to me.
   
   > If I make the concurrency limits configurable in the builder as 
@liurenjie1024 suggested above, then we could have a `with_no_concurrency` 
method on the builder. 
   
   If we allow the user to set concurrency limits, the user just needs to set 
them to 1 to execute without concurrency, right? Having another 
`with_no_concurrency` method seems somewhat counterintuitive to me. Or do you 
mean maintaining another code path without concurrency? If so, I think we 
should avoid that, given that it introduces extra maintenance effort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to