luoyuxia opened a new issue, #2979: URL: https://github.com/apache/fluss/issues/2979
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

main (development)

### Please describe the bug 🐞

When reading a primary-key Paimon lake table through the sorted reader path, Fluss can fail if the primary key contains a `TIMESTAMP` column.

I hit this while reading a table with a composite primary key like:

```sql
PRIMARY KEY (member_id, channel_key, seq_time, order_id) NOT ENFORCED
```

and a bucket key like:

```sql
'bucket.key' = 'member_id,channel_key,seq_time'
```

The reader fails with:

```text
java.lang.UnsupportedOperationException: Unsupported data type to get timestamp: STRING NOT NULL
    at org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow.getTimestamp(FlussRowAsPaimonRow.java:145)
    at RecordComparator$63.compare(Unknown Source)
    at org.apache.paimon.codegen.RecordComparator.compare(RecordComparator.java:30)
    at org.apache.fluss.lake.paimon.source.PaimonSortedRecordReader.lambda$toFlussRowComparator$0(PaimonSortedRecordReader.java:63)
    at java.util.TreeMap.compare(TreeMap.java:1292)
    at java.util.TreeMap.put(TreeMap.java:536)
    at org.apache.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner.pollLogRecords(LakeSnapshotAndLogSplitScanner.java:231)
```

Root cause:

- `LakeSnapshotAndLogSplitScanner` stores log-side keys as `ProjectedRow` primary-key rows.
- `PaimonSortedRecordReader` builds the comparator with a primary-key comparator, but adapts Fluss rows using the full table `RowType` instead of the primary-key `RowType`.
- Once the comparator reaches the projected `seq_time` field, it resolves the type by position against the full table schema. In affected schemas that position can be a non-timestamp field such as `STRING`, which triggers the exception above.

This is reproducible whenever all of the following are true:

1. the read path uses the sorted Paimon PK reader;
2. the compared row is a projected PK row;
3. the primary key contains a timestamp field; and
4. the timestamp field's projected position does not match a timestamp field at the same position in the full row schema.

Expected behavior:

- PK rows should be compared using the primary-key schema, and composite primary keys containing timestamp fields should read successfully.

Actual behavior:

- The comparator interprets projected PK columns with the wrong schema and fails during timestamp comparison.

Additional context:

- I observed this from a VVR connector build based on Flink 1.20 (`ververica-connector-fluss-1.20-vvr-11.5.0-2-jdk11`), but the underlying issue is in the Fluss Paimon sorted reader logic on `main`.

### Solution

Use the primary-key `RowType` when adapting Fluss PK rows for the Paimon key comparator in `PaimonSortedRecordReader`, instead of the full table row type. A regression test can cover a composite PK containing `TIMESTAMP`.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
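
### Illustration of the positional mismatch

The root cause described above can be sketched without any Fluss or Paimon classes. The snippet below is only an illustration under stated assumptions: the class `PkSchemaMismatchSketch`, the `FieldType` enum, the `payload` column, and all column types other than `seq_time` being a timestamp are hypothetical and not taken from the actual table or from the Fluss code base. It shows that reading position 2 of a projected PK row succeeds when the type is resolved against the projected PK schema and fails when it is resolved against the full table schema.

```java
import java.util.ArrayList;
import java.util.List;

public class PkSchemaMismatchSketch {
    // Hypothetical, simplified stand-in for a column type.
    enum FieldType { BIGINT, STRING, TIMESTAMP }

    // Hypothetical full table schema (types only), in column order:
    // member_id, channel_key, payload, seq_time, order_id.
    // Only the four PK column names come from the report; "payload" is invented.
    static final List<FieldType> FULL_TYPES = List.of(
            FieldType.BIGINT, FieldType.STRING, FieldType.STRING,
            FieldType.TIMESTAMP, FieldType.STRING);

    // Positions of (member_id, channel_key, seq_time, order_id) in the full row.
    static final int[] PK_INDEXES = {0, 1, 3, 4};

    // Builds the schema of the projected row from the full schema.
    static List<FieldType> project(List<FieldType> types, int[] indexes) {
        List<FieldType> projected = new ArrayList<>();
        for (int index : indexes) {
            projected.add(types.get(index));
        }
        return projected;
    }

    // Mimics a positional accessor: the type is looked up by position in
    // whichever schema the adapter was constructed with.
    static long getTimestamp(List<FieldType> schema, Object[] row, int pos) {
        FieldType type = schema.get(pos);
        if (type != FieldType.TIMESTAMP) {
            throw new UnsupportedOperationException(
                    "Unsupported data type to get timestamp: " + type);
        }
        return (Long) row[pos];
    }

    public static void main(String[] args) {
        // A projected PK row: member_id, channel_key, seq_time, order_id.
        Object[] pkRow = {42L, "web", 1_700_000_000_000L, "o-1"};

        // Resolving against the PK schema: position 2 is TIMESTAMP.
        List<FieldType> pkTypes = project(FULL_TYPES, PK_INDEXES);
        System.out.println(getTimestamp(pkTypes, pkRow, 2));

        // Resolving against the full schema: position 2 is STRING ("payload").
        try {
            getTimestamp(FULL_TYPES, pkRow, 2);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the first call prints the timestamp value and the second prints `Unsupported data type to get timestamp: STRING`, which mirrors the shape of the reported failure. The proposed fix corresponds to always passing the projected (PK) schema to the accessor used by the key comparator.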
