laskoviymishka commented on code in PR #2464:
URL: https://github.com/apache/iceberg-rust/pull/2464#discussion_r3395462759


##########
crates/iceberg/src/arrow/reader/projection.rs:
##########
@@ -1725,4 +1725,108 @@ message schema {
             .collect();
         assert_eq!(ids, vec![2, 3]);
     }
+
+    /// Test that row_selection_enabled gracefully handles Parquet files that 
lack
+    /// column index metadata (common with older/migrated files). The reader 
should
+    /// fall back to returning all rows instead of erroring.
+    ///
+    /// Regression test for 
<https://github.com/apache/iceberg-rust/issues/2452>.
+    #[tokio::test]
+    async fn 
test_read_parquet_without_column_index_with_row_selection_enabled() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::new_with_fs();
+
+        let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as 
ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
name_data]).unwrap();
+
+        // Write Parquet WITHOUT page index (column index / offset index) by 
disabling
+        // statistics at the page level. This simulates older Parquet files.
+        let props = WriterProperties::builder()
+            
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file_path = format!("{table_location}/no_column_index.parquet");
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Use a predicate that would match some rows
+        let predicate = Reference::new("id").less_than(Datum::int(3));
+
+        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
+            .with_row_group_filtering_enabled(true)
+            .with_row_selection_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                file_size_in_bytes: 
std::fs::metadata(&file_path).unwrap().len(),
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: file_path,
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),

Review Comment:
   I think this test passes for the wrong reason, and I'd want to settle that 
before merge.
   
   `EnabledStatistics::None` stops page-level stats from being populated, but 
in parquet-rs >= 58 `parse_column_index` still maps every column to a 
`ColumnIndexMetaData::NONE` entry and calls `set_column_index(Some([NONE, 
...]))`. So `column_index()` here returns `Some(...)`, not `None` — the new 
`return Ok(None)` in `row_filter.rs` is never hit. `PageIndexEvaluator` already 
handled the `NONE` entries by selecting all rows before this PR, which means 
this test would have passed against the old code too. It's a vacuous guard as 
written.
   
   There's a deeper version of this: the defaults set 
`preload_column_index`/`preload_offset_index` to `true`, which maps to 
`PageIndexPolicy::Required`. Under `Required`, a file genuinely missing the 
offset index fails at metadata load in `open_parquet_file` — one frame before 
we ever reach `get_row_selection_for_filter_predicate`. So in the default 
config the new guard may be unreachable, and the real fix would be switching to 
`Optional` (load if present, don't error if absent). That said, #2452's 
reported error is the exact string from the old function body, which proves the 
reporter's env *did* load metadata and *did* reach the `column_index() is None` 
branch — so there's clearly a real path here, I just can't tell from the test 
which config exercises it.
   
   Two ways to nail this down, either works: assert 
`metadata.column_index().is_none()` after writing the file (and pick a writer 
config that actually produces that — `SerializedFileWriter` like the INT96 
tests, or `Optional` policy with the offset index absent), or skip the fixture 
entirely and unit-test `get_row_selection_for_filter_predicate` directly by 
handing it a `ParquetMetaData` with `column_index = None`. The direct unit test 
is the most honest proof the branch does what we claim. And if it turns out 
`Required` is the issue, the policy change is part of the fix. wdyt?



##########
crates/iceberg/src/arrow/reader/projection.rs:
##########
@@ -1725,4 +1725,108 @@ message schema {
             .collect();
         assert_eq!(ids, vec![2, 3]);
     }
+
+    /// Test that row_selection_enabled gracefully handles Parquet files that 
lack
+    /// column index metadata (common with older/migrated files). The reader 
should
+    /// fall back to returning all rows instead of erroring.
+    ///
+    /// Regression test for 
<https://github.com/apache/iceberg-rust/issues/2452>.
+    #[tokio::test]
+    async fn 
test_read_parquet_without_column_index_with_row_selection_enabled() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::new_with_fs();
+
+        let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as 
ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, 
name_data]).unwrap();
+
+        // Write Parquet WITHOUT page index (column index / offset index) by 
disabling
+        // statistics at the page level. This simulates older Parquet files.
+        let props = WriterProperties::builder()
+            
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file_path = format!("{table_location}/no_column_index.parquet");
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), 
Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Use a predicate that would match some rows
+        let predicate = Reference::new("id").less_than(Datum::int(3));
+
+        let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
+            .with_row_group_filtering_enabled(true)
+            .with_row_selection_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                file_size_in_bytes: 
std::fs::metadata(&file_path).unwrap().len(),
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: file_path,
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: Some(predicate.bind(schema, true).unwrap()),
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        // This should NOT error — page-level pruning is skipped when column 
index
+        // is absent, but the Arrow row filter still applies the predicate.
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .stream()
+            .try_collect::<Vec<RecordBatch>>()
+            .await

Review Comment:
   Every other test builds `FileScanTask` through 
`FileScanTask::builder().with_*(...).build()`. I'd match that here — the raw 
struct literal sidesteps any future builder-level validation and diverges from 
the convention in the rest of the module. Also worth fixing the comment above 
the writer props while we're here: `EnabledStatistics::None` disables *all* 
statistics, not just the page-level index (`EnabledStatistics::Chunk` keeps 
row-group stats and drops only the page index).



##########
crates/iceberg/src/arrow/reader/row_filter.rs:
##########
@@ -90,32 +89,33 @@ impl ArrowReader {
         Ok(results)
     }
 
+    /// Computes a [`RowSelection`] by evaluating the filter predicate against
+    /// the Parquet page index (column index + offset index).
+    ///
+    /// Returns `Ok(None)` when the Parquet file lacks column or offset index
+    /// metadata (common with older files written before page indexes became
+    /// standard). In that case page-level pruning is simply skipped; row-group
+    /// filtering and the Arrow row filter still apply the predicate.
     pub(super) fn get_row_selection_for_filter_predicate(
         predicate: &BoundPredicate,

Review Comment:
   I'd add a `tracing::debug!` at both `Ok(None)` sites noting which index was 
absent (and ideally the file path).
   
   Right now a writer that unexpectedly omits the page index — truncation, a 
bug, not just an old file — looks identical to the intended fallback, and 
there's no signal that page pruning was skipped. When someone reports a slow 
scan or a surprising row count, that log line is the only breadcrumb.



##########
crates/iceberg/src/arrow/reader/row_filter.rs:
##########
@@ -90,32 +89,33 @@ impl ArrowReader {
         Ok(results)
     }
 
+    /// Computes a [`RowSelection`] by evaluating the filter predicate against
+    /// the Parquet page index (column index + offset index).
+    ///
+    /// Returns `Ok(None)` when the Parquet file lacks column or offset index
+    /// metadata (common with older files written before page indexes became
+    /// standard). In that case page-level pruning is simply skipped; row-group
+    /// filtering and the Arrow row filter still apply the predicate.
     pub(super) fn get_row_selection_for_filter_predicate(
         predicate: &BoundPredicate,
         parquet_metadata: &Arc<ParquetMetaData>,
         selected_row_groups: &Option<Vec<usize>>,
         field_id_map: &HashMap<i32, usize>,
         snapshot_schema: &Schema,
-    ) -> Result<RowSelection> {
+    ) -> Result<Option<RowSelection>> {
         let Some(column_index) = parquet_metadata.column_index() else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "Parquet file metadata does not contain a column index",
-            ));
+            return Ok(None);

Review Comment:
   The doc comment explains `None` well, but `Ok(Some(empty))` (all row groups 
filtered, select zero rows) and `Ok(None)` (no page index, skip pruning) are 
now meaningfully different states that look almost identical at the call site. 
I'd add a line to the doc noting the empty-`Some` case means "all row groups 
pruned, zero rows" so a future reader doesn't collapse the two.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to