LiebingYu opened a new issue, #2971: URL: https://github.com/apache/fluss/issues/2971
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Description

Currently, `FlinkTableSource` does not implement the `SupportsWatermarkPushDown` interface. This causes two significant problems when a Fluss table has a `WATERMARK` definition in Flink SQL.

When `SupportsWatermarkPushDown` is **not** implemented, Flink's planner inserts a standalone `WatermarkAssigner` node between the table scan and downstream operators. The Flink optimization rule `PushWatermarkIntoTableSourceScanRule` only fires when the source implements `SupportsWatermarkPushDown`. Without it, the rule never fires and the `WatermarkAssigner` node remains as a separate operator in the query plan.

This breaks partition filter pushdown: Flink's `PushFilterIntoTableSourceScanRule` cannot push filter predicates past the `WatermarkAssigner` node into the source scan. As a result, a query like:

```sql
SELECT a, b, c FROM partitioned_table WHERE c = '2025'
```

produces a plan where the partition filter is **not** pushed into the `TableSourceScan`:

```
-- Without SupportsWatermarkPushDown (current behavior):
Filter(condition=[=(c, '2025')])
  WatermarkAssigner(...)
    TableSourceScan(...)  -- no filter, no partition pruning
```

```
-- With SupportsWatermarkPushDown (expected behavior):
TableSourceScan(
  watermark=[-(ts, 5000:INTERVAL SECOND)],
  watermarkEmitStrategy=[on-periodic],
  filter=[=(c, '2025')]  -- partition pruning applied
)
```

### Willingness to contribute

- [x] I'm willing to submit a PR!
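
For anyone who wants to reproduce the plan difference described in the issue, a table along the following lines should be enough. This is only a sketch: the column types, the choice of `c` as the partition column, and the 5-second watermark delay are assumptions inferred from the query and the expected plan (`-(ts, 5000:INTERVAL SECOND)`). It also assumes a Fluss catalog is the current catalog, so no connector options are shown.

```sql
-- Hypothetical schema: only the names a, b, c, ts and partitioned_table
-- appear in the issue; the types and partitioning are assumed.
CREATE TABLE partitioned_table (
  a  INT,
  b  STRING,
  c  STRING,
  ts TIMESTAMP(3),
  -- Matches the watermark expression shown in the expected plan.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) PARTITIONED BY (c);

-- Print the optimized plan and check whether the filter on c
-- ends up inside TableSourceScan or above WatermarkAssigner.
EXPLAIN PLAN FOR
SELECT a, b, c FROM partitioned_table WHERE c = '2025';
```

If the filter still sits above a separate `WatermarkAssigner` node in the printed plan, the source is being planned without watermark pushdown, which is the current behavior the issue reports.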
