amogh-jahagirdar commented on code in PR #129:
URL: https://github.com/apache/iceberg-rust/pull/129#discussion_r1435638256


##########
crates/iceberg/src/spec/schema.rs:
##########
@@ -32,16 +32,18 @@ use std::sync::Arc;
 
 use _serde::SchemaEnum;
 
+/// Type alias for schema id.
+pub type SchemaId = i32;

Review Comment:
   Nit: Is the type alias really needed here? I'm not sure what's considered
common practice in Rust, but it feels to me like we could just leave it as `i32`.



##########
crates/iceberg/src/table.rs:
##########
@@ -42,8 +50,624 @@ impl Table {
         &self.metadata
     }
 
+    /// Returns current metadata ref.
+    pub fn metadata_ref(&self) -> TableMetadataRef {
+        self.metadata.clone()
+    }
+
     /// Returns current metadata location.
     pub fn metadata_location(&self) -> Option<&str> {
         self.metadata_location.as_deref()
     }
+
+    /// Creates a table scan.
+    pub fn scan(&self) -> TableScanBuilder<'_> {
+        TableScanBuilder::new(self)
+    }
+}
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    limit: Option<usize>,
+    case_sensitive: bool,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            case_sensitive: false,
+            snapshot_id: None,
+            limit: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Limit the number of rows returned.
+    ///
+    /// If not set, all rows will be returned.
+    /// If set, the value must be greater than 0.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            column_names: self.column_names.clone(),
+            limit: None,
+            snapshot,
+            schema,
+            file_io: self.table.file_io.clone(),
+            table_metadata: self.table.metadata_ref(),
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+pub struct TableScan {
+    column_names: Vec<String>,
+    limit: Option<usize>,
+    snapshot: SnapshotRef,
+    schema: SchemaRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+
+        // TODO: We should introduce runtime api to enable parallel scan.
+        for manifest_list_entry in manifest_list.entries().iter().filter(|e| {
+            e.content == ManifestContentType::Deletes && e.sequence_number >= 
min_data_file_seq_num
+        }) {
+            let manifest_file = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest_file.entries().iter().filter(|e| 
e.is_alive()) {
+                match manifest_entry.content_type() {
+                    DataContentType::PositionDeletes => {
+                        position_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::EqualityDeletes => {
+                        eq_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::Data => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Data file entry({}) found in delete manifest 
file({})",
+                                manifest_entry.file_path(),
+                                manifest_list_entry.manifest_path
+                            ),
+                        ));
+                    }
+                }
+            }
+        }
+
+        // Sort delete files by sequence number.
+        position_delete_files
+            .sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+        eq_delete_files.sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+
+        // Generate data file stream
+        let mut file_scan_tasks = 
Vec::with_capacity(manifest_list.entries().len());
+        for manifest_list_entry in manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+        {
+            // Data file
+            let manifest = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest.entries() {
+                if manifest_entry.is_alive() {
+                    file_scan_tasks.push(Ok(FileScanTask {
+                        data_file: manifest_entry.clone(),
+                        position_delete_files: 
TableScan::filter_position_delete_files(
+                            manifest_entry,
+                            &position_delete_files,
+                        ),
+                        eq_delete_files: TableScan::filter_eq_delete_files(
+                            manifest_entry,
+                            &eq_delete_files,
+                        ),
+                        start: 0,
+                        length: manifest_entry.file_size_in_bytes(),
+                    }));
+                }
+            }
+        }
+
+        Ok(iter(file_scan_tasks).boxed())
+    }
+
+    /// Return the position delete files that should be applied to the data 
file.
+    ///
+    /// Here we assume that the position delete files are sorted by sequence 
number in ascending order.
+    fn filter_position_delete_files(
+        data_file: &ManifestEntry,
+        position_deletes: &[ManifestEntryRef],
+    ) -> Vec<ManifestEntryRef> {
+        let data_seq_num = data_file
+            .sequence_number()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Find the first position delete file whose sequence number is 
greater than or equal to the data file.
+        let first_entry = position_deletes.partition_point(|e| {
+            e.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER) < 
data_seq_num
+        });
+
+        // TODO: We should further filter the position delete files by 
`file_path` column.
+        position_deletes.iter().skip(first_entry).cloned().collect()
+    }
+
+    /// Return the equality delete files that should be applied to the data 
file.
+    ///
+    /// Here we assume that the equality delete files are sorted by sequence 
number in ascending order.
+    fn filter_eq_delete_files(
+        data_file: &ManifestEntry,
+        eq_deletes: &[ManifestEntryRef],
+    ) -> Vec<ManifestEntryRef> {
+        let data_seq_num = data_file
+            .sequence_number()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Find the first position delete file whose sequence number is 
greater than or equal to the data file.
+        let first_entry = eq_deletes.partition_point(|e| {
+            e.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER) <= 
data_seq_num
+        });
+
+        // TODO: We should further filter the eq delete files statistics
+        eq_deletes.iter().skip(first_entry).cloned().collect()
+    }
+}
+
+/// A task to scan part of file.
+#[derive(Debug)]
+pub struct FileScanTask {
+    data_file: ManifestEntryRef,
+    position_delete_files: Vec<ManifestEntryRef>,
+    eq_delete_files: Vec<ManifestEntryRef>,
+    start: u64,
+    length: u64,
+}
+
+/// A stream of arrow record batches.
+pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
+impl FileScanTask {
+    /// Returns a stream of arrow record batches.
+    pub async fn execute(&self) -> Result<ArrowRecordBatchStream> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::io::{FileIO, OutputFile};
+    use crate::spec::{
+        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, 
Manifest,
+        ManifestContentType, ManifestEntry, ManifestListWriter, 
ManifestMetadata, ManifestStatus,
+        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+    };
+    use crate::table::Table;
+    use crate::TableIdent;
+    use futures::TryStreamExt;
+    use std::fs;
+    use tempfile::TempDir;
+    use tera::{Context, Tera};
+    use uuid::Uuid;
+
+    struct TableTestFixture {
+        tmp_dir: TempDir,
+        table_location: String,
+        manifest_list1_location: String,
+        manifest_list2_location: String,
+        table_metadata1_location: String,
+        table: Table,
+    }
+
+    impl TableTestFixture {
+        fn new() -> Self {
+            let tmp_dir = TempDir::new().unwrap();
+            let table_location = tmp_dir.path().join("table1");
+            let manifest_list1_location = 
table_location.join("metadata/manifests_list_1.avro");
+            let manifest_list2_location = 
table_location.join("metadata/manifests_list_2.avro");
+            let table_metadata1_location = 
table_location.join("metadata/v1.json");
+
+            let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+                .unwrap()
+                .build()
+                .unwrap();
+
+            let table_metadata = {
+                let template_json_str = fs::read_to_string(format!(
+                    "{}/testdata/example_table_metadata_v2.json",
+                    env!("CARGO_MANIFEST_DIR")
+                ))
+                .unwrap();
+                let mut context = Context::new();
+                context.insert("table_location", &table_location);
+                context.insert("manifest_list_1_location", 
&manifest_list1_location);
+                context.insert("manifest_list_2_location", 
&manifest_list2_location);
+                context.insert("table_metadata_1_location", 
&table_metadata1_location);
+
+                let metadata_json = Tera::one_off(&template_json_str, 
&context, false).unwrap();
+                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+            };
+
+            let table = Table::builder()
+                .metadata(table_metadata)
+                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+                .file_io(file_io)
+                
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .build();
+
+            Self {
+                tmp_dir,
+                manifest_list1_location: 
manifest_list1_location.to_str().unwrap().to_string(),
+                manifest_list2_location: 
manifest_list2_location.to_str().unwrap().to_string(),
+                table_metadata1_location: 
table_metadata1_location.to_str().unwrap().to_string(),
+                table_location: table_location.to_str().unwrap().to_string(),
+                table,
+            }
+        }
+
+        fn next_manifest_file(&self) -> OutputFile {
+            self.table
+                .file_io
+                .new_output(format!(
+                    "{}/metadata/manifest_{}.avro",
+                    self.table_location,
+                    Uuid::new_v4()
+                ))
+                .unwrap()
+        }
+    }
+
+    #[test]
+    fn test_table_scan_columns() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
+        assert_eq!(vec!["x", "y"], table_scan.column_names);
+
+        let table_scan = table
+            .scan()
+            .select(["x", "y"])
+            .select(["z"])
+            .build()
+            .unwrap();
+        assert_eq!(vec!["z"], table_scan.column_names);
+    }
+
+    #[test]
+    fn test_select_all() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().select_all().build().unwrap();
+        assert!(table_scan.column_names.is_empty());
+    }
+
+    #[test]
+    fn test_select_no_exist_column() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
+        assert!(table_scan.is_err());
+    }
+
+    #[test]
+    fn test_table_scan_default_snapshot_id() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().build().unwrap();
+        assert_eq!(
+            table.metadata().current_snapshot().unwrap().snapshot_id(),
+            table_scan.snapshot.snapshot_id()
+        );
+    }
+
+    #[test]
+    fn test_table_scan_non_exist_snapshot_id() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().snapshot_id(1024).build();
+        assert!(table_scan.is_err());
+    }
+
+    #[test]
+    fn test_table_scan_with_snapshot_id() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table
+            .scan()
+            .snapshot_id(3051729675574597004)
+            .build()
+            .unwrap();
+        assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004);
+    }
+
+    #[tokio::test]
+    async fn test_plan_files() {
+        let fixture = TableTestFixture::new();
+
+        let current_snapshot = 
fixture.table.metadata().current_snapshot().unwrap();
+        let parent_snapshot = current_snapshot
+            .parent_snapshot(fixture.table.metadata())
+            .unwrap();
+        let current_schema = 
current_snapshot.schema(fixture.table.metadata()).unwrap();
+        let current_partition_spec = 
fixture.table.metadata().default_partition_spec().unwrap();
+
+        // Write data files
+        let data_file_manifest = ManifestWriter::new(
+            fixture.next_manifest_file(),
+            current_snapshot.snapshot_id(),
+            vec![],
+        )
+        .write(Manifest::new(
+            ManifestMetadata::builder()
+                .schema((*current_schema).clone())
+                .content(ManifestContentType::Data)
+                .format_version(FormatVersion::V2)
+                .partition_spec((**current_partition_spec).clone())
+                .schema_id(current_schema.schema_id())
+                .build(),
+            vec![
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Added)
+                    .data_file(
+                        DataFile::builder()
+                            .content(DataContentType::Data)
+                            .file_path(format!("{}/1.parquet", 
&fixture.table_location))
+                            .file_format(DataFileFormat::Parquet)
+                            .file_size_in_bytes(100)
+                            .record_count(1)
+                            
.partition(Struct::from_iter([Some(Literal::long(100))]))
+                            .build(),
+                    )
+                    .build(),
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Deleted)
+                    .snapshot_id(parent_snapshot.snapshot_id())
+                    .sequence_number(parent_snapshot.sequence_number())
+                    .file_sequence_number(parent_snapshot.sequence_number())

Review Comment:
    It looks like we don't differentiate between the data sequence number and
the file sequence number? These two values can be different, for example in
compacted snapshots, where the file sequence number is larger than the data
sequence number.
   
   I don't think it really matters for scan planning, since we always use the
data sequence number for pruning the delete files, but it may lead to confusing
metadata at read time. I'm happy to take a stab at fixing this if there's
interest.



##########
crates/iceberg/src/spec/manifest.rs:
##########
@@ -781,10 +805,13 @@ impl ManifestMetadata {
     }
 }
 
+/// Reference to [`ManifestEntry`].
+pub type ManifestEntryRef = Arc<ManifestEntry>;

Review Comment:
   Sorry, I'm still new to Rust, but is suffixing with `Ref` a common naming
convention for reference-counted variables?



##########
crates/iceberg/src/table.rs:
##########
@@ -42,8 +50,624 @@ impl Table {
         &self.metadata
     }
 
+    /// Returns current metadata ref.
+    pub fn metadata_ref(&self) -> TableMetadataRef {
+        self.metadata.clone()
+    }
+
     /// Returns current metadata location.
     pub fn metadata_location(&self) -> Option<&str> {
         self.metadata_location.as_deref()
     }
+
+    /// Creates a table scan.
+    pub fn scan(&self) -> TableScanBuilder<'_> {
+        TableScanBuilder::new(self)
+    }
+}
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    limit: Option<usize>,
+    case_sensitive: bool,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            case_sensitive: false,
+            snapshot_id: None,
+            limit: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Limit the number of rows returned.
+    ///
+    /// If not set, all rows will be returned.
+    /// If set, the value must be greater than 0.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            column_names: self.column_names.clone(),
+            limit: None,
+            snapshot,
+            schema,
+            file_io: self.table.file_io.clone(),
+            table_metadata: self.table.metadata_ref(),
+        })
+    }
+}
+
+/// Table scan.
+#[derive(Debug)]
+pub struct TableScan {
+    column_names: Vec<String>,
+    limit: Option<usize>,
+    snapshot: SnapshotRef,
+    schema: SchemaRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+}
+
+/// A stream of [`FileScanTask`].
+pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
+impl TableScan {
+    /// Returns a stream of file scan tasks.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let manifest_list = self
+            .snapshot
+            .load_manifest_list(&self.file_io, &self.table_metadata)
+            .await?;
+
+        // Get minimum sequence number of data files.
+        let min_data_file_seq_num = manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+            .map(|e| e.min_sequence_number)
+            .min()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Collect deletion files first.
+        let mut position_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+        let mut eq_delete_files = 
Vec::with_capacity(manifest_list.entries().len());
+
+        // TODO: We should introduce runtime api to enable parallel scan.
+        for manifest_list_entry in manifest_list.entries().iter().filter(|e| {
+            e.content == ManifestContentType::Deletes && e.sequence_number >= 
min_data_file_seq_num
+        }) {
+            let manifest_file = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest_file.entries().iter().filter(|e| 
e.is_alive()) {
+                match manifest_entry.content_type() {
+                    DataContentType::PositionDeletes => {
+                        position_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::EqualityDeletes => {
+                        eq_delete_files.push(manifest_entry.clone());
+                    }
+                    DataContentType::Data => {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Data file entry({}) found in delete manifest 
file({})",
+                                manifest_entry.file_path(),
+                                manifest_list_entry.manifest_path
+                            ),
+                        ));
+                    }
+                }
+            }
+        }
+
+        // Sort delete files by sequence number.
+        position_delete_files
+            .sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+        eq_delete_files.sort_by_key(|f| 
f.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER));
+
+        // Generate data file stream
+        let mut file_scan_tasks = 
Vec::with_capacity(manifest_list.entries().len());
+        for manifest_list_entry in manifest_list
+            .entries()
+            .iter()
+            .filter(|e| e.content == ManifestContentType::Data)
+        {
+            // Data file
+            let manifest = 
manifest_list_entry.load_manifest(&self.file_io).await?;
+
+            for manifest_entry in manifest.entries() {
+                if manifest_entry.is_alive() {
+                    file_scan_tasks.push(Ok(FileScanTask {
+                        data_file: manifest_entry.clone(),
+                        position_delete_files: 
TableScan::filter_position_delete_files(
+                            manifest_entry,
+                            &position_delete_files,
+                        ),
+                        eq_delete_files: TableScan::filter_eq_delete_files(
+                            manifest_entry,
+                            &eq_delete_files,
+                        ),
+                        start: 0,
+                        length: manifest_entry.file_size_in_bytes(),
+                    }));
+                }
+            }
+        }
+
+        Ok(iter(file_scan_tasks).boxed())
+    }
+
+    /// Return the position delete files that should be applied to the data 
file.
+    ///
+    /// Here we assume that the position delete files are sorted by sequence 
number in ascending order.
+    fn filter_position_delete_files(
+        data_file: &ManifestEntry,
+        position_deletes: &[ManifestEntryRef],
+    ) -> Vec<ManifestEntryRef> {
+        let data_seq_num = data_file
+            .sequence_number()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Find the first position delete file whose sequence number is 
greater than or equal to the data file.
+        let first_entry = position_deletes.partition_point(|e| {
+            e.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER) < 
data_seq_num
+        });
+
+        // TODO: We should further filter the position delete files by 
`file_path` column.
+        position_deletes.iter().skip(first_entry).cloned().collect()
+    }
+
+    /// Return the equality delete files that should be applied to the data 
file.
+    ///
+    /// Here we assume that the equality delete files are sorted by sequence 
number in ascending order.
+    fn filter_eq_delete_files(
+        data_file: &ManifestEntry,
+        eq_deletes: &[ManifestEntryRef],
+    ) -> Vec<ManifestEntryRef> {
+        let data_seq_num = data_file
+            .sequence_number()
+            .unwrap_or(INITIAL_SEQUENCE_NUMBER);
+
+        // Find the first position delete file whose sequence number is 
greater than or equal to the data file.
+        let first_entry = eq_deletes.partition_point(|e| {
+            e.sequence_number().unwrap_or(INITIAL_SEQUENCE_NUMBER) <= 
data_seq_num
+        });
+
+        // TODO: We should further filter the eq delete files statistics
+        eq_deletes.iter().skip(first_entry).cloned().collect()
+    }
+}
+
+/// A task to scan part of file.
+#[derive(Debug)]
+pub struct FileScanTask {
+    data_file: ManifestEntryRef,
+    position_delete_files: Vec<ManifestEntryRef>,
+    eq_delete_files: Vec<ManifestEntryRef>,
+    start: u64,
+    length: u64,
+}
+
+/// A stream of arrow record batches.
+pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
+impl FileScanTask {
+    /// Returns a stream of arrow record batches.
+    pub async fn execute(&self) -> Result<ArrowRecordBatchStream> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::io::{FileIO, OutputFile};
+    use crate::spec::{
+        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, 
Manifest,
+        ManifestContentType, ManifestEntry, ManifestListWriter, 
ManifestMetadata, ManifestStatus,
+        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+    };
+    use crate::table::Table;
+    use crate::TableIdent;
+    use futures::TryStreamExt;
+    use std::fs;
+    use tempfile::TempDir;
+    use tera::{Context, Tera};
+    use uuid::Uuid;
+
+    struct TableTestFixture {
+        tmp_dir: TempDir,
+        table_location: String,
+        manifest_list1_location: String,
+        manifest_list2_location: String,
+        table_metadata1_location: String,
+        table: Table,
+    }
+
+    impl TableTestFixture {
+        fn new() -> Self {
+            let tmp_dir = TempDir::new().unwrap();
+            let table_location = tmp_dir.path().join("table1");
+            let manifest_list1_location = 
table_location.join("metadata/manifests_list_1.avro");
+            let manifest_list2_location = 
table_location.join("metadata/manifests_list_2.avro");
+            let table_metadata1_location = 
table_location.join("metadata/v1.json");
+
+            let file_io = 
FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+                .unwrap()
+                .build()
+                .unwrap();
+
+            let table_metadata = {
+                let template_json_str = fs::read_to_string(format!(
+                    "{}/testdata/example_table_metadata_v2.json",
+                    env!("CARGO_MANIFEST_DIR")
+                ))
+                .unwrap();
+                let mut context = Context::new();
+                context.insert("table_location", &table_location);
+                context.insert("manifest_list_1_location", 
&manifest_list1_location);
+                context.insert("manifest_list_2_location", 
&manifest_list2_location);
+                context.insert("table_metadata_1_location", 
&table_metadata1_location);
+
+                let metadata_json = Tera::one_off(&template_json_str, 
&context, false).unwrap();
+                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+            };
+
+            let table = Table::builder()
+                .metadata(table_metadata)
+                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+                .file_io(file_io)
+                
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+                .build();
+
+            Self {
+                tmp_dir,
+                manifest_list1_location: 
manifest_list1_location.to_str().unwrap().to_string(),
+                manifest_list2_location: 
manifest_list2_location.to_str().unwrap().to_string(),
+                table_metadata1_location: 
table_metadata1_location.to_str().unwrap().to_string(),
+                table_location: table_location.to_str().unwrap().to_string(),
+                table,
+            }
+        }
+
+        fn next_manifest_file(&self) -> OutputFile {
+            self.table
+                .file_io
+                .new_output(format!(
+                    "{}/metadata/manifest_{}.avro",
+                    self.table_location,
+                    Uuid::new_v4()
+                ))
+                .unwrap()
+        }
+    }
+
+    #[test]
+    fn test_table_scan_columns() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().select(["x", "y"]).build().unwrap();
+        assert_eq!(vec!["x", "y"], table_scan.column_names);
+
+        let table_scan = table
+            .scan()
+            .select(["x", "y"])
+            .select(["z"])
+            .build()
+            .unwrap();
+        assert_eq!(vec!["z"], table_scan.column_names);
+    }
+
+    #[test]
+    fn test_select_all() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().select_all().build().unwrap();
+        assert!(table_scan.column_names.is_empty());
+    }
+
+    #[test]
+    fn test_select_no_exist_column() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
+        assert!(table_scan.is_err());
+    }
+
+    #[test]
+    fn test_table_scan_default_snapshot_id() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().build().unwrap();
+        assert_eq!(
+            table.metadata().current_snapshot().unwrap().snapshot_id(),
+            table_scan.snapshot.snapshot_id()
+        );
+    }
+
+    #[test]
+    fn test_table_scan_non_exist_snapshot_id() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table.scan().snapshot_id(1024).build();
+        assert!(table_scan.is_err());
+    }
+
+    #[test]
+    fn test_table_scan_with_snapshot_id() {
+        let table = TableTestFixture::new().table;
+
+        let table_scan = table
+            .scan()
+            .snapshot_id(3051729675574597004)
+            .build()
+            .unwrap();
+        assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004);
+    }
+
+    #[tokio::test]
+    async fn test_plan_files() {
+        let fixture = TableTestFixture::new();
+
+        let current_snapshot = 
fixture.table.metadata().current_snapshot().unwrap();
+        let parent_snapshot = current_snapshot
+            .parent_snapshot(fixture.table.metadata())
+            .unwrap();
+        let current_schema = 
current_snapshot.schema(fixture.table.metadata()).unwrap();
+        let current_partition_spec = 
fixture.table.metadata().default_partition_spec().unwrap();
+
+        // Write data files
+        let data_file_manifest = ManifestWriter::new(
+            fixture.next_manifest_file(),
+            current_snapshot.snapshot_id(),
+            vec![],
+        )
+        .write(Manifest::new(
+            ManifestMetadata::builder()
+                .schema((*current_schema).clone())
+                .content(ManifestContentType::Data)
+                .format_version(FormatVersion::V2)
+                .partition_spec((**current_partition_spec).clone())
+                .schema_id(current_schema.schema_id())
+                .build(),
+            vec![
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Added)
+                    .data_file(
+                        DataFile::builder()
+                            .content(DataContentType::Data)
+                            .file_path(format!("{}/1.parquet", 
&fixture.table_location))
+                            .file_format(DataFileFormat::Parquet)
+                            .file_size_in_bytes(100)
+                            .record_count(1)
+                            
.partition(Struct::from_iter([Some(Literal::long(100))]))
+                            .build(),
+                    )
+                    .build(),
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Deleted)
+                    .snapshot_id(parent_snapshot.snapshot_id())
+                    .sequence_number(parent_snapshot.sequence_number())
+                    .file_sequence_number(parent_snapshot.sequence_number())

Review Comment:
   Oh I see this is just about inheriting the sequence number from the 
snapshot, which would be the same. Ignore this then.



##########
crates/iceberg/src/table.rs:
##########
@@ -42,8 +50,624 @@ impl Table {
         &self.metadata
     }
 
+    /// Returns current metadata ref.
+    pub fn metadata_ref(&self) -> TableMetadataRef {
+        self.metadata.clone()
+    }
+
     /// Returns current metadata location.
     pub fn metadata_location(&self) -> Option<&str> {
         self.metadata_location.as_deref()
     }
+
+    /// Creates a table scan.
+    pub fn scan(&self) -> TableScanBuilder<'_> {
+        TableScanBuilder::new(self)
+    }
+}
+
+/// Builder to create table scan.
+pub struct TableScanBuilder<'a> {
+    table: &'a Table,
+    // Empty column names means to select all columns
+    column_names: Vec<String>,
+    limit: Option<usize>,
+    case_sensitive: bool,
+    snapshot_id: Option<i64>,
+}
+
+impl<'a> TableScanBuilder<'a> {
+    fn new(table: &'a Table) -> Self {
+        Self {
+            table,
+            column_names: vec![],
+            case_sensitive: false,
+            snapshot_id: None,
+            limit: None,
+        }
+    }
+
+    /// Select all columns.
+    pub fn select_all(mut self) -> Self {
+        self.column_names.clear();
+        self
+    }
+
+    /// Select some columns of the table.
+    pub fn select(mut self, column_names: impl IntoIterator<Item = impl 
ToString>) -> Self {
+        self.column_names = column_names
+            .into_iter()
+            .map(|item| item.to_string())
+            .collect();
+        self
+    }
+
+    /// Limit the number of rows returned.
+    ///
+    /// If not set, all rows will be returned.
+    /// If set, the value must be greater than 0.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
+    }
+
+    /// Set the snapshot to scan. When not set, it uses current snapshot.
+    pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Build the table scan.
+    pub fn build(self) -> Result<TableScan> {
+        let snapshot = match self.snapshot_id {
+            Some(snapshot_id) => self
+                .table
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .table
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        "Can't scan table without snapshots",
+                    )
+                })?
+                .clone(),
+        };
+
+        let schema = snapshot.schema(self.table.metadata())?;
+
+        // Check that all column names exist in the schema.
+        if !self.column_names.is_empty() {
+            for column_name in &self.column_names {
+                if schema.field_by_name(column_name).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Column {} not found in table.", column_name),
+                    ));
+                }
+            }
+        }
+
+        Ok(TableScan {
+            column_names: self.column_names.clone(),
+            limit: None,

Review Comment:
   It looks like the limit set on the builder is not propagated here (it is 
always `None`), so it is never enforced? Also, how are you planning to enforce 
the limit?



##########
crates/iceberg/src/spec/snapshot.rs:
##########
@@ -124,6 +150,69 @@ impl Snapshot {
         Utc.timestamp_millis_opt(self.timestamp_ms).unwrap()
     }
 
+    /// Get the schema id of this snapshot.
+    #[inline]
+    pub fn schema_id(&self) -> Option<SchemaId> {
+        self.schema_id
+    }
+
+    /// Get the schema of this snapshot.
+    pub fn schema(&self, table_metadata: &TableMetadata) -> Result<SchemaRef> {
+        Ok(match self.schema_id() {
+            Some(schema_id) => table_metadata
+                .schema_by_id(schema_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Schema with id {} not found", schema_id),
+                    )
+                })?
+                .clone(),
+            None => table_metadata.current_schema().clone(),
+        })
+    }
+
+    /// Get parent snapshot.
+    pub fn parent_snapshot(&self, table_metadata: &TableMetadata) -> 
Option<SnapshotRef> {

Review Comment:
   Do we need to expose this API right now? It seems like it's only used in the 
test, so we could just use the table metadata to look up the 
`parent_snapshot_id`, which is already exposed.



##########
crates/iceberg/Cargo.toml:
##########
@@ -62,4 +62,5 @@ uuid = { workspace = true }
 [dev-dependencies]
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
+tera = { workspace = true }

Review Comment:
   What is this used for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to