sdd commented on code in PR #1017:
URL: https://github.com/apache/iceberg-rust/pull/1017#discussion_r2180856664


##########
crates/iceberg/src/arrow/caching_delete_file_loader.rs:
##########
@@ -308,28 +319,231 @@ impl CachingDeleteFileLoader {
         Ok(result)
     }
 
-    /// Parses record batch streams from individual equality delete files
-    ///
-    /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;
+
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = 
arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                // only use columns that are in the set of equality_ids for 
this delete file
+                .filter(|(field, value)| equality_ids.contains(&value.id))
+                .map(|(column, field)| {
+                    let col_as_datum_vec = 
arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row 
predicates that get combined
+            // into a single final predicate
+
+            // (2025-06-12) can't use `is_empty` as it depends on unstable 
library feature `exact_size_is_empty`
+            #[allow(clippy::len_zero)]
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for &mut (ref mut column, ref field_name) in &mut 
datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                
.and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
+    }
+}
+
+macro_rules! prim_to_datum {

Review Comment:
   It's not really possible to refactor `arrow_struct_to_literal` to return 
`Datum` since `Datum` is only for `PrimitiveLiteral` rather than the more 
general `Literal`, and `arrow_struct_to_literal` handles composite types as 
well as primitives.
   
   Instead, I've managed to re-use the same visitor that 
`arrow_struct_to_literal` uses, wrapped in a small 
`arrow_primitive_to_literal` function, allowing me to remove the 
duplicated code from caching_delete_file_loader.rs, at the expense of some 
slightly uglier glue code in `parse_equality_deletes_record_batch_stream`.
   
   Does this address your concern?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to