Fokko commented on code in PR #129:
URL: https://github.com/apache/iceberg-rust/pull/129#discussion_r1440607629


##########
crates/iceberg/src/spec/snapshot.rs:
##########
@@ -124,6 +150,70 @@ impl Snapshot {
         Utc.timestamp_millis_opt(self.timestamp_ms).unwrap()
     }
 
+    /// Get the schema id of this snapshot.
+    #[inline]
+    pub fn schema_id(&self) -> Option<SchemaId> {
+        self.schema_id
+    }
+
+    /// Get the schema of this snapshot.
+    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
+        Ok(match self.schema_id() {
+            Some(schema_id) => table_metadata
+                .schema_by_id(schema_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Schema with id {} not found", schema_id),
+                    )
+                })?
+                .clone(),
+            None => table_metadata.current_schema().clone(),
+        })
+    }

Review Comment:
   The logic is a bit more complicated here. As an example:
   
   1. You create a new table with a schema
   2. You append some data
   3. You add another column
   
   If you would query the table, this will return the initial schema, which is 
a bit awkward. Until you write data again, the new column will not show up.
   
   In Java/Python the behavior is: If you query for a specific snapshot-id, 
you'll use that schema (if present). If you don't specify a snapshot, and 
you'll fall back to the current-snapshot-id, then you'll use the 
current-schema-id.



##########
crates/iceberg/src/scan.rs:
##########
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table scan api.
+
+use crate::io::FileIO;
+use crate::spec::{
+    DataContentType, ManifestContentType, ManifestEntry, ManifestEntryRef, 
SchemaRef, SnapshotRef,
+    TableMetadataRef, INITIAL_SEQUENCE_NUMBER,
+};
+use crate::table::Table;
+use crate::{Error, ErrorKind};
+use arrow_array::RecordBatch;
+use futures::stream::{iter, BoxStream};
+use futures::StreamExt;
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            snapshot_id: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> crate::Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            snapshot,
+            file_io: self.table.file_io().clone(),
+            table_metadata: self.table.metadata_ref(),
+            column_names: self.column_names,
+            schema,
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct TableScan {
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    column_names: Vec<String>,
+    schema: SchemaRef,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
+
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());

Review Comment:
   I would leave out the deletes for the basic file scan planning. They come 
with a lot of implementation details, and they are not performant at all if you 
don't have statistics evaluation.



##########
crates/iceberg/Cargo.toml:
##########
@@ -62,4 +62,5 @@ uuid = { workspace = true }
 [dev-dependencies]
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
+tera = { workspace = true }

Review Comment:
   With PyIceberg it will take a while for all the containers to boot on the 
first run, but after that the test are more or less instant :)



##########
crates/iceberg/src/spec/manifest_list.rs:
##########
@@ -628,6 +630,30 @@ impl TryFrom<i32> for ManifestContentType {
     }
 }
 
+impl ManifestListEntry {

Review Comment:
   I'm confused by the naming, should this be a `ManifestFile`?
   
   From the [spec](https://iceberg.apache.org/spec/#manifest-lists): 
   
   Manifest list files store `manifest_file`, a struct with the following fields
   
   



##########
crates/iceberg/src/spec/table_metadata.rs:
##########
@@ -38,6 +38,12 @@ static MAIN_BRANCH: &str = "main";
 static DEFAULT_SPEC_ID: i32 = 0;
 static DEFAULT_SORT_ORDER_ID: i64 = 0;
 
+pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;

Review Comment:
   Why use `-1` here and not just `None`?



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -819,6 +849,49 @@ impl ManifestEntry {
             ManifestStatus::Added | ManifestStatus::Existing
         )
     }
+
+    /// Content type of this manifest entry.
+    pub fn content_type(&self) -> DataContentType {
+        self.data_file.content
+    }
+
+    /// Data file path of this manifest entry.
+    pub fn file_path(&self) -> &str {
+        &self.data_file.file_path
+    }
+
+    /// Inherit data from snapshot, such as snapshot id, sequence number.
+    pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestListEntry) {

Review Comment:
   I think the naming is confusing here. Inheritance is about inheriting from 
the manifest-list to the manifest-entry, not from the snapshot.



##########
crates/iceberg/src/scan.rs:
##########
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table scan api.
+
+use crate::io::FileIO;
+use crate::spec::{
+    DataContentType, ManifestContentType, ManifestEntry, ManifestEntryRef, 
SchemaRef, SnapshotRef,
+    TableMetadataRef, INITIAL_SEQUENCE_NUMBER,
+};
+use crate::table::Table;
+use crate::{Error, ErrorKind};
+use arrow_array::RecordBatch;
+use futures::stream::{iter, BoxStream};
+use futures::StreamExt;
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            snapshot_id: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> crate::Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            snapshot,
+            file_io: self.table.file_io().clone(),
+            table_metadata: self.table.metadata_ref(),
+            column_names: self.column_names,
+            schema,
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct TableScan {
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    column_names: Vec<String>,
+    schema: SchemaRef,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
+
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+
+        // TODO: We should introduce runtime api to enable parallel scan.
+        for manifest_list_entry in manifest_list.entries().iter().filter(|e| {
+            e.content == ManifestContentType::Deletes && e.sequence_number >= 
min_data_file_seq_num
+        }) {
+            let manifest_file = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest_file.entries().iter().filter(|e| 
e.is_alive()) {
+                match manifest_entry.content_type() {
+                    DataContentType::PositionDeletes => {
+                        position_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::EqualityDeletes => {
+                        eq_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::Data => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Data file entry({}) found in delete manifest 
file({})",
+                                manifest_entry.file_path(),
+                                manifest_list_entry.manifest_path
+                            ),
+                        ));
+                    }
+                }
+            }
+        }
+
+        // Sort delete files by sequence number.
+        position_delete_files
+            .sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+        eq_delete_files.sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+
+        // Generate data file stream
+        let mut file_scan_tasks = 
Vec::with_capacity(manifest_list.entries().len());
+        for manifest_list_entry in manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+        {

Review Comment:
   Out of curiousity, I love functional programming. Would it be possible to 
`flat_map` the manifest entries, and then filter on alive? :)



##########
crates/iceberg/src/spec/snapshot.rs:
##########
@@ -124,6 +150,70 @@ impl Snapshot {
         Utc.timestamp_millis_opt(self.timestamp_ms).unwrap()
     }
 
+    /// Get the schema id of this snapshot.
+    #[inline]
+    pub fn schema_id(&self) -> Option<SchemaId> {
+        self.schema_id
+    }
+
+    /// Get the schema of this snapshot.
+    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
+        Ok(match self.schema_id() {
+            Some(schema_id) => table_metadata
+                .schema_by_id(schema_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Schema with id {} not found", schema_id),
+                    )
+                })?
+                .clone(),
+            None => table_metadata.current_schema().clone(),
+        })
+    }
+
+    /// Get parent snapshot.
+    #[cfg(test)]
+    pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> 
Option<SnapshotRef> {
+        match self.parent_snapshot_id {
+            Some(id) => table_metadata.snapshot_by_id(id).cloned(),
+            None => None,
+        }
+    }
+
+    /// Load manifest list.
+    pub async fn load_manifest_list(
+        &self,
+        file_io: &FileIO,
+        table_metadata: &TableMetadata,
+    ) -> Result<ManifestList> {
+        match &self.manifest_list {
+            ManifestListLocation::ManifestListFile(file) => {
+                let mut manifest_list_content= Vec::new();
+                file_io
+                    .new_input(file)?
+                    .reader().await?
+                    .read_to_end(&mut manifest_list_content)
+                    .await?;
+
+                let schema = self.schema(table_metadata)?;
+
+                let partition_type_provider = |partition_spec_id: i32| -> 
Result<Option<StructType>> {
+                    
table_metadata.partition_spec_by_id(partition_spec_id).map(|partition_spec| {
+                        partition_spec.partition_type(&schema)
+                    }).transpose()
+                };
+
+                ManifestList::parse_with_version(&manifest_list_content, 
table_metadata.format_version(),
+                                                    partition_type_provider, )
+            }
+            ManifestListLocation::ManifestFiles(_) => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Loading manifests from `manifests` is currently not 
supported, we only support loading from `manifest-list` file, see 
https://iceberg.apache.org/spec/#snapshots for more information.",
+            )),

Review Comment:
   This is also going to be deprecated in Java with the 2.0 release



##########
crates/iceberg/src/scan.rs:
##########
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table scan api.
+
+use crate::io::FileIO;
+use crate::spec::{
+    DataContentType, ManifestContentType, ManifestEntry, ManifestEntryRef, 
SchemaRef, SnapshotRef,
+    TableMetadataRef, INITIAL_SEQUENCE_NUMBER,
+};
+use crate::table::Table;
+use crate::{Error, ErrorKind};
+use arrow_array::RecordBatch;
+use futures::stream::{iter, BoxStream};
+use futures::StreamExt;
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            snapshot_id: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> crate::Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            snapshot,
+            file_io: self.table.file_io().clone(),
+            table_metadata: self.table.metadata_ref(),
+            column_names: self.column_names,
+            schema,
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct TableScan {
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    column_names: Vec<String>,
+    schema: SchemaRef,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
+
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+
+        // TODO: We should introduce runtime api to enable parallel scan.
+        for manifest_list_entry in manifest_list.entries().iter().filter(|e| {
+            e.content == ManifestContentType::Deletes && e.sequence_number >= 
min_data_file_seq_num
+        }) {
+            let manifest_file = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest_file.entries().iter().filter(|e| 
e.is_alive()) {
+                match manifest_entry.content_type() {
+                    DataContentType::PositionDeletes => {
+                        position_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::EqualityDeletes => {
+                        eq_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::Data => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Data file entry({}) found in delete manifest 
file({})",
+                                manifest_entry.file_path(),
+                                manifest_list_entry.manifest_path
+                            ),
+                        ));
+                    }
+                }
+            }
+        }
+
+        // Sort delete files by sequence number.
+        position_delete_files
+            .sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+        eq_delete_files.sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+
+        // Generate data file stream
+        let mut file_scan_tasks = 
Vec::with_capacity(manifest_list.entries().len());
+        for manifest_list_entry in manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+        {
+            // Data file
+            let manifest = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest.entries() {
+                if manifest_entry.is_alive() {
+                    file_scan_tasks.push(Ok(FileScanTask {
+                        data_file: manifest_entry.clone(),
+                        position_delete_files: 
TableScan::filter_position_delete_files(
+                            manifest_entry,
+                            &position_delete_files,
+                        ),
+                        eq_delete_files: TableScan::filter_eq_delete_files(
+                            manifest_entry,
+                            &eq_delete_files,
+                        ),
+                        start: 0,
+                        length: manifest_entry.file_size_in_bytes(),
+                    }));
+                }
+            }
+        }
+
+        Ok(iter(file_scan_tasks).boxed())
+    }
+
+    /// Return the position delete files that should be applied to the data 
file.
+    ///
+    /// Here we assume that the position delete files are sorted by sequence 
number in ascending order.
+    fn filter_position_delete_files(
+        data_file: &ManifestEntry,
+        position_deletes: &[ManifestEntryRef],
+    ) -> Vec<ManifestEntryRef> {
+        let data_seq_num = data_file
+            .sequence_number()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);

Review Comment:
   Technically the sequence number cannot be undefined here. The sequence 
number should be inherited from the manifest-list, and for the manifest-list it 
should be set initial-sequence-number to 0.



##########
crates/iceberg/src/scan.rs:
##########
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table scan api.
+
+use crate::io::FileIO;
+use crate::spec::{
+    DataContentType, ManifestContentType, ManifestEntry, ManifestEntryRef, 
SchemaRef, SnapshotRef,
+    TableMetadataRef, INITIAL_SEQUENCE_NUMBER,
+};
+use crate::table::Table;
+use crate::{Error, ErrorKind};
+use arrow_array::RecordBatch;
+use futures::stream::{iter, BoxStream};
+use futures::StreamExt;
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            snapshot_id: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> crate::Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            snapshot,
+            file_io: self.table.file_io().clone(),
+            table_metadata: self.table.metadata_ref(),
+            column_names: self.column_names,
+            schema,
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct TableScan {
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    column_names: Vec<String>,
+    schema: SchemaRef,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
+
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+
+        // TODO: We should introduce runtime api to enable parallel scan.
+        for manifest_list_entry in manifest_list.entries().iter().filter(|e| {
+            e.content == ManifestContentType::Deletes && e.sequence_number >= 
min_data_file_seq_num
+        }) {
+            let manifest_file = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest_file.entries().iter().filter(|e| 
e.is_alive()) {
+                match manifest_entry.content_type() {
+                    DataContentType::PositionDeletes => {
+                        position_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::EqualityDeletes => {
+                        eq_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::Data => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Data file entry({}) found in delete manifest 
file({})",
+                                manifest_entry.file_path(),
+                                manifest_list_entry.manifest_path
+                            ),
+                        ));
+                    }
+                }
+            }
+        }
+
+        // Sort delete files by sequence number.
+        position_delete_files
+            .sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+        eq_delete_files.sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+
+        // Generate data file stream
+        let mut file_scan_tasks = 
Vec::with_capacity(manifest_list.entries().len());

Review Comment:
   This is the third time we call `.entries().len()` does it make sense in Rust 
to store this in a local variable?



##########
crates/iceberg/src/scan.rs:
##########
@@ -0,0 +1,616 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table scan api.
+
+use crate::io::FileIO;
+use crate::spec::{
+    DataContentType, ManifestContentType, ManifestEntry, ManifestEntryRef, 
SchemaRef, SnapshotRef,
+    TableMetadataRef, INITIAL_SEQUENCE_NUMBER,
+};
+use crate::table::Table;
+use crate::{Error, ErrorKind};
+use arrow_array::RecordBatch;
+use futures::stream::{iter, BoxStream};
+use futures::StreamExt;
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    pub fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            snapshot_id: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> crate::Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            snapshot,
+            file_io: self.table.file_io().clone(),
+            table_metadata: self.table.metadata_ref(),
+            column_names: self.column_names,
+            schema,
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct TableScan {
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    column_names: Vec<String>,
+    schema: SchemaRef,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
+
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+
+        // TODO: We should introduce runtime api to enable parallel scan.
+        for manifest_list_entry in manifest_list.entries().iter().filter(|e| {
+            e.content == ManifestContentType::Deletes && e.sequence_number >= 
min_data_file_seq_num
+        }) {
+            let manifest_file = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest_file.entries().iter().filter(|e| 
e.is_alive()) {
+                match manifest_entry.content_type() {
+                    DataContentType::PositionDeletes => {
+                        position_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::EqualityDeletes => {
+                        eq_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::Data => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Data file entry({}) found in delete manifest 
file({})",
+                                manifest_entry.file_path(),
+                                manifest_list_entry.manifest_path
+                            ),
+                        ));
+                    }
+                }
+            }
+        }
+
+        // Sort delete files by sequence number.
+        position_delete_files
+            .sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+        eq_delete_files.sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+
+        // Generate data file stream
+        let mut file_scan_tasks = 
Vec::with_capacity(manifest_list.entries().len());
+        for manifest_list_entry in manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+        {
+            // Data file
+            let manifest = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest.entries() {
+                if manifest_entry.is_alive() {
+                    file_scan_tasks.push(Ok(FileScanTask {
+                        data_file: manifest_entry.clone(),
+                        position_delete_files: 
TableScan::filter_position_delete_files(
+                            manifest_entry,
+                            &position_delete_files,
+                        ),
+                        eq_delete_files: TableScan::filter_eq_delete_files(
+                            manifest_entry,
+                            &eq_delete_files,
+                        ),
+                        start: 0,
+                        length: manifest_entry.file_size_in_bytes(),
+                    }));
+                }
+            }
+        }
+
+        Ok(iter(file_scan_tasks).boxed())
+    }
+
+    /// Return the position delete files that should be applied to the data 
file.
+    ///
+    /// Here we assume that the position delete files are sorted by sequence 
number in ascending order.
+    fn filter_position_delete_files(
+        data_file: &ManifestEntry,
+        position_deletes: &[ManifestEntryRef],
+    ) -> Vec<ManifestEntryRef> {
+        let data_seq_num = data_file
+            .sequence_number()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Find the first position delete file whose sequence number is 
greater than or equal to the data file.
+        let first_entry = position_deletes.partition_point(|e| {
+            e.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER) < 
data_seq_num

Review Comment:
   I think we want to include eq as well, in case of the initial sequence 
number.
   ```suggestion
               e.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER) =< 
data_seq_num
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to