marvinlanhenke commented on code in PR #241:
URL: https://github.com/apache/iceberg-rust/pull/241#discussion_r1547117184


##########
crates/iceberg/src/scan.rs:
##########
@@ -214,326 +272,559 @@ impl FileScanTask {
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use crate::io::{FileIO, OutputFile};
-    use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, 
Literal, Manifest,
-        ManifestContentType, ManifestEntry, ManifestListWriter, 
ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
-    };
-    use crate::table::Table;
-    use crate::TableIdent;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
-    use futures::TryStreamExt;
-    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
-    use parquet::basic::Compression;
-    use parquet::file::properties::WriterProperties;
-    use std::collections::HashMap;
-    use std::fs;
-    use std::fs::File;
-    use std::sync::Arc;
-    use tempfile::TempDir;
-    use tera::{Context, Tera};
-    use uuid::Uuid;
-
-    struct TableTestFixture {
-        table_location: String,
-        table: Table,
-    }
-
-    impl TableTestFixture {
-        fn new() -> Self {
-            let tmp_dir = TempDir::new().unwrap();
-            let table_location = tmp_dir.path().join("table1");
-            let manifest_list1_location = 
table_location.join("metadata/manifests_list_1.avro");
-            let manifest_list2_location = 
table_location.join("metadata/manifests_list_2.avro");
-            let table_metadata1_location = 
table_location.join("metadata/v1.json");
-
-            let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
-                .unwrap()
-                .build()
-                .unwrap();
-
-            let table_metadata = {
-                let template_json_str = fs::read_to_string(format!(
-                    "{}/testdata/example_table_metadata_v2.json",
-                    env!("CARGO_MANIFEST_DIR")
-                ))
-                .unwrap();
-                let mut context = Context::new();
-                context.insert("table_location", &table_location);
-                context.insert("manifest_list_1_location", 
&manifest_list1_location);
-                context.insert("manifest_list_2_location", 
&manifest_list2_location);
-                context.insert("table_metadata_1_location", 
&table_metadata1_location);
-
-                let metadata_json = Tera::one_off(&template_json_str, 
&context, false).unwrap();
-                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
-            };
-
-            let table = Table::builder()
-                .metadata(table_metadata)
-                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
-                .file_io(file_io)
-                
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
-                .build();
-
-            Self {
-                table_location: table_location.to_str().unwrap().to_string(),
-                table,
-            }
-        }
-
-        fn next_manifest_file(&self) -> OutputFile {
-            self.table
-                .file_io()
-                .new_output(format!(
-                    "{}/metadata/manifest_{}.avro",
-                    self.table_location,
-                    Uuid::new_v4()
-                ))
-                .unwrap()
-        }
-
-        async fn setup_manifest_files(&mut self) {
-            let current_snapshot = 
self.table.metadata().current_snapshot().unwrap();
-            let parent_snapshot = current_snapshot
-                .parent_snapshot(self.table.metadata())
-                .unwrap();
-            let current_schema = 
current_snapshot.schema(self.table.metadata()).unwrap();
-            let current_partition_spec = 
self.table.metadata().default_partition_spec().unwrap();
-
-            // Write data files
-            let data_file_manifest = ManifestWriter::new(
-                self.next_manifest_file(),
-                current_snapshot.snapshot_id(),
-                vec![],
-            )
-            .write(Manifest::new(
-                ManifestMetadata::builder()
-                    .schema((*current_schema).clone())
-                    .content(ManifestContentType::Data)
-                    .format_version(FormatVersion::V2)
-                    .partition_spec((**current_partition_spec).clone())
-                    .schema_id(current_schema.schema_id())
-                    .build(),
-                vec![
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Added)
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/1.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(100))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Deleted)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        
.file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/2.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(200))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Existing)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        
.file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/3.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(300))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                ],
-            ))
-            .await
-            .unwrap();
+/// Evaluates manifest files to see if their partition values comply with a 
filter predicate
+pub struct PartitionEvaluator {
+    manifest_eval_visitor: ManifestEvalVisitor,
+}
 
-            // Write to manifest list
-            let mut manifest_list_write = ManifestListWriter::v2(
-                self.table
-                    .file_io()
-                    .new_output(current_snapshot.manifest_list())
-                    .unwrap(),
-                current_snapshot.snapshot_id(),
-                current_snapshot
-                    .parent_snapshot_id()
-                    .unwrap_or(EMPTY_SNAPSHOT_ID),
-                current_snapshot.sequence_number(),
-            );
-            manifest_list_write
-                .add_manifests(vec![data_file_manifest].into_iter())
-                .unwrap();
-            manifest_list_write.close().await.unwrap();
-
-            // prepare data
-            let schema = {
-                let fields =
-                    vec![
-                        arrow_schema::Field::new("col", 
arrow_schema::DataType::Int64, true)
-                            .with_metadata(HashMap::from([(
-                                PARQUET_FIELD_ID_META_KEY.to_string(),
-                                "0".to_string(),
-                            )])),
-                    ];
-                Arc::new(arrow_schema::Schema::new(fields))
-            };
-            let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as 
ArrayRef;
-            let to_write = RecordBatch::try_new(schema.clone(), 
vec![col]).unwrap();
-
-            // Write the Parquet files
-            let props = WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build();
-
-            for n in 1..=3 {
-                let file = File::create(format!("{}/{}.parquet", 
&self.table_location, n)).unwrap();
-                let mut writer =
-                    ArrowWriter::try_new(file, to_write.schema(), 
Some(props.clone())).unwrap();
-
-                writer.write(&to_write).expect("Writing batch");
-
-                // writer must be closed to write footer
-                writer.close().unwrap();
-            }
-        }
+impl PartitionEvaluator {
+    pub(crate) fn new(
+        partition_spec: PartitionSpecRef,
+        partition_filter: BoundPredicate,
+        table_schema: SchemaRef,
+    ) -> crate::Result<Self> {
+        let manifest_eval_visitor = ManifestEvalVisitor::manifest_evaluator(
+            partition_spec,
+            table_schema,
+            partition_filter,
+            true,
+        )?;
+
+        Ok(PartitionEvaluator {
+            manifest_eval_visitor,
+        })
     }
 
-    #[test]
-    fn test_table_scan_columns() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
-        assert_eq!(vec!["x", "y"], table_scan.column_names);
-
-        let table_scan = table
-            .scan()
-            .select(["x", "y"])
-            .select(["z"])
-            .build()
-            .unwrap();
-        assert_eq!(vec!["z"], table_scan.column_names);
+    pub(crate) fn filter_manifest_file(&self, _manifest_file: &ManifestFile) 
-> bool {
+        self.manifest_eval_visitor.eval(_manifest_file)
     }
+}
 
-    #[test]
-    fn test_select_all() {
-        let table = TableTestFixture::new().table;
+struct ManifestEvalVisitor {
+    #[allow(dead_code)]
+    partition_schema: SchemaRef,
+    partition_filter: BoundPredicate,
+    #[allow(dead_code)]
+    case_sensitive: bool,
+}
 
-        let table_scan = table.scan().select_all().build().unwrap();
-        assert!(table_scan.column_names.is_empty());
+impl ManifestEvalVisitor {
+    fn new(
+        partition_schema: SchemaRef,
+        partition_filter: Predicate,
+        case_sensitive: bool,
+    ) -> crate::Result<Self> {
+        let partition_filter = partition_filter.bind(partition_schema.clone(), 
case_sensitive)?;

Review Comment:
   I think we have `rewrite_not` already in place to be applied to 
partition_filter?



##########
crates/iceberg/src/scan.rs:
##########
@@ -155,8 +189,22 @@ impl TableScan {
             .await?;
 
             // Generate data file stream
-            let mut entries = iter(manifest_list.entries());
-            while let Some(entry) = entries.next().await {
+            for entry in manifest_list.entries() {
+                // If this scan has a filter, check the partition evaluator 
cache for an existing
+                // PartitionEvaluator that matches this manifest's partition 
spec ID.
+                // Use one from the cache if there is one. If not, create one, 
put it in
+                // the cache, and take a reference to it.
+                if let Some(filter) = self.filter.as_ref() {
+                    let partition_evaluator = partition_evaluator_cache
+                            .entry(entry.partition_spec_id())
+                            .or_insert_with_key(|key| 
self.create_partition_evaluator(key, filter));
+
+                    // reject any manifest files whose partition values don't 
match the filter.
+                    if !partition_evaluator.filter_manifest_file(entry) {

Review Comment:
   I think we should apply `ManifestEvaluator` independently from 
`PartitionEvaluator` (see comment down below in struct PartitionEvaluator).
   
   Also, I'm not sure if we can simply 'skip' the file with continue. In the 
python impl the Partition- and MetricsEvaluator are applied on each DataFile on 
multiple threads - so I believe we have to collect all relevant manifest files 
before and then apply the Partition- and MetricsEvaluator on the complete 
collections (or split the work into multiple threads)? 



##########
crates/iceberg/src/scan.rs:
##########
@@ -214,326 +272,559 @@ impl FileScanTask {
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use crate::io::{FileIO, OutputFile};
-    use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, 
Literal, Manifest,
-        ManifestContentType, ManifestEntry, ManifestListWriter, 
ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
-    };
-    use crate::table::Table;
-    use crate::TableIdent;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
-    use futures::TryStreamExt;
-    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
-    use parquet::basic::Compression;
-    use parquet::file::properties::WriterProperties;
-    use std::collections::HashMap;
-    use std::fs;
-    use std::fs::File;
-    use std::sync::Arc;
-    use tempfile::TempDir;
-    use tera::{Context, Tera};
-    use uuid::Uuid;
-
-    struct TableTestFixture {
-        table_location: String,
-        table: Table,
-    }
-
-    impl TableTestFixture {
-        fn new() -> Self {
-            let tmp_dir = TempDir::new().unwrap();
-            let table_location = tmp_dir.path().join("table1");
-            let manifest_list1_location = 
table_location.join("metadata/manifests_list_1.avro");
-            let manifest_list2_location = 
table_location.join("metadata/manifests_list_2.avro");
-            let table_metadata1_location = 
table_location.join("metadata/v1.json");
-
-            let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
-                .unwrap()
-                .build()
-                .unwrap();
-
-            let table_metadata = {
-                let template_json_str = fs::read_to_string(format!(
-                    "{}/testdata/example_table_metadata_v2.json",
-                    env!("CARGO_MANIFEST_DIR")
-                ))
-                .unwrap();
-                let mut context = Context::new();
-                context.insert("table_location", &table_location);
-                context.insert("manifest_list_1_location", 
&manifest_list1_location);
-                context.insert("manifest_list_2_location", 
&manifest_list2_location);
-                context.insert("table_metadata_1_location", 
&table_metadata1_location);
-
-                let metadata_json = Tera::one_off(&template_json_str, 
&context, false).unwrap();
-                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
-            };
-
-            let table = Table::builder()
-                .metadata(table_metadata)
-                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
-                .file_io(file_io)
-                
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
-                .build();
-
-            Self {
-                table_location: table_location.to_str().unwrap().to_string(),
-                table,
-            }
-        }
-
-        fn next_manifest_file(&self) -> OutputFile {
-            self.table
-                .file_io()
-                .new_output(format!(
-                    "{}/metadata/manifest_{}.avro",
-                    self.table_location,
-                    Uuid::new_v4()
-                ))
-                .unwrap()
-        }
-
-        async fn setup_manifest_files(&mut self) {
-            let current_snapshot = 
self.table.metadata().current_snapshot().unwrap();
-            let parent_snapshot = current_snapshot
-                .parent_snapshot(self.table.metadata())
-                .unwrap();
-            let current_schema = 
current_snapshot.schema(self.table.metadata()).unwrap();
-            let current_partition_spec = 
self.table.metadata().default_partition_spec().unwrap();
-
-            // Write data files
-            let data_file_manifest = ManifestWriter::new(
-                self.next_manifest_file(),
-                current_snapshot.snapshot_id(),
-                vec![],
-            )
-            .write(Manifest::new(
-                ManifestMetadata::builder()
-                    .schema((*current_schema).clone())
-                    .content(ManifestContentType::Data)
-                    .format_version(FormatVersion::V2)
-                    .partition_spec((**current_partition_spec).clone())
-                    .schema_id(current_schema.schema_id())
-                    .build(),
-                vec![
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Added)
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/1.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(100))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Deleted)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        
.file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/2.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(200))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Existing)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        
.file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/3.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(300))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                ],
-            ))
-            .await
-            .unwrap();
+/// Evaluates manifest files to see if their partition values comply with a 
filter predicate
+pub struct PartitionEvaluator {
+    manifest_eval_visitor: ManifestEvalVisitor,

Review Comment:
   I don't think we should nest `ManifestEvalVisitor` inside 
`PartitionEvaluator` since both structs have separate responsibilities and 
could exists independently (I shouldn't need a ManifestEvalVisitor to do 
partition pruning).
   
   Also I think the implementation of `PartitionEvaluator` is missing 
[[reference](https://github.com/apache/iceberg-python/blob/main/pyiceberg/expressions/visitors.py#L468)]?
 
   It should visit each `DataFile`s struct and evaluate the predicate.



##########
crates/iceberg/src/scan.rs:
##########
@@ -214,326 +272,559 @@ impl FileScanTask {
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use crate::io::{FileIO, OutputFile};
-    use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, 
Literal, Manifest,
-        ManifestContentType, ManifestEntry, ManifestListWriter, 
ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
-    };
-    use crate::table::Table;
-    use crate::TableIdent;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
-    use futures::TryStreamExt;
-    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
-    use parquet::basic::Compression;
-    use parquet::file::properties::WriterProperties;
-    use std::collections::HashMap;
-    use std::fs;
-    use std::fs::File;
-    use std::sync::Arc;
-    use tempfile::TempDir;
-    use tera::{Context, Tera};
-    use uuid::Uuid;
-
-    struct TableTestFixture {
-        table_location: String,
-        table: Table,
-    }
-
-    impl TableTestFixture {
-        fn new() -> Self {
-            let tmp_dir = TempDir::new().unwrap();
-            let table_location = tmp_dir.path().join("table1");
-            let manifest_list1_location = 
table_location.join("metadata/manifests_list_1.avro");
-            let manifest_list2_location = 
table_location.join("metadata/manifests_list_2.avro");
-            let table_metadata1_location = 
table_location.join("metadata/v1.json");
-
-            let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
-                .unwrap()
-                .build()
-                .unwrap();
-
-            let table_metadata = {
-                let template_json_str = fs::read_to_string(format!(
-                    "{}/testdata/example_table_metadata_v2.json",
-                    env!("CARGO_MANIFEST_DIR")
-                ))
-                .unwrap();
-                let mut context = Context::new();
-                context.insert("table_location", &table_location);
-                context.insert("manifest_list_1_location", 
&manifest_list1_location);
-                context.insert("manifest_list_2_location", 
&manifest_list2_location);
-                context.insert("table_metadata_1_location", 
&table_metadata1_location);
-
-                let metadata_json = Tera::one_off(&template_json_str, 
&context, false).unwrap();
-                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
-            };
-
-            let table = Table::builder()
-                .metadata(table_metadata)
-                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
-                .file_io(file_io)
-                
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
-                .build();
-
-            Self {
-                table_location: table_location.to_str().unwrap().to_string(),
-                table,
-            }
-        }
-
-        fn next_manifest_file(&self) -> OutputFile {
-            self.table
-                .file_io()
-                .new_output(format!(
-                    "{}/metadata/manifest_{}.avro",
-                    self.table_location,
-                    Uuid::new_v4()
-                ))
-                .unwrap()
-        }
-
-        async fn setup_manifest_files(&mut self) {
-            let current_snapshot = 
self.table.metadata().current_snapshot().unwrap();
-            let parent_snapshot = current_snapshot
-                .parent_snapshot(self.table.metadata())
-                .unwrap();
-            let current_schema = 
current_snapshot.schema(self.table.metadata()).unwrap();
-            let current_partition_spec = 
self.table.metadata().default_partition_spec().unwrap();
-
-            // Write data files
-            let data_file_manifest = ManifestWriter::new(
-                self.next_manifest_file(),
-                current_snapshot.snapshot_id(),
-                vec![],
-            )
-            .write(Manifest::new(
-                ManifestMetadata::builder()
-                    .schema((*current_schema).clone())
-                    .content(ManifestContentType::Data)
-                    .format_version(FormatVersion::V2)
-                    .partition_spec((**current_partition_spec).clone())
-                    .schema_id(current_schema.schema_id())
-                    .build(),
-                vec![
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Added)
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/1.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(100))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Deleted)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        
.file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/2.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(200))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                    ManifestEntry::builder()
-                        .status(ManifestStatus::Existing)
-                        .snapshot_id(parent_snapshot.snapshot_id())
-                        .sequence_number(parent_snapshot.sequence_number())
-                        
.file_sequence_number(parent_snapshot.sequence_number())
-                        .data_file(
-                            DataFileBuilder::default()
-                                .content(DataContentType::Data)
-                                .file_path(format!("{}/3.parquet", 
&self.table_location))
-                                .file_format(DataFileFormat::Parquet)
-                                .file_size_in_bytes(100)
-                                .record_count(1)
-                                
.partition(Struct::from_iter([Some(Literal::long(300))]))
-                                .build()
-                                .unwrap(),
-                        )
-                        .build(),
-                ],
-            ))
-            .await
-            .unwrap();
+/// Evaluates manifest files to see if their partition values comply with a 
filter predicate
+pub struct PartitionEvaluator {
+    manifest_eval_visitor: ManifestEvalVisitor,
+}
 
-            // Write to manifest list
-            let mut manifest_list_write = ManifestListWriter::v2(
-                self.table
-                    .file_io()
-                    .new_output(current_snapshot.manifest_list())
-                    .unwrap(),
-                current_snapshot.snapshot_id(),
-                current_snapshot
-                    .parent_snapshot_id()
-                    .unwrap_or(EMPTY_SNAPSHOT_ID),
-                current_snapshot.sequence_number(),
-            );
-            manifest_list_write
-                .add_manifests(vec![data_file_manifest].into_iter())
-                .unwrap();
-            manifest_list_write.close().await.unwrap();
-
-            // prepare data
-            let schema = {
-                let fields =
-                    vec![
-                        arrow_schema::Field::new("col", 
arrow_schema::DataType::Int64, true)
-                            .with_metadata(HashMap::from([(
-                                PARQUET_FIELD_ID_META_KEY.to_string(),
-                                "0".to_string(),
-                            )])),
-                    ];
-                Arc::new(arrow_schema::Schema::new(fields))
-            };
-            let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as 
ArrayRef;
-            let to_write = RecordBatch::try_new(schema.clone(), 
vec![col]).unwrap();
-
-            // Write the Parquet files
-            let props = WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build();
-
-            for n in 1..=3 {
-                let file = File::create(format!("{}/{}.parquet", 
&self.table_location, n)).unwrap();
-                let mut writer =
-                    ArrowWriter::try_new(file, to_write.schema(), 
Some(props.clone())).unwrap();
-
-                writer.write(&to_write).expect("Writing batch");
-
-                // writer must be closed to write footer
-                writer.close().unwrap();
-            }
-        }
+impl PartitionEvaluator {
+    pub(crate) fn new(
+        partition_spec: PartitionSpecRef,
+        partition_filter: BoundPredicate,
+        table_schema: SchemaRef,
+    ) -> crate::Result<Self> {
+        let manifest_eval_visitor = ManifestEvalVisitor::manifest_evaluator(
+            partition_spec,
+            table_schema,
+            partition_filter,
+            true,
+        )?;
+
+        Ok(PartitionEvaluator {
+            manifest_eval_visitor,
+        })
     }
 
-    #[test]
-    fn test_table_scan_columns() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
-        assert_eq!(vec!["x", "y"], table_scan.column_names);
-
-        let table_scan = table
-            .scan()
-            .select(["x", "y"])
-            .select(["z"])
-            .build()
-            .unwrap();
-        assert_eq!(vec!["z"], table_scan.column_names);
+    pub(crate) fn filter_manifest_file(&self, _manifest_file: &ManifestFile) 
-> bool {
+        self.manifest_eval_visitor.eval(_manifest_file)
     }
+}
 
-    #[test]
-    fn test_select_all() {
-        let table = TableTestFixture::new().table;
+struct ManifestEvalVisitor {
+    #[allow(dead_code)]
+    partition_schema: SchemaRef,
+    partition_filter: BoundPredicate,
+    #[allow(dead_code)]
+    case_sensitive: bool,
+}
 
-        let table_scan = table.scan().select_all().build().unwrap();
-        assert!(table_scan.column_names.is_empty());
+impl ManifestEvalVisitor {
+    fn new(
+        partition_schema: SchemaRef,
+        partition_filter: Predicate,
+        case_sensitive: bool,
+    ) -> crate::Result<Self> {
+        let partition_filter = partition_filter.bind(partition_schema.clone(), 
case_sensitive)?;
+
+        Ok(Self {
+            partition_schema,
+            partition_filter,
+            case_sensitive,
+        })
     }
 
-    #[test]
-    fn test_select_no_exist_column() {
-        let table = TableTestFixture::new().table;
-
-        let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
-        assert!(table_scan.is_err());
+    pub(crate) fn manifest_evaluator(

Review Comment:
   Perhaps we can combine this with `fn new` and get rid of `fn 
manifest_evaluator`. Although it aligns with the other implementations, it 
feels kinda strange - and I think using `ManifestEvalVisitor::new()` and then 
`.project()` is better?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to