luoyuxia opened a new issue, #2879: URL: https://github.com/apache/fluss/issues/2879
## Search before asking

- [x] I searched in the issues and found nothing similar.

## Fluss version

main (development)

## Please describe the bug 🐞

When reading a `$binlog` table, the Flink source can fail with the following exception if one `SourceReader` processes records from multiple log splits and update events are interleaved across splits:

```text
Cause: Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. This indicates a corrupted log sequence.
```

The root cause is that `BinlogRowConverter` keeps only one global pending `UPDATE_BEFORE` record. This works as long as records are processed strictly from a single split, but it breaks once the same reader interleaves records from different bucket log splits. A problematic sequence looks like this:

1. split A emits `UPDATE_BEFORE`
2. split B emits `UPDATE_BEFORE`
3. split B emits `UPDATE_AFTER`
4. split A emits `UPDATE_AFTER`

With a single global pending buffer:

- split B's `UPDATE_BEFORE` overwrites split A's pending state;
- split B's `UPDATE_AFTER` consumes that global state;
- split A's `UPDATE_AFTER` no longer finds its matching `UPDATE_BEFORE` and throws the exception above.

Depending on the exact interleaving, this can also temporarily pair the wrong before/after rows across splits before eventually failing. This is especially relevant for `$binlog` on primary key tables with multiple buckets, where one reader may own multiple log splits at the same time.

### Expected behavior

`UPDATE_BEFORE` / `UPDATE_AFTER` pairing should be isolated per split, so that interleaving records from different splits does not corrupt update reconstruction.

### Actual behavior

The pairing state is shared across splits, which can corrupt update reconstruction and fail the source task with:

```text
Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record.
```

### How to reproduce

A minimal reproduction is:

1. Create a primary key table with multiple buckets.
2. Read from the corresponding `$binlog` table with the Flink source.
3. Make one reader consume multiple log splits.
4. Produce interleaved updates from different buckets/splits in the order `A:-U -> B:-U -> B:+U -> A:+U`.

The reader can then fail because the pending `UPDATE_BEFORE` state is overwritten by another split.

## Solution

Use split-scoped buffering for update pairing instead of a single global pending record. Concretely:

1. change `BinlogRowConverter` from a single pending `UPDATE_BEFORE` field to a `Map<splitId, LogRecord>`;
2. pass the current split id through the deserialization path before converting a log record;
3. merge `UPDATE_BEFORE` / `UPDATE_AFTER` only within the same split;
4. add regression tests for interleaved cross-split update sequences, ideally both at the converter level and through the emitter/deserializer path.

This keeps `$binlog` update reconstruction correct even when multiple splits are processed by the same source reader.

## Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
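The split-scoped buffering proposed in the solution can be sketched as below. This is a simplified, self-contained model of the idea, not the actual Fluss `BinlogRowConverter`; the names (`SplitScopedPairing`, `process`, `pendingBySplit`, and the stand-in `LogRecord`/`Update` types) are illustrative only. Its `main` replays the exact interleaving from the bug report, `A:-U -> B:-U -> B:+U -> A:+U`, and shows each split pairing its own before/after rows:

```java
import java.util.HashMap;
import java.util.Map;

public class SplitScopedPairing {

    // Minimal stand-ins for the real record/row types.
    public enum RowKind { UPDATE_BEFORE, UPDATE_AFTER }
    public record LogRecord(RowKind kind, String row) {}
    public record Update(String before, String after) {}

    // One pending UPDATE_BEFORE per split id, instead of a single shared field.
    private final Map<String, LogRecord> pendingBySplit = new HashMap<>();

    /** Returns the merged update when an UPDATE_AFTER closes a pair, else null. */
    public Update process(String splitId, LogRecord record) {
        if (record.kind() == RowKind.UPDATE_BEFORE) {
            // Buffer the -U under its own split; other splits are untouched.
            LogRecord previous = pendingBySplit.put(splitId, record);
            if (previous != null) {
                throw new IllegalStateException(
                        "Two consecutive UPDATE_BEFORE records in split " + splitId);
            }
            return null;
        }
        // An UPDATE_AFTER must find its -U buffered under the same split id.
        LogRecord before = pendingBySplit.remove(splitId);
        if (before == null) {
            throw new IllegalStateException(
                    "Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U)"
                            + " record in split " + splitId);
        }
        return new Update(before.row(), record.row());
    }

    public static void main(String[] args) {
        SplitScopedPairing converter = new SplitScopedPairing();
        // The interleaving from the bug report: A:-U -> B:-U -> B:+U -> A:+U.
        converter.process("A", new LogRecord(RowKind.UPDATE_BEFORE, "a1"));
        converter.process("B", new LogRecord(RowKind.UPDATE_BEFORE, "b1"));
        Update b = converter.process("B", new LogRecord(RowKind.UPDATE_AFTER, "b2"));
        Update a = converter.process("A", new LogRecord(RowKind.UPDATE_AFTER, "a2"));
        System.out.println(b); // Update[before=b1, after=b2]
        System.out.println(a); // Update[before=a1, after=a2]
    }
}
```

With a single global pending field, the same sequence would overwrite A's `-U` with B's and fail on `A:+U`; keying the buffer by split id makes the two splits independent, which is what the regression tests in step 4 of the solution should assert.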
