marvinlanhenke commented on code in PR #373:
URL: https://github.com/apache/iceberg-rust/pull/373#discussion_r1598053461


##########
crates/iceberg/src/scan.rs:
##########
@@ -189,66 +195,20 @@ impl TableScan {
             self.case_sensitive,
         )?;
 
-        let mut partition_filter_cache = PartitionFilterCache::new();
-        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
-
-        Ok(try_stream! {
-            let manifest_list = context
-                .snapshot
-                .load_manifest_list(&context.file_io, &context.table_metadata)
-                .await?;
-
-            for entry in manifest_list.entries() {
-                if !Self::content_type_is_data(entry) {
-                    continue;
-                }
+        let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
 
-                let partition_spec_id = entry.partition_spec_id;
+        let manifest_list = context
+            .snapshot
+            .load_manifest_list(&context.file_io, &context.table_metadata)
+            .await?;
 
-                let partition_filter = partition_filter_cache.get(
-                    partition_spec_id,
-                    &context,
-                )?;
+        spawn(async move {

Review Comment:
   do we need to `spawn` here or is the `try_for_each_concurrent` in `run(...)` 
already enough?



##########
crates/iceberg/src/scan.rs:
##########
@@ -302,13 +262,147 @@ impl TableScan {
 
         arrow_reader_builder.build().read(self.plan_files().await?)
     }
+}
+
+#[derive(Debug)]
+struct ConcurrentFileScanStreamContext {
+    context: FileScanStreamContext,
+    sender: Sender<Result<FileScanTask>>,
+    manifest_evaluator_cache: ManifestEvaluatorCache,
+    partition_filter_cache: PartitionFilterCache,
+}
+
+impl ConcurrentFileScanStreamContext {
+    fn new(context: FileScanStreamContext, sender: 
Sender<Result<FileScanTask>>) -> Self {
+        ConcurrentFileScanStreamContext {
+            context,
+            sender,
+            manifest_evaluator_cache: ManifestEvaluatorCache::new(),
+            partition_filter_cache: PartitionFilterCache::new(),
+        }
+    }
+
+    async fn run(&mut self, manifest_list: ManifestList) -> Result<()> {
+        let file_io = self.context.file_io.clone();
+        let sender = self.sender.clone();
+
+        // This whole Vec-and-for-loop approach feels sub-optimally structured.
+        // I've tried structuring this in multiple ways but run into
+        // issues with ownership. Ideally I'd like to structure this
+        // with a functional programming approach: extracting
+        // sections 1, 2, and 3 out into different methods on Self,
+        // and then use some iterator combinators to orchestrate it all.
+        // Section 1 is pretty trivially refactorable into a static method
+        // that can be used in a closure that can be used with 
Iterator::filter.
+        // Similarly, section 3 seems easily factor-able into a method that can
+        // be passed into Iterator::map.
+        // Section 2 turns out trickier - we want to exit the entire `run` 
method early
+        // if the eval fails, and filter out any manifest_files from the 
iterator / stream
+        // if the eval succeeds but returns true. We bump into ownership 
issues due
+        // to needing to pass mut self as the caches need to be able to mutate.
+        // Section 3 runs into ownership issues when trying to refactor its 
closure to be
+        // a static or non-static method.
+
+        // 1
+        let filtered_manifest_files = manifest_list
+            .entries()
+            .iter()
+            .filter(Self::reject_unsupported_content_types);
+
+        // 2
+        let mut filtered_manifest_files2 = vec![];
+        for manifest_file in filtered_manifest_files {
+            if !self.apply_evaluator(manifest_file)? {
+                continue;
+            }
+
+            filtered_manifest_files2.push(manifest_file);
+        }
+
+        // 3
+        let filtered_manifest_files = filtered_manifest_files2
+            .into_iter()
+            .map(|manifest_file| Ok((manifest_file, file_io.clone(), 
sender.clone())));
+
+        futures::stream::iter(filtered_manifest_files)
+            .try_for_each_concurrent(
+                CONCURRENCY_LIMIT_MANIFEST_FILES,
+                Self::process_manifest_file,
+            )
+            .await
+    }
+
+    fn reject_unsupported_content_types(manifest_file: &&ManifestFile) -> bool 
{
+        SUPPORTED_MANIFEST_FILE_CONTENT_TYPES.contains(&manifest_file.content)
+    }
+
+    fn apply_evaluator(&mut self, manifest_file: &ManifestFile) -> 
Result<bool> {
+        let partition_spec_id = manifest_file.partition_spec_id;
+
+        let partition_filter = self
+            .partition_filter_cache
+            .get(partition_spec_id, &self.context)?;
+
+        if let Some(partition_filter) = partition_filter {
+            let manifest_evaluator = self
+                .manifest_evaluator_cache
+                .get(partition_spec_id, partition_filter);
+
+            if !manifest_evaluator.eval(manifest_file)? {
+                return Ok(false);
+            }
+        }
+
+        Ok(true)
+    }
+
+    async fn process_manifest_file(
+        manifest_and_file_io_and_sender: (&ManifestFile, FileIO, 
Sender<Result<FileScanTask>>),
+    ) -> Result<()> {
+        let (manifest_file, file_io, sender) = manifest_and_file_io_and_sender;
+
+        let manifest = manifest_file.load_manifest(&file_io).await?;
+
+        let manifest_entries = manifest
+            .entries()
+            .iter()
+            .filter(|x| x.is_alive())
+            .map(|manifest_entry| Ok((manifest_entry, sender.clone())));
+
+        futures::stream::iter(manifest_entries)
+            .try_for_each_concurrent(
+                CONCURRENCY_LIMIT_MANIFEST_ENTRIES,
+                Self::process_manifest_entry,
+            )
+            .await

Review Comment:
   we had a discussion on this in #124 (in case you missed it) to not go 
overboard with the task spawning



##########
crates/iceberg/src/scan.rs:
##########
@@ -302,13 +262,147 @@ impl TableScan {
 
         arrow_reader_builder.build().read(self.plan_files().await?)
     }
+}
+
+#[derive(Debug)]
+struct ConcurrentFileScanStreamContext {

Review Comment:
   I think once we have a runtime and the async approach is approved, we can 
get rid of the `FileScanStreamContext` and merge the struct into 
`ConcurrentFileScanStreamContext`



##########
crates/iceberg/src/scan.rs:
##########
@@ -302,13 +262,147 @@ impl TableScan {
 
         arrow_reader_builder.build().read(self.plan_files().await?)
     }
+}
+
+#[derive(Debug)]
+struct ConcurrentFileScanStreamContext {
+    context: FileScanStreamContext,
+    sender: Sender<Result<FileScanTask>>,
+    manifest_evaluator_cache: ManifestEvaluatorCache,
+    partition_filter_cache: PartitionFilterCache,
+}
+
+impl ConcurrentFileScanStreamContext {
+    fn new(context: FileScanStreamContext, sender: 
Sender<Result<FileScanTask>>) -> Self {
+        ConcurrentFileScanStreamContext {
+            context,
+            sender,
+            manifest_evaluator_cache: ManifestEvaluatorCache::new(),
+            partition_filter_cache: PartitionFilterCache::new(),
+        }
+    }
+
+    async fn run(&mut self, manifest_list: ManifestList) -> Result<()> {
+        let file_io = self.context.file_io.clone();
+        let sender = self.sender.clone();
+
+        // This whole Vec-and-for-loop approach feels sub-optimally structured.
+        // I've tried structuring this in multiple ways but run into
+        // issues with ownership. Ideally I'd like to structure this
+        // with a functional programming approach: extracting
+        // sections 1, 2, and 3 out into different methods on Self,
+        // and then use some iterator combinators to orchestrate it all.
+        // Section 1 is pretty trivially refactorable into a static method
+        // that can be used in a closure that can be used with 
Iterator::filter.
+        // Similarly, section 3 seems easily factor-able into a method that can
+        // be passed into Iterator::map.
+        // Section 2 turns out trickier - we want to exit the entire `run` 
method early
+        // if the eval fails, and filter out any manifest_files from the 
iterator / stream
+        // if the eval succeeds but returns true. We bump into ownership 
issues due
+        // to needing to pass mut self as the caches need to be able to mutate.
+        // Section 3 runs into ownership issues when trying to refactor its 
closure to be
+        // a static or non-static method.
+
+        // 1
+        let filtered_manifest_files = manifest_list
+            .entries()
+            .iter()
+            .filter(Self::reject_unsupported_content_types);
+
+        // 2
+        let mut filtered_manifest_files2 = vec![];
+        for manifest_file in filtered_manifest_files {
+            if !self.apply_evaluator(manifest_file)? {
+                continue;
+            }
+
+            filtered_manifest_files2.push(manifest_file);
+        }
+
+        // 3
+        let filtered_manifest_files = filtered_manifest_files2
+            .into_iter()
+            .map(|manifest_file| Ok((manifest_file, file_io.clone(), 
sender.clone())));
+
+        futures::stream::iter(filtered_manifest_files)
+            .try_for_each_concurrent(
+                CONCURRENCY_LIMIT_MANIFEST_FILES,
+                Self::process_manifest_file,
+            )
+            .await

Review Comment:
   will this yield the first FileScanTask when its available - or do we have to 
iterate over the complete stream, before we can return any result?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to