tharvey5 opened a new pull request, #13390:
URL: https://github.com/apache/iceberg/pull/13390

   ## Summary
   Currently, Apache Flink does not support storage partition join (SPJ), which can 
lead to unnecessary data shuffles in batch mode. We have already implemented the 
query planner changes in Flink here: https://github.com/apache/flink/pull/26715
   
   This feature is applied ONLY when the config 
`table.optimizer.storage-partition-join-enabled=true` is set; otherwise there is 
no impact on current jobs. This PR only supports batch execution mode.
   
   This PR consists of relevant changes for the Flink Iceberg Source. Please 
note that these changes are ONLY included for FLIP27 (new Source API).
   
   **NOTE: We migrated to using FLIP27 and have included that backport in this 
PR. Backport: https://github.com/apache/iceberg/pull/10832**
   
   This PR adds the following:
       - Backport of https://github.com/apache/iceberg/pull/10832 for Iceberg 1.5.x
       - Enhances `IcebergTableSource` to implement the `SupportsPartitioning`
       interface, which we defined on the [flink
       side](https://github.com/apache/flink/pull/26715). This enables Iceberg
       to report partitioning metadata to the Flink query planner via
       `outputPartitioning()`, which returns a `KeyGroupedPartitioning` with the
       table’s partition scheme. It supports various transform types, including
       bucket, identity, month, day, and year.
       - Improvements to `IcebergSource` to support StoragePartitionJoin
       - Enhances `FlinkSplitPlanner` to include a method to group ScanTasks by
       groupingKey (Partition Values) which enables us to ensure that all
       records within the same partition end up being processed by the same
       subtask.
       - Partition-aware split assignment, including a new
       `PartitionAwareSplitAssignerFactory` and `PartitionAwareSplitAssigner`,
       which are responsible for ensuring that records with the same partition
       are assigned to the same subtask via deterministic assignment
       - Includes a new `SpecTransformToFlinkTransform` to map the various
       TransformExpressions used to represent the partitions to the Flink
       system
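
To make the grouping and deterministic assignment described in the list above concrete, here is a minimal, self-contained sketch in plain Java. It is not the code from this PR: `Task`, `groupByPartition`, and `subtaskFor` are hypothetical names, and hashing the partition values modulo the parallelism is only one assumed way to get a deterministic mapping. The actual `FlinkSplitPlanner` and `PartitionAwareSplitAssigner` may work differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionAwareAssignmentSketch {

    /** Hypothetical stand-in for a scan task: a data file plus its partition values. */
    static final class Task {
        final String file;
        final List<Object> partitionValues;

        Task(String file, Object... partitionValues) {
            this.file = file;
            this.partitionValues = Arrays.asList(partitionValues);
        }
    }

    /** Groups tasks by partition values so each partition can be handed out as one unit. */
    static Map<List<Object>, List<Task>> groupByPartition(List<Task> tasks) {
        Map<List<Object>, List<Task>> groups = new LinkedHashMap<>();
        for (Task task : tasks) {
            groups.computeIfAbsent(task.partitionValues, key -> new ArrayList<>()).add(task);
        }
        return groups;
    }

    /** Maps a partition key to a subtask index in [0, parallelism), using only the key. */
    static int subtaskFor(List<Object> partitionValues, int parallelism) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(partitionValues.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Assumed example data: two files share the partition (bucket 3, 2025-05-01).
        List<Task> tasks = Arrays.asList(
                new Task("a.parquet", 3, "2025-05-01"),
                new Task("b.parquet", 7, "2025-05-01"),
                new Task("c.parquet", 3, "2025-05-01"));
        int parallelism = 4;
        for (Map.Entry<List<Object>, List<Task>> group : groupByPartition(tasks).entrySet()) {
            System.out.println("partition " + group.getKey()
                    + " -> subtask " + subtaskFor(group.getKey(), parallelism)
                    + " (" + group.getValue().size() + " task(s))");
        }
    }
}
```

In this sketch `subtaskFor` depends only on the partition values and the parallelism, so two sources planned independently would send matching partitions to the same subtask index. That is the property a storage partition join needs in order to skip the shuffle.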
   
   ## Testing
   * Added unit tests to `TestPartitionAwareSplitAssigner` to verify that 
splits are deterministically assigned to the correct subtasks
   * Added unit tests to `TestFlinkSplitPlanner` to test the improved functionality to 
get batchSplits based on `ScanGroup`
   * Added unit test `TestStoragePartitionedJoin` to verify that the 
correct metadata is returned
   * Ran the following queries internally and compared the results to Spark output
   
   **Queries that correctly use SPJ**
   ```
   select count(*) from iceberg.db.simple_bucketed as table1 join 
iceberg.db.simple_bucketed_2 as table2 on table1.user_id = table2.user_id2
   ```
   
   ```
   select count(*) from iceberg.db.user_id_two_partition_cols t1 join 
iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = 
t2.user_id
   ```
   
   ```
   select count(*) from iceberg.db.user_id_two_partition_cols t1 join 
iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = 
t2.user_id  where t1.dt = '2025-05-01' and t2.dt = '2025-05-01'
   ```
   
   **Queries that correctly do not use SPJ**
   ```
   select count(*) from iceberg.db.simple_bucketed as table1 join 
iceberg.db.simple_bucketed_32 as table2 on table1.user_id = table2.user_id2
   ```
   ```
   select count(*) from iceberg.db.user_id_two_partition_cols t1 join 
iceberg.db.user_id_two_partition_cols_2 t2 on t1.dt = t2.dt and t1.user_id = 
t2.user_id  where t1.dt = '2025-05-01' and t2.dt = '2025-05-01' and t1.user_id 
=123 and t2.user_id =123
   ```
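
The PR does not say why the queries in this group fall back to a regular join. For the first one, the table name `simple_bucketed_32` suggests (as an assumption) that the two sides use different bucket counts, in which case their partitions would not line up. The sketch below shows one way such a compatibility check could look in plain Java. `Transform`, `parse`, and `compatible` are hypothetical names, the bucket count 16 is made up for illustration, and the only thing taken from Iceberg is the textual form of its transforms (`bucket[N]`, `identity`, `day`, and so on).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PartitionCompatibilitySketch {

    /** Hypothetical descriptor of one partition transform. */
    static final class Transform {
        final String name;        // e.g. "bucket", "identity", "day"
        final Integer numBuckets; // set only for "bucket", otherwise null

        Transform(String name, Integer numBuckets) {
            this.name = name;
            this.numBuckets = numBuckets;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Transform)) {
                return false;
            }
            Transform that = (Transform) other;
            return name.equals(that.name) && Objects.equals(numBuckets, that.numBuckets);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, numBuckets);
        }
    }

    private static final Pattern BUCKET = Pattern.compile("bucket\\[(\\d+)\\]");

    /** Parses a transform written as "bucket[16]", "identity", "day", etc. */
    static Transform parse(String text) {
        Matcher matcher = BUCKET.matcher(text);
        if (matcher.matches()) {
            return new Transform("bucket", Integer.parseInt(matcher.group(1)));
        }
        return new Transform(text, null);
    }

    /**
     * Two sides are treated as compatible only if they have the same transforms,
     * in the same order, with the same bucket counts. Column names are ignored
     * because join keys may be named differently on each side.
     */
    static boolean compatible(List<String> left, List<String> right) {
        List<Transform> leftTransforms = new ArrayList<>();
        for (String text : left) {
            leftTransforms.add(parse(text));
        }
        List<Transform> rightTransforms = new ArrayList<>();
        for (String text : right) {
            rightTransforms.add(parse(text));
        }
        return leftTransforms.equals(rightTransforms);
    }

    public static void main(String[] args) {
        System.out.println(compatible(Arrays.asList("bucket[16]"), Arrays.asList("bucket[16]")));
        System.out.println(compatible(Arrays.asList("bucket[16]"), Arrays.asList("bucket[32]")));
    }
}
```

A real planner check would also need to confirm that the join keys cover the partition columns. This sketch only compares the partition layouts themselves.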
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

