sdd commented on issue #630: URL: https://github.com/apache/iceberg-rust/issues/630#issuecomment-2676912068
I've worked on an improved design for loading and parsing delete files in the `DeleteFileManager`. The code for this can be seen in https://github.com/apache/iceberg-rust/pull/982.

* Create a single stream of all delete file tasks, irrespective of type, so that we can respect the combined concurrency limit.
* We then process each task in two phases: **load** and **parse**.
  * For positional deletes, the load phase instantiates an `ArrowRecordBatchStream` to stream the file contents out.
  * For equality deletes, we first check whether the equality delete file is already loaded, or being loaded by another concurrently processing data file scan task. If it is, we return a future from this phase. If not, we create such a future for the same equality delete file and return an `ArrowRecordBatchStream` from the load phase, as for positional deletes. This time, though, it is accompanied by a one-shot channel sender that we will eventually use to resolve the shared future that we stored in the state.
  * In a future update to the `DeleteFileManager` that adds support for delete vectors, the load phase will return a `PuffinReader` for them.
* The parse phase parses each record batch stream according to its delete file type. For positional delete tasks (and, in future, delete vector tasks), the result is a map of data file paths to delete vectors. For equality delete file tasks, the result is an unbound `Predicate`.
* The unbound `Predicate`s resulting from equality deletes are sent to their associated one-shot channels, so that they are stored in the right place in the delete file manager's state.
* The results of all of these futures are awaited in parallel with the specified level of concurrency and collected into a `Vec`. We then combine all of the delete vector maps that resulted from any positional delete or delete vector files into a single map and persist it in the state.
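The equality delete deduplication in the load phase can be sketched as follows. This is a minimal synchronous sketch under stated assumptions, not the code in PR #982: it swaps the async one-shot channel and shared future for a std `Mutex` plus `Condvar`, uses `String` as a hypothetical stand-in for the unbound `Predicate`, and every name in it (`EqDeleteState`, `begin_load`, `LoadPhase`, `SharedPredicate`, `Publisher`) is hypothetical. The property it illustrates is that the check and the registration happen under one lock, so only the first task to ask for a given file loads it and every later task waits on the same result.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for the unbound `Predicate` an equality delete parses into.
type Predicate = String;

/// Slot filled exactly once by the loading task (replaces the one-shot channel).
#[derive(Default)]
struct Slot {
    value: Mutex<Option<Predicate>>,
    ready: Condvar,
}

/// Handle for tasks that found the file already loading (stands in for the shared future).
#[derive(Clone)]
struct SharedPredicate(Arc<Slot>);

impl SharedPredicate {
    /// Blocks until the loading task publishes the predicate, then returns a copy.
    fn wait(&self) -> Predicate {
        let guard = self.0.value.lock().unwrap();
        let guard = self.0.ready.wait_while(guard, |v| v.is_none()).unwrap();
        (*guard).clone().expect("wait_while only returns once the value is set")
    }
}

/// Handle held by the one task that actually loads and parses the file
/// (stands in for the one-shot sender).
struct Publisher(Arc<Slot>);

impl Publisher {
    /// Stores the parsed predicate and wakes every waiting task.
    fn publish(self, predicate: Predicate) {
        *self.0.value.lock().unwrap() = Some(predicate);
        self.0.ready.notify_all();
    }
}

/// Outcome of the load phase for one equality delete file.
enum LoadPhase {
    /// Another task is already loading this file: reuse its result.
    AlreadyLoading(SharedPredicate),
    /// This task must load the file and publish the parsed predicate.
    Load(Publisher),
}

/// Hypothetical manager state, keyed by equality delete file path.
#[derive(Default)]
struct EqDeleteState {
    by_path: Mutex<HashMap<String, SharedPredicate>>,
}

impl EqDeleteState {
    /// Checks and registers under a single lock, so only the first caller per path loads it.
    fn begin_load(&self, path: &str) -> LoadPhase {
        let mut by_path = self.by_path.lock().unwrap();
        if let Some(existing) = by_path.get(path) {
            return LoadPhase::AlreadyLoading(existing.clone());
        }
        let slot = Arc::new(Slot::default());
        by_path.insert(path.to_string(), SharedPredicate(Arc::clone(&slot)));
        LoadPhase::Load(Publisher(slot))
    }
}
```

A caller would match on `begin_load(path)`: on `LoadPhase::Load` it streams and parses the file, then calls `publish`; on `LoadPhase::AlreadyLoading` it calls `wait`. One difference from the async design: an async one-shot channel typically reports an error to the receiver when the sender is dropped, whereas a `Publisher` dropped here without calling `publish` leaves waiters blocked, so a real implementation would need to propagate load failures.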
Conceptually, the data flow is like this:

```
                             FileScanTaskDeleteFile
                                        |
  Already-loading EQ Delete             |                  Everything Else
              +---------------------------------------------------+
              |                                                   |
    [get existing future]                        [load recordbatch stream / puffin]
DeleteFileContext::InProgEqDel                            DeleteFileContext
              |                                                   |
              |                                                   |
              |                     +-----------------------------+--------------------------+
              |                  Pos Del            Del Vec (Not yet Implemented)         EQ Del
              |                     |                             |                          |
              |          [parse pos del stream]        [parse del vec puffin]         [parse eq del]
              |    HashMap<String, RoaringTreeMap>  HashMap<String, RoaringTreeMap>  (Predicate, Sender)
              |                     |                             |                          |
              |                     |                             |                 [persist to state]
              |                     |                             |                          ()
              |                     |                             |                          |
              |                     +-----------------------------+--------------------------+
              |                                                   |
              |                                          [buffer unordered]
              |                                                   |
              |                                         [combine del vectors]
              |                                    HashMap<String, RoaringTreeMap>
              |                                                   |
              |                                    [persist del vectors to state]
              |                                                   ()
              |                                                   |
              +-------------------------+-------------------------+
                                        |
                                     [join!]
```
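The positional delete branch of the diagram, `[parse pos del stream]` through `[combine del vectors]`, can be sketched with std collections alone. This assumes each positional delete row has already been decoded from the record batch stream into a (data file path, row position) pair, matching the `file_path` and `pos` columns that the Iceberg spec defines for position delete files. `BTreeSet<u64>` stands in for `RoaringTreeMap`, and both function names are hypothetical.

```rust
use std::collections::{BTreeSet, HashMap};

/// Stand-in for `RoaringTreeMap` (assumption): sorted row positions to delete
/// from a single data file.
type DeleteVector = BTreeSet<u64>;

/// Hypothetical parse step for one positional delete file. Each input row is
/// assumed to be an already-decoded (`file_path`, `pos`) pair.
fn parse_pos_deletes(rows: &[(&str, u64)]) -> HashMap<String, DeleteVector> {
    let mut by_file: HashMap<String, DeleteVector> = HashMap::new();
    for &(file_path, pos) in rows {
        by_file.entry(file_path.to_string()).or_default().insert(pos);
    }
    by_file
}

/// Hypothetical combine step: union the per-task maps into one map, merging
/// the delete vectors of data files that appear in more than one part.
fn combine_delete_vectors(
    parts: impl IntoIterator<Item = HashMap<String, DeleteVector>>,
) -> HashMap<String, DeleteVector> {
    let mut combined: HashMap<String, DeleteVector> = HashMap::new();
    for part in parts {
        for (file_path, positions) in part {
            combined.entry(file_path).or_default().extend(positions);
        }
    }
    combined
}
```

Because the combine step is a per-file set union, its result does not depend on the order in which the buffered futures complete, which is why an unordered buffer is safe at this point.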