vustef commented on code in PR #1824:
URL: https://github.com/apache/iceberg-rust/pull/1824#discussion_r2490064575


##########
crates/iceberg/src/scan/mod.rs:
##########
@@ -280,6 +315,7 @@ impl<'a> TableScanBuilder<'a> {
             partition_filter_cache: Arc::new(PartitionFilterCache::new()),
             manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
             expression_evaluator_cache: 
Arc::new(ExpressionEvaluatorCache::new()),
+            file_column_position,

Review Comment:
   Having a separate field for the file column position is not ideal because it 
doesn't scale: what will we do when we need to add more virtual/metadata 
columns and track their positions as well?
   Perhaps carrying them through the field IDs vector would inherently 
preserve their positions?



##########
crates/iceberg/src/scan/mod.rs:
##########
@@ -239,7 +265,16 @@ impl<'a> TableScanBuilder<'a> {
                 .collect()
         });
 
-        for column_name in column_names.iter() {
+        // Track the position of the _file column if requested
+        let mut file_column_position = None;
+
+        for (index, column_name) in column_names.iter().enumerate() {
+            // Handle special reserved column "_file"
+            if column_name == RESERVED_COL_NAME_FILE_INTERNAL {
+                file_column_position = Some(index);
+                continue; // Don't add to field_ids - it's a virtual column

Review Comment:
   Maybe they should end up in `field_ids`. They end up in 
`task.project_field_ids`, which is passed to `get_arrow_projection_mask`, and if 
we make sure to keep all schemas updated to contain these field IDs (after all, 
they are part of the Iceberg spec), then perhaps it'd all work out.



##########
crates/iceberg/src/scan/mod.rs:
##########
@@ -42,6 +42,28 @@ use crate::table::Table;
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind, Result};
 
+/// Reserved column name for the file path metadata column.
+///
+/// When this column is selected in a table scan (e.g., `.select(["col1", 
RESERVED_COL_NAME_FILE])`),
+/// each row will include the path of the file from which that row was read.
+/// This is useful for debugging, auditing, or tracking data lineage.
+///
+/// # Example
+/// ```no_run
+/// # use iceberg::scan::RESERVED_COL_NAME_FILE;
+/// # use iceberg::table::Table;
+/// # async fn example() -> iceberg::Result<()> {
+/// # let table: Table = todo!();
+/// // Select regular columns along with the file path
+/// let scan = table
+///     .scan()
+///     .select(["id", "name", RESERVED_COL_NAME_FILE])

Review Comment:
   How do we ask for the `_file` column without having to explicitly list all 
the other columns? E.g. "get me all columns + `_file`". There should be some 
shortcut for this.



##########
crates/iceberg/src/scan/mod.rs:
##########
@@ -1790,8 +1829,233 @@ pub mod tests {
             schema,
             record_count: None,
             data_file_format: DataFileFormat::Avro,
+            file_column_position: None,
             deletes: vec![],
         };
         test_fn(task);
     }
+
+    #[tokio::test]
+    async fn test_select_with_file_column() {
+        use arrow_array::cast::AsArray;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select regular columns plus the _file column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify we have 2 columns: x and _file
+        assert_eq!(batches[0].num_columns(), 2);
+
+        // Verify the x column exists and has correct data
+        let x_col = batches[0].column_by_name("x").unwrap();
+        let x_arr = x_col.as_primitive::<arrow_array::types::Int64Type>();
+        assert_eq!(x_arr.value(0), 1);
+
+        // Verify the _file column exists
+        let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE);
+        assert!(
+            file_col.is_some(),
+            "_file column should be present in the batch"
+        );
+
+        // Verify the _file column contains a file path
+        let file_col = file_col.unwrap();
+        assert!(
+            matches!(
+                file_col.data_type(),
+                arrow_schema::DataType::RunEndEncoded(_, _)
+            ),
+            "_file column should use RunEndEncoded type"
+        );
+
+        // Decode the RunArray to verify it contains the file path
+        let run_array = file_col
+            .as_any()
+            
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+            .expect("_file column should be a RunArray");
+
+        let values = run_array.values();
+        let string_values = values.as_string::<i32>();
+        assert_eq!(string_values.len(), 1, "Should have a single file path");
+
+        let file_path = string_values.value(0);
+        assert!(
+            file_path.ends_with(".parquet"),
+            "File path should end with .parquet, got: {}",
+            file_path
+        );
+    }
+
+    #[tokio::test]
+    async fn test_select_file_column_position() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select columns in specific order: x, _file, z
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE, "z"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify column order: x at position 0, _file at position 1, z at 
position 2
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x");
+        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE);
+        assert_eq!(schema.field(2).name(), "z");
+
+        // Verify columns by name also works
+        assert!(batches[0].column_by_name("x").is_some());
+        assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some());
+        assert!(batches[0].column_by_name("z").is_some());
+    }
+
+    #[tokio::test]
+    async fn test_select_file_column_only() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select only the _file column
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Should have exactly 1 column
+        assert_eq!(batches[0].num_columns(), 1);
+
+        // Verify it's the _file column
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+
+        // Verify the batch has the correct number of rows
+        // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted)
+        // Each file has 1024 rows, so total is 2048 rows
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 2048);
+    }
+
+    #[tokio::test]
+    async fn test_file_column_with_multiple_files() {
+        use std::collections::HashSet;
+
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select x and _file columns
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_FILE])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Collect all unique file paths from the batches
+        let mut file_paths = HashSet::new();
+        for batch in &batches {
+            let file_col = 
batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap();
+            let run_array = file_col
+                .as_any()
+                
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
+                .expect("_file column should be a RunArray");
+
+            let values = run_array.values();
+            let string_values = values.as_string::<i32>();
+            for i in 0..string_values.len() {
+                file_paths.insert(string_values.value(i).to_string());
+            }
+        }
+
+        // We should have multiple files (the test creates 1.parquet and 
3.parquet)
+        assert!(!file_paths.is_empty(), "Should have at least one file path");
+
+        // All paths should end with .parquet
+        for path in &file_paths {
+            assert!(
+                path.ends_with(".parquet"),
+                "All file paths should end with .parquet, got: {}",
+                path
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_file_column_at_start() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select _file at the start
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_FILE, "x", "y"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_columns(), 3);
+
+        // Verify _file is at position 0
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE);
+        assert_eq!(schema.field(1).name(), "x");
+        assert_eq!(schema.field(2).name(), "y");
+    }
+
+    #[tokio::test]
+    async fn test_file_column_at_end() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Select _file at the end
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", "y", RESERVED_COL_NAME_FILE])

Review Comment:
   What happens if you do `.select(["x", "y", RESERVED_COL_NAME_FILE, 
RESERVED_COL_NAME_FILE])`, so that `RESERVED_COL_NAME_FILE` is listed twice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to