liurenjie1024 commented on code in PR #1017: URL: https://github.com/apache/iceberg-rust/pull/1017#discussion_r2177176342
########## crates/iceberg/src/arrow/caching_delete_file_loader.rs: ########## @@ -308,28 +319,231 @@ impl CachingDeleteFileLoader { Ok(result) } - /// Parses record batch streams from individual equality delete files - /// - /// Returns an unbound Predicate for each batch stream async fn parse_equality_deletes_record_batch_stream( - streams: ArrowRecordBatchStream, + mut stream: ArrowRecordBatchStream, + equality_ids: HashSet<i32>, ) -> Result<Predicate> { - // TODO + let mut result_predicate = AlwaysTrue; + + while let Some(record_batch) = stream.next().await { + let record_batch = record_batch?; + + if record_batch.num_columns() == 0 { + return Ok(AlwaysTrue); + } + + let batch_schema_arrow = record_batch.schema(); + let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?; + + let mut datum_columns_with_names: Vec<_> = record_batch + .columns() + .iter() + .zip(batch_schema_iceberg.as_struct().fields()) + // only use columns that are in the set of equality_ids for this delete file + .filter(|(field, value)| equality_ids.contains(&value.id)) + .map(|(column, field)| { + let col_as_datum_vec = arrow_array_to_datum_iterator(column, field); + col_as_datum_vec.map(|c| (c, field.name.to_string())) + }) + .try_collect()?; + + // consume all the iterators in lockstep, creating per-row predicates that get combined + // into a single final predicate + + // (2025-06-12) can't use `is_empty` as it depends on unstable library feature `exact_size_is_empty` + #[allow(clippy::len_zero)] + while datum_columns_with_names[0].0.len() > 0 { + let mut row_predicate = AlwaysTrue; + for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names { + if let Some(item) = column.next() { + if let Some(datum) = item? { + row_predicate = row_predicate + .and(Reference::new(field_name.clone()).equal_to(datum.clone())); + } + } + } + result_predicate = result_predicate.and(row_predicate.not()); Review Comment: Are you trying to make this as `not(!row_predicate1 and !row_predicate2)`? This is a little odd to read, why not just `row_predicate1 or row_predicate2`? Also the last line should be `negative` rather `rewrite_not` ########## crates/iceberg/src/arrow/caching_delete_file_loader.rs: ########## @@ -308,28 +319,231 @@ impl CachingDeleteFileLoader { Ok(result) } - /// Parses record batch streams from individual equality delete files - /// - /// Returns an unbound Predicate for each batch stream async fn parse_equality_deletes_record_batch_stream( - streams: ArrowRecordBatchStream, + mut stream: ArrowRecordBatchStream, + equality_ids: HashSet<i32>, ) -> Result<Predicate> { - // TODO + let mut result_predicate = AlwaysTrue; + + while let Some(record_batch) = stream.next().await { + let record_batch = record_batch?; + + if record_batch.num_columns() == 0 { + return Ok(AlwaysTrue); + } + + let batch_schema_arrow = record_batch.schema(); + let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?; + + let mut datum_columns_with_names: Vec<_> = record_batch + .columns() + .iter() + .zip(batch_schema_iceberg.as_struct().fields()) + // only use columns that are in the set of equality_ids for this delete file + .filter(|(field, value)| equality_ids.contains(&value.id)) Review Comment: This is incorrect, see https://iceberg.apache.org/spec/#equality-delete-files:~:text=Each%20row%20of%20the%20delete%20file%20produces%20one%20equality%20predicate%20that%20matches%20any%20row%20where%20the%20delete%20columns%20are%20equal.%20Multiple%20columns%20can%20be%20thought%20of%20as%20an%20AND%20of%20equality%20predicates.%20A%20null%20value%20in%20a%20delete%20column%20matches%20a%20row%20if%20the%20row%27s%20value%20is%20null%2C%20equivalent%20to%20col%20IS%20NULL. If a column is missing, we should treat it as null, and it still helps to prune null value. ########## crates/iceberg/src/arrow/caching_delete_file_loader.rs: ########## @@ -308,28 +319,231 @@ impl CachingDeleteFileLoader { Ok(result) } - /// Parses record batch streams from individual equality delete files - /// - /// Returns an unbound Predicate for each batch stream async fn parse_equality_deletes_record_batch_stream( - streams: ArrowRecordBatchStream, + mut stream: ArrowRecordBatchStream, + equality_ids: HashSet<i32>, ) -> Result<Predicate> { - // TODO + let mut result_predicate = AlwaysTrue; + + while let Some(record_batch) = stream.next().await { + let record_batch = record_batch?; + + if record_batch.num_columns() == 0 { + return Ok(AlwaysTrue); + } + + let batch_schema_arrow = record_batch.schema(); + let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?; + + let mut datum_columns_with_names: Vec<_> = record_batch + .columns() + .iter() + .zip(batch_schema_iceberg.as_struct().fields()) + // only use columns that are in the set of equality_ids for this delete file + .filter(|(field, value)| equality_ids.contains(&value.id)) + .map(|(column, field)| { + let col_as_datum_vec = arrow_array_to_datum_iterator(column, field); + col_as_datum_vec.map(|c| (c, field.name.to_string())) + }) + .try_collect()?; + + // consume all the iterators in lockstep, creating per-row predicates that get combined + // into a single final predicate + + // (2025-06-12) can't use `is_empty` as it depends on unstable library feature `exact_size_is_empty` + #[allow(clippy::len_zero)] + while datum_columns_with_names[0].0.len() > 0 { + let mut row_predicate = AlwaysTrue; + for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names { + if let Some(item) = column.next() { + if let Some(datum) = item? { + row_predicate = row_predicate + .and(Reference::new(field_name.clone()).equal_to(datum.clone())); + } + } + } + result_predicate = result_predicate.and(row_predicate.not()); + } + } + Ok(result_predicate.rewrite_not()) + } +} + +macro_rules! prim_to_datum { Review Comment: I think there exists similar things in other parts? We should reuse them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org