pvary commented on PR #10935: URL: https://github.com/apache/iceberg/pull/10935#issuecomment-2294754336
@wypoon, @manuzhang: I'm interested in providing Flink streaming CDC reads, which would require working changelog scan planning in Iceberg core, so I would be happy to help with my reviews here. I'm not yet an expert in this part of the code, so if the PR becomes more complicated we might have to ask for help from more experienced folks, but I can do a first round of the reviews. I see that there are multiple PRs for this topic (#9888, #10954); I would suggest focusing the discussion on one specific PR.

There are some tricky scenarios with Flink CDC that we need to handle:

-----------------------

Scenario 1:
- 1st snapshot: writes a new record with primary key 1 (PK1) and value 1 (V1)
  - Creates data file 1 (DF1) with PK1-V1
- 2nd snapshot: the record with PK1 is updated to value V2
  - Creates equality delete 1 (ED1) with PK1
  - Creates a new data file 2 (DF2) with PK1-V2
- 3rd snapshot: the record with PK1 is updated to value V3
  - Creates equality delete 2 (ED2) with PK1
  - Creates a new data file 3 (DF3) with PK1-V3

When the changelog scan reads the 3rd snapshot, it should consider:
- Delete reads:
  - DF1: omit records deleted by ED1, emit records deleted by ED2
  - DF2: emit records deleted by ED2
- Data read:
  - DF3: emit all records

Notice that for DF1 we should not emit records which were already deleted by previous deletes.
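To make the DF1 rule concrete, here is a minimal sketch (my own simplified model with hypothetical names, not the Iceberg core API): when producing the changelog entry for a snapshot, a data file's rows are emitted as deletes only if they are matched by that snapshot's equality deletes *and* were not already removed by an older delete.

```python
def changelog_deletes(rows, older_deletes, current_deletes):
    """Rows of a data file that the current snapshot's changelog entry deletes.

    rows            -- list of (key, value) tuples in the data file
    older_deletes   -- keys deleted by equality deletes of earlier snapshots
    current_deletes -- keys deleted by equality deletes of this snapshot
    """
    emitted = []
    for key, value in rows:
        if key in older_deletes:
            # Already deleted by a previous snapshot: must be omitted,
            # otherwise the same delete would be emitted twice.
            continue
        if key in current_deletes:
            emitted.append(("D", key, value))
    return emitted

# Scenario 1, reading the 3rd snapshot:
df1 = [("PK1", "V1")]
df2 = [("PK1", "V2")]

# DF1 is matched by both ED1 (older) and ED2 (current); since ED1 already
# removed PK1-V1, nothing is emitted for DF1.
assert changelog_deletes(df1, older_deletes={"PK1"}, current_deletes={"PK1"}) == []

# DF2 is only matched by ED2 (current), so D(PK1-V2) is emitted.
assert changelog_deletes(df2, older_deletes=set(), current_deletes={"PK1"}) == [("D", "PK1", "V2")]
```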
So I am concerned about your `// not used` comment [here](https://github.com/apache/iceberg/pull/10935/files#diff-24a8b4b8a16e20a7727ca9b04754fa00cbb40228aa79b3e63baf958aef7bec04R266) 😄

-----------------------

Scenario 2:
- 1st snapshot: writes a new record with primary key 1 (PK1) and value 1 (V1)
  - Creates data file 1 (DF1) with PK1-V1
- 2nd snapshot: the record with PK1 is updated to value V2, but later updated again to value V3
  - Creates equality delete 1 (ED1) with PK1
  - Creates a new data file 2 (DF2) with PK1-V2
  - Creates positional delete 1 (PD1) for the PK1 row in DF2
  - Here we have 2 possibilities:
    - Adds a new row to DF2 with PK1-V3, if the data file target file size is not reached yet
    - Creates a new data file 3 (DF3) with PK1-V3, if the data file is already rolled over

When the changelog scan reads the 2nd snapshot, it should consider:
- Delete reads:
  - DF1: emit records deleted by ED1 - the emitted record is D(PK1-V1)
  - DF2: emit records deleted by PD1 - the emitted record is D(PK1-V2)
- Data reads:
  - DF2: omit records deleted by PD1 - the emitted record is I(PK1-V2)
  - DF3: emit all records - the emitted record is I(PK1-V3)

Notice that the order of the records is important, and it is not trivial to order them if the files are read in a distributed way.

CC: @dramaticlly

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
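The ordering concern in Scenario 2 above can be sketched as follows (again a simplified model with hypothetical names, not Iceberg core code): each distributed reader produces records for a single file, and the per-file outputs must be merged and re-sorted before emission. The ordering rule here, that deletes within a snapshot precede inserts, is an assumption that reproduces the order listed above; a real implementation would need a richer sort key (for example per-row change ordinals) to correctly order multiple updates of the same key.

```python
def order_changelog(per_file_records):
    """Merge per-file changelog outputs into one ordered stream.

    per_file_records -- lists of (change_type, key, value) tuples,
                        one list per distributed file read
    """
    merged = [rec for records in per_file_records for rec in records]
    # Simplified rule: within one snapshot, emit deletes ("D") before
    # inserts ("I"). Python's sort is stable, so ties keep input order.
    return sorted(merged, key=lambda rec: 0 if rec[0] == "D" else 1)

# Scenario 2, per-file outputs for the 2nd snapshot:
df1_delete_read = [("D", "PK1", "V1")]  # ED1 applied to DF1
df2_data_read   = [("I", "PK1", "V2")]
df2_delete_read = [("D", "PK1", "V2")]  # PD1 applied to DF2
df3_data_read   = [("I", "PK1", "V3")]

result = order_changelog(
    [df1_delete_read, df2_data_read, df2_delete_read, df3_data_read])
assert result == [
    ("D", "PK1", "V1"),
    ("D", "PK1", "V2"),
    ("I", "PK1", "V2"),
    ("I", "PK1", "V3"),
]
```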