sdd commented on issue #630:
URL: https://github.com/apache/iceberg-rust/issues/630#issuecomment-2676912068

   I've worked on an improved design for loading and parsing of delete files by 
the `DeleteFileManager`. The code for this can be seen in 
https://github.com/apache/iceberg-rust/pull/982. 
   
   * Create a single stream of all delete file tasks, irrespective of type, so 
that we can respect the combined concurrency limit.
   * Process each task in two phases: **load** and **parse**.
   * For positional deletes, the load phase instantiates an 
`ArrowRecordBatchStream` to stream the file contents out.
   * For equality deletes, the load phase first checks whether the equality 
delete file is already loaded, or is being loaded by another concurrently 
processing data file scan task. If it is, we return the existing future from 
this phase. If not, we create such a future for this equality delete file, 
store it in the state, and return an `ArrowRecordBatchStream` from the load 
phase as for positional deletes, only this time accompanied by a one-shot 
channel sender that we will eventually use to resolve the shared future stored 
in the state.
   * In a future update to the `DeleteFileManager` that adds support for delete 
vectors, the load phase will return a `PuffinReader` for them.
   * The parse phase parses each record batch stream according to its 
associated data type. For positional delete tasks (and, in future, delete 
vector tasks), the result is a map of data file paths to delete vectors. For 
equality delete file tasks, the result is an unbound `Predicate`.
   * The unbound `Predicate`s resulting from equality deletes are sent through 
their associated one-shot channels, which stores them in the right place in the 
delete file manager's state.
   * The results of all of these futures are awaited in parallel, up to the 
specified level of concurrency, and collected into a `Vec`. We then combine all 
of the delete vector maps that resulted from any positional delete or delete 
vector files into a single map and persist it in the state.
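
The final combine step in the list above can be sketched roughly as follows. This is a minimal, std-only illustration and not the code from the PR: `DeleteVector` is a hypothetical alias that uses `BTreeSet<u64>` in place of a roaring bitmap, and `combine_delete_vectors` is an assumed name.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical stand-in for a delete vector: the deleted row positions of one
/// data file. `BTreeSet<u64>` replaces the roaring bitmap to stay std-only.
type DeleteVector = BTreeSet<u64>;

/// Merge the maps produced by the individual delete file tasks
/// (data file path -> deleted positions) into a single map.
fn combine_delete_vectors(
    parsed: Vec<HashMap<String, DeleteVector>>,
) -> HashMap<String, DeleteVector> {
    let mut combined: HashMap<String, DeleteVector> = HashMap::new();
    for map in parsed {
        for (data_file_path, positions) in map {
            // Several delete files may reference the same data file, so the
            // positions are unioned rather than overwritten.
            combined.entry(data_file_path).or_default().extend(positions);
        }
    }
    combined
}
```

Unioning per data file path matters because more than one positional delete file can target the same data file.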
   
   Conceptually, the data flow is like this:
   
   ```
                                      FileScanTaskDeleteFile
                                                 |
           Already-loading EQ Delete             |                  Everything Else
                       +-------------------------+-------------------------+
                       |                                                   |
             [get existing future]                        [load recordbatch stream / puffin]
        DeleteFileContext::InProgEqDel                             DeleteFileContext
                       |                                                   |
                       |                                                   |
                       |                     +-----------------------------+--------------------------+
                       |                  Pos Del            Del Vec (Not yet Implemented)         EQ Del
                       |                     |                             |                          |
                       |          [parse pos del stream]        [parse del vec puffin]         [parse eq del]
                       |  HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap>   (Predicate, Sender)
                       |                     |                             |                          |
                       |                     |                             |                 [persist to state]
                       |                     |                             |                         ()
                       |                     |                             |                          |
                       |                     +-----------------------------+--------------------------+
                       |                                                   |
                       |                                          [buffer unordered]
                       |                                                   |
                       |                                         [combine del vectors]
                       |                                    HashMap<String, RoaringTreeMap>
                       |                                                   |
                       |                                    [persist del vectors to state]
                       |                                                  ()
                       |                                                   |
                       +-------------------------+-------------------------+
                                                 |
                                              [join!]
   ```
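
The left-hand branch of the diagram (an equality delete that is already loaded or loading) relies on a check-then-register step. A rough sketch of that step is below. It is an assumption-laden illustration, not the PR's implementation: it is blocking and std-only, so a `Mutex` plus `Condvar` slot stands in for the shared future and its one-shot sender, `Predicate` is a plain `String`, and `SharedSlot`, `EqDeleteLoad`, `EqDeleteState` and `begin_load` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for the unbound `Predicate` of an equality delete file.
type Predicate = String;

/// Blocking stand-in for the shared future: a slot that is filled once.
#[derive(Default)]
struct SharedSlot {
    value: Mutex<Option<Predicate>>,
    ready: Condvar,
}

impl SharedSlot {
    /// Plays the role of the one-shot sender: publish the result, wake waiters.
    fn resolve(&self, predicate: Predicate) {
        *self.value.lock().unwrap() = Some(predicate);
        self.ready.notify_all();
    }

    /// Plays the role of awaiting the shared future.
    fn wait(&self) -> Predicate {
        let mut guard = self.value.lock().unwrap();
        while guard.is_none() {
            guard = self.ready.wait(guard).unwrap();
        }
        (*guard).clone().expect("slot is filled once the loop exits")
    }
}

/// Outcome of the load phase for one equality delete file.
enum EqDeleteLoad {
    /// Another task already registered this file: only wait for its result.
    InProgress(Arc<SharedSlot>),
    /// This task registered the file first: it must load, parse and resolve.
    MustLoad(Arc<SharedSlot>),
}

/// The part of the manager state that tracks equality delete files by path.
#[derive(Default)]
struct EqDeleteState {
    slots: Mutex<HashMap<String, Arc<SharedSlot>>>,
}

impl EqDeleteState {
    /// Check and register under one lock, so only one task loads a given file.
    fn begin_load(&self, path: &str) -> EqDeleteLoad {
        let mut slots = self.slots.lock().unwrap();
        if let Some(slot) = slots.get(path) {
            return EqDeleteLoad::InProgress(Arc::clone(slot));
        }
        let slot = Arc::new(SharedSlot::default());
        slots.insert(path.to_string(), Arc::clone(&slot));
        EqDeleteLoad::MustLoad(slot)
    }
}
```

Doing the lookup and the insert while holding the same lock is what prevents two concurrent scan tasks from both deciding to load the same equality delete file.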


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

