advancedxy commented on code in PR #2615:
URL: https://github.com/apache/iceberg-rust/pull/2615#discussion_r3393866757


##########
crates/iceberg/src/arrow/reader/row_filter.rs:
##########
@@ -604,4 +612,102 @@ mod tests {
             assert_eq!(first_val, 100, "Task 2 should start with id=100, not 
id=0");
         }
     }
+
+    /// A single data file split into multiple sub-row-group byte ranges (as 
Spark/Iceberg
+    /// planning produces when split-size is smaller than a row group) must 
still yield each
+    /// row exactly once. The previous overlap-based selection let every split 
whose byte range
+    /// touched a row group read it, duplicating rows; ownership by midpoint 
reads each row group
+    /// from exactly one split.
+    #[tokio::test]
+    async fn test_sub_row_group_splits_do_not_duplicate_rows() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{table_location}/sub_split.parquet");
+
+        // Three row groups of 100 rows each (ids 0..300).
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_row_count(Some(100))
+            .build();
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), 
Some(props)).unwrap();
+        for chunk in [0..100, 100..200, 200..300] {
+            let batch = RecordBatch::try_new(arrow_schema.clone(), 
vec![Arc::new(
+                Int32Array::from(chunk.collect::<Vec<i32>>()),
+            )])
+            .unwrap();
+            writer.write(&batch).expect("Writing batch");
+        }
+        writer.close().unwrap();
+
+        let file_size = std::fs::metadata(&file_path).unwrap().len();
+        let file_io = FileIO::new_with_fs();
+        let reader = ArrowReaderBuilder::new(file_io, 
Runtime::current()).build();
+
+        // Tile the whole file into 64-byte splits, mirroring Spark's 
split-size planning, and
+        // read every split. A 64-byte split is far smaller than a row group, 
so each row group
+        // is touched by several splits but must be owned (read) by exactly 
one.
+        let mut ids = Vec::new();
+        let split_size = 64u64;
+        let mut start = 0u64;
+        while start < file_size {
+            let length = split_size.min(file_size - start);
+            let task = FileScanTask::builder()
+                .with_file_size_in_bytes(file_size)
+                .with_start(start)

Review Comment:
   It might be helpful to test cases where the start of a file scan task is 
exactly at the midpoint of a row group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to