sdd commented on issue #630:
URL: https://github.com/apache/iceberg-rust/issues/630#issuecomment-2644037450

   Hi all. I'm resurrecting this issue now that @Fokko has kindly helped get 
the first part of this over the line by reviewing and merging 
https://github.com/apache/iceberg-rust/pull/652.
   
   I have a branch with an earlier iteration of delete file read support that 
I'm intending to break up into pieces and submit as separate PRs. There are 
parts of it that I'm happy with and other parts that I'm less happy with; plus 
now would be a good opportunity to discuss the higher-level structure of the 
approach for this again now that I've got a better idea of the different parts 
of work that are involved.
   
   ## Outline
   
   ### `TableScan`
   
   * `TableScan::plan_files` creates a `DeleteFileIndex` which accumulates an 
index of all of the equality delete files and positional delete files that 
could apply to data files within the scan.   * Each of the `FileScanTask`s in 
the stream returned by `plan_files` is augmented with `delete_files`, a 
`Vec<FileScanTaskDeleteFile>` which contains details of the delete files that 
could apply to the data file in the `FileScanTask`.
   
   ### `ArrowReader`
   
   Right now, `ArrowReader::read` takes a stream of `FileScanTask`s, and for 
each task, it reads record batches from that task's data file - applies 
filtering, projection, batching and transformations such as schema migration / 
column reordering - returning a stream of the `RecordBatch`es resulting from 
applying these operations to all file scan tasks.
   
   In order to support delete files the filtering logic needs to be extended so 
that:
   
   * Equality delete files that are referenced in each task's `delete_files` 
list are used to filter out rows that match filter predicates in any applicable 
delete files;
   * Positional delete files from `delete_files` are used to filter out rows 
whose index is specified as deleted in applicable delete files.
   
   ### Filtering Approach
   
   * Predicate based filtering is already being performed in the reader, by 
transforming the filter predicate present in the `FileScanTask` into an arrow 
`RowFilter` that gets passed to the 
`ParquetRecordBatchStreamBuilder::with_row_filter`. 
   * Equality deletes can be handled by merging the file scan task's filter 
predicate with predicates built from applicable rows present in any equality 
delete files to form a single larger predicate that gets passed to 
`ParquetRecordBatchStreamBuilder` as before.
   * Any positional deletes can be handled by using the `RowSelection` logic - 
creating a `RowSelection` specific to the applicable positional deletes, and 
merging this with any `RowSelection` created as part of the row selection 
filter generated from the filter predicate, if present, and passed to 
`ParquetRecordBatchStreamBuilder::with_row_selection`.
   
   ### Loading and Processing Delete Files
   
   Whilst I was fairly happy with the approach taken to filtering in my first 
draft, the approach taken to loading and processing delete files felt like it 
could be improved.
   
   The first draft took this approach:
   
   ```rust
   // delete_file.rs
   
   // Represents a parsed Delete file that can be safely stored
   // in the Object Cache.
   // Storing this in the object cache saves us from parsing
   // the same file multiple times in the case of delete files that could apply 
to
   // multiple files within the stream (It would be better if we also
   // have a way of preventing more than one task from starting to parse the 
same
   // delete file in parallel)
   pub(crate) enum Deletes {
       // Positional delete files are parsed into a map of
       // filename to a sorted list of row indices.
       // ( I'm ignoring the stored rows that are present in
       //   positional deletes for now. I think they only used for statistics?)
       Positional(HashMap<String, Vec<u64>>),
   
       // Equality delete files are initially parsed solely as an
       // unprocessed list of `RecordBatch`es from the equality
       // delete files.
       // Can we do better than this by storing a `Predicate`?
       // The equality deletes refer to fields by name rather than field_id,
       // so if we cache a Predicate rather than just a raw `RecordBatch`
       // then a field name change would invalidate the cached Predicate.
       // Similarly, I don't think we can cache these as `BoundPredicate`s
       // as the column order could be different across different data
       // files and so the accessor in the bound predicate could be invalid)?
       Equality(Vec<RecordBatch>),
   }
   
   // Processes the RecordBatches of a pos del file into a `HashMap` that
   // maps filenames to lists of row indices that are marked as 
   // deleted for that file
   pub(crate) async fn parse_positional_delete_file(
       mut record_batch_stream: ArrowRecordBatchStream,
   ) -> Result<Deletes> {
      // ...
   }
   
   // Simpler - collects the RecordBatches of an eq del file
   // into a `Vec<RecordBatch>`
   // and returns them in a `Deletes::Equality`
   pub(crate) async fn parse_equality_delete_file(
       mut record_batch_stream: ArrowRecordBatchStream,
   ) -> Result<Deletes> {
     // ...
   }
   
   ```
   
   ```rust
   // arrow/reader.rs
   impl ArrowReader {
   
       // ... rest of ArrowReader impl
   
       // Spawned at the start of `process_file_scan_task` and then awaited on
       // after the record_batch_stream_builder has been created and the page
       // index loaded.
       // Responsible for loading the delete files through FileIO and parsing
       // them into a list of `Deletes` objects
       async fn get_deletes(
           delete_file_entries: Vec<FileScanTaskDeleteFile>,
           file_io: FileIO,
           concurrency_limit_data_files: usize,
       ) -> Result<Vec<Deletes>> {
           // concurrently physically loads delete files for each
           // `FileScanTaskDeleteFile` via `FileIO` into a 
           // `RecordBatchStream`, which gets mapped through 
           // `parse_positional_delete_file` or `parse_equality_delete_file`
           // into a combined vec of `Deletes`
       }
   
       // Maps `Deletes::Positional` objects into a list
       // of indices of rows that need deleting
       // from the data file with the specified path
       fn get_positional_delete_indexes(
           data_file_path: &str,
           deletes: &[Deletes],
       ) -> Option<Vec<usize>> {
           // trivial filter / map
       }
   
       // Maps applicable `Deletes::Equality` objects into
       // an Iceberg predicate that is bound to the provided snapshot
       fn get_equality_deletes(
           delete_files: &[Deletes],
           snapshot_schema: SchemaRef,
       ) -> Result<Option<BoundPredicate>> {
           // starts with `AlwaysFalse` and builds up a predicate
           // by `AND`ing together predicates derived from applicable
           // rows in the RecordBatches
       }
   }
   
   ```
   
   @liurenjie1024 commented that this approach could be improved by reusing the 
logic present in the data file reader rather than reimplementing similar logic. 
He also mentioned that inspiration could be taken from the Java 
implementation's GenericReader.
   
   Suggestions that improve upon the approach to loading and caching the delete 
files above are welcome. I'll refrain from submitting any changes related to 
this until others have had chance to comment. I'll proceed with the more 
well-defined aspects relating to orchestrating the filtering logic itself.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to