pvary commented on PR #10935:
URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2294754336

   @wypoon, @manuzhang: I'm interested in providing Flink streaming CDC reads. That would require working changelog scan planning in Iceberg core, so I would be happy to help with my reviews here. I'm not an expert in this part of the code yet, so if the PR becomes more complicated we might have to ask for help from folks with more experience, but I can do a first round of reviews.
   
   I see that there are multiple PRs for this topic (#9888, #10954). I would suggest focusing the discussion on one specific PR.
   
   There are some tricky scenarios with Flink CDC that we need to handle:
   
   -----------------------
   
   Scenario 1:
   - 1st snapshot: A new record is written to the table with primary key 1 (PK1) and value 1 (V1)
       - Creates data file 1 (DF1) - with PK1-V1
   - 2nd snapshot: The record with PK1 is updated to value V2
       - Creates equality delete 1 (ED1) - with PK1
       - Creates a new data file 2 (DF2) - with PK1-V2
   - 3rd snapshot: The record with PK1 is updated to value V3
       - Creates equality delete 2 (ED2) - with PK1
       - Creates a new data file 3 (DF3) - with PK1-V3
   
   When the changelog scan reads the 3rd snapshot, it should consider:
   - Delete reads:
       - DF1 - omit records deleted by ED1, emit records deleted by ED2
       - DF2 - emit records deleted by ED2
   - Data read:
       - DF3 - emit all records
   
   Notice that for DF1 we should not emit records that were already deleted by previous deletes. So I am concerned about your `// not used` comment [here](https://github.com/apache/iceberg/pull/10935/files#diff-24a8b4b8a16e20a7727ca9b04754fa00cbb40228aa79b3e63baf958aef7bec04R266) 😄
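Scenario 1 boils down to a sequence-number comparison. Here is a minimal Python sketch (the helper name and shape are hypothetical, not Iceberg API) of the rule I have in mind: an equality delete applies to data files with a strictly smaller data sequence number; deletes committed before the scanned snapshot suppress rows, while deletes committed in the scanned snapshot produce the D records.

```python
def classify_deletes(data_file_seq, delete_seqs, scan_seq):
    """Split the equality deletes that apply to one data file into two groups.

    data_file_seq: data sequence number of the data file
    delete_seqs:   data sequence numbers of the equality delete files
    scan_seq:      data sequence number of the snapshot the changelog scan reads

    Returns (suppressing, emitting):
      suppressing - older deletes; their matching rows must be omitted entirely
      emitting    - deletes from the scanned snapshot; matching rows become D records
    """
    # An equality delete only applies to files with a strictly smaller sequence number.
    applicable = [s for s in delete_seqs if s > data_file_seq]
    suppressing = [s for s in applicable if s < scan_seq]
    emitting = [s for s in applicable if s == scan_seq]
    return suppressing, emitting


# Scenario 1: DF1 at seq 1, ED1 at seq 2, ED2 at seq 3; the scan reads the 3rd snapshot.
print(classify_deletes(1, [2, 3], 3))  # DF1: ED1 suppresses, ED2 emits -> ([2], [3])
print(classify_deletes(2, [2, 3], 3))  # DF2: only ED2 applies, and it emits -> ([], [3])
```

With this split, DF1 rows matched by ED1 never reach the output, while rows matched only by ED2 are emitted as deletes, which is exactly the DF1/DF2 behavior listed above.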
   
   -----------------------
   
   Scenario 2:
   - 1st snapshot: A new record is written to the table with primary key 1 (PK1) and value 1 (V1)
       - Creates data file 1 (DF1) - with PK1-V1
   - 2nd snapshot: The record with PK1 is updated to value V2, and later updated again to value V3
       - Creates equality delete 1 (ED1) - with PK1
       - Creates a new data file 2 (DF2) - with PK1-V2
       - Creates positional delete 1 (PD1) - for the PK1 row in DF2
       - Here we have 2 possibilities:
            - A new row with PK1-V3 is appended to DF2 - if the target file size is not reached yet
            - A new data file 3 (DF3) is created - with PK1-V3 - if the data file is already rolled over
   
   When the changelog scan reads the 2nd snapshot, it should consider:
   - Delete reads:
       - DF1 - emit records deleted by ED1 - the emitted record is: D(PK1-V1)
       - DF2 - emit records deleted by PD1 - the emitted record is: D(PK1-V2)
   - Data read:
       - DF2 - emit all records, even the ones deleted by PD1 - the emitted record is: I(PK1-V2)
       - DF3 - emit all records - the emitted record is: I(PK1-V3)
   
   Notice that the order of the records is important, and it is not trivial to guarantee when the files are read in a distributed way.
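To make the ordering concern concrete, here is a small Python sketch (the sort key is my assumption, not what the PR implements): if each emitted record is tagged with the data sequence number of its source file, the file path, the row position, and the change kind, sorting on that tuple reproduces the expected D(PK1-V1), I(PK1-V2), D(PK1-V2), I(PK1-V3) order even when the files were scanned on different workers.

```python
from typing import NamedTuple


class ChangelogRow(NamedTuple):
    data_seq: int  # data sequence number of the file the row comes from
    path: str      # file path, as a tiebreaker within one sequence number
    pos: int       # row position inside the file
    kind: int      # 0 = INSERT, 1 = DELETE, so a positional delete of a row
                   # written in the same snapshot sorts right after its insert
    row: str       # the rendered changelog record, for readability


# Scenario 2 records, arriving in an arbitrary (e.g. per-worker) order:
rows = [
    ChangelogRow(2, "DF2", 0, 1, "D(PK1-V2)"),  # PD1 deleting DF2 position 0
    ChangelogRow(2, "DF3", 0, 0, "I(PK1-V3)"),  # data read of DF3
    ChangelogRow(1, "DF1", 0, 1, "D(PK1-V1)"),  # ED1 deleting the old row in DF1
    ChangelogRow(2, "DF2", 0, 0, "I(PK1-V2)"),  # data read of DF2
]

# NamedTuples sort lexicographically, so the key is simply the record itself.
ordered = [r.row for r in sorted(rows)]
print(ordered)  # ['D(PK1-V1)', 'I(PK1-V2)', 'D(PK1-V2)', 'I(PK1-V3)']
```

The equality delete against DF1 sorts first because DF1 has the lower data sequence number; within DF2, the insert at position 0 precedes the positional delete of the same row.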
   
   CC: @dramaticlly 

