tharvey5 opened a new pull request, #13390: URL: https://github.com/apache/iceberg/pull/13390
## Summary

Currently, Apache Flink does not support storage partition join (SPJ), which can lead to unnecessary data shuffles in batch mode. We have already implemented the corresponding query planner changes in Flink: https://github.com/apache/flink/pull/26715

This feature is **only** applied when `table.optimizer.storage-partition-join-enabled=true` is set; otherwise, existing jobs are unaffected. This PR supports only batch execution mode.

This PR contains the corresponding changes to the Flink Iceberg source. Note that these changes are included **only** for the FLIP-27 source (the new Source API).

**NOTE: We migrated to using FLIP-27 and have included that backport in this PR. Backport: https://github.com/apache/iceberg/pull/10832**

This PR adds the following:

- Backport of https://github.com/apache/iceberg/pull/10832 for Iceberg 1.5.x
- Enhances `IcebergTableSource` to implement the `SupportsPartitioning` interface, which we defined on the [Flink side](https://github.com/apache/flink/pull/26715), so that Iceberg can report partitioning metadata to the Flink query planner. This is done via `outputPartitioning()`, which returns a `KeyGroupedPartitioning` based on the table's partition scheme. Supported transform types include bucket, identity, year, month, and day.
- Improvements to `IcebergSource` to support storage partition join
- Enhances `FlinkSplitPlanner` with a method that groups `ScanTask`s by grouping key (partition values), ensuring that all records within the same partition are processed by the same subtask.
- Partition-aware split assignment, including a new `PartitionAwareSplitAssignerFactory` and `PartitionAwareSplitAssigner`, which deterministically assigns splits so that records from the same partition are assigned to the same subtask.
- Adds a new `SpecTransformToFlinkTransform` that maps Iceberg partition transforms to the `TransformExpression`s used to represent partitions in Flink.

## Testing

* Added unit tests in `TestPartitionAwareSplitAssigner` to verify that splits are deterministically assigned to the correct subtasks.
* Added unit tests in `TestFlinkSplitPlanner` to test the new functionality for getting batch splits based on `ScanGroup`.
* Added unit test `TestStoragePartitionedJoin` to verify that the correct partitioning metadata is reported.
* Ran the following queries internally and compared the results to Spark's output.

**Correctly uses SPJ**

```
select count(*)
from iceberg.db.simple_bucketed as table1
join iceberg.db.simple_bucketed_2 as table2
  on table1.user_id = table2.user_id2
```

```
select count(*)
from iceberg.db.user_id_two_partition_cols t1
join iceberg.db.user_id_two_partition_cols_2 t2
  on t1.dt = t2.dt and t1.user_id = t2.user_id
```

```
select count(*)
from iceberg.db.user_id_two_partition_cols t1
join iceberg.db.user_id_two_partition_cols_2 t2
  on t1.dt = t2.dt and t1.user_id = t2.user_id
where t1.dt = '2025-05-01' and t2.dt = '2025-05-01'
```

**Correctly does not use SPJ**

```
select count(*)
from iceberg.db.simple_bucketed as table1
join iceberg.db.simple_bucketed_32 as table2
  on table1.user_id = table2.user_id2
```

```
select count(*)
from iceberg.db.user_id_two_partition_cols t1
join iceberg.db.user_id_two_partition_cols_2 t2
  on t1.dt = t2.dt and t1.user_id = t2.user_id
where t1.dt = '2025-05-01' and t2.dt = '2025-05-01'
  and t1.user_id = 123 and t2.user_id = 123
```
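The grouping step in `FlinkSplitPlanner` and the deterministic assignment in `PartitionAwareSplitAssigner` can be illustrated with a minimal, self-contained sketch. This is not the code in this PR: `Split`, `groupByPartition`, `subtaskFor`, and `assign` are hypothetical names, partition values are modeled as a plain `List<Object>` rather than Iceberg partition data, and hash-modulo-parallelism is only one possible deterministic scheme. The property it demonstrates is that equal partition values always map to the same subtask, so both sides of a join agree given the same parallelism.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionAwareAssignmentSketch {

  // Hypothetical stand-in for a split: a data file path plus its partition values.
  record Split(String path, List<Object> partition) {}

  // Group splits by their partition values (the "grouping key").
  static Map<List<Object>, List<Split>> groupByPartition(List<Split> splits) {
    Map<List<Object>, List<Split>> groups = new LinkedHashMap<>();
    for (Split split : splits) {
      groups.computeIfAbsent(split.partition(), k -> new ArrayList<>()).add(split);
    }
    return groups;
  }

  // Deterministically map a partition key to a subtask index in [0, parallelism).
  // List.hashCode() is defined by the List contract from its elements' hash codes,
  // and String/Integer hash codes are fully specified, so equal partition values
  // always yield the same subtask, independent of which table they came from.
  static int subtaskFor(List<Object> partition, int parallelism) {
    return Math.floorMod(partition.hashCode(), parallelism);
  }

  // Assign every split of a partition group to that group's subtask.
  static Map<Integer, List<Split>> assign(List<Split> splits, int parallelism) {
    Map<Integer, List<Split>> assignment = new TreeMap<>();
    for (Map.Entry<List<Object>, List<Split>> group : groupByPartition(splits).entrySet()) {
      int subtask = subtaskFor(group.getKey(), parallelism);
      assignment.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(group.getValue());
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Two hypothetical tables partitioned by (dt, bucket id).
    List<Split> left = List.of(
        new Split("a-1.parquet", List.of("2025-05-01", 3)),
        new Split("a-2.parquet", List.of("2025-05-01", 3)),
        new Split("a-3.parquet", List.of("2025-05-02", 7)));
    List<Split> right = List.of(
        new Split("b-1.parquet", List.of("2025-05-02", 7)),
        new Split("b-2.parquet", List.of("2025-05-01", 3)));
    System.out.println(assign(left, 4));
    System.out.println(assign(right, 4));
  }
}
```

Because the subtask depends only on the partition values and the parallelism, running `assign` on both tables places the `("2025-05-01", 3)` splits on the same subtask without a shuffle. A plain hash can leave some subtasks with more partitions than others; a real assigner could balance differently, as long as both sides of the join use the same mapping.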