tmnd1991 commented on PR #9233: URL: https://github.com/apache/iceberg/pull/9233#issuecomment-1847179494
Sure, let me add a bit of context: I have two tables with the exact same schema/layout, partitioned on 3 columns:

- identity(MEAS_YM)
- identity(MEAS_DD)
- bucket(POD, 4)

The source table (the small one) has strictly a subset of the partitions of the target table (the big one). In this example I will talk about a local reproducer, but keep in mind we are talking about a 65TB table with 400k partitions, so every 1% improvement actually means a lot.

I started running a merge statement as follows, taking advantage of SPJ:

```
MERGE INTO target USING (SELECT * FROM source)
ON target.MEAS_YM = source.MEAS_YM
  AND target.MEAS_DD = source.MEAS_DD
  AND target.POD = source.POD
WHEN MATCHED THEN UPDATE SET ...
```

This results in the following physical plan:

```
== Physical Plan ==
ReplaceData (13)
+- * Sort (12)
   +- * Project (11)
      +- MergeRows (10)
         +- SortMergeJoin FullOuter (9)
            :- * Sort (4)
            :  +- * Project (3)
            :     +- * ColumnarToRow (2)
            :        +- BatchScan target (1)
            +- * Sort (8)
               +- * Project (7)
                  +- * ColumnarToRow (6)
                     +- BatchScan source (5)

===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = _file#2274 IN subquery#2672
* HashAggregate (26)
+- Exchange (25)
   +- * HashAggregate (24)
      +- * Project (23)
         +- * SortMergeJoin LeftSemi (22)
            :- * Sort (17)
            :  +- * Filter (16)
            :     +- * ColumnarToRow (15)
            :        +- BatchScan target (14)
            +- * Sort (21)
               +- * Filter (20)
                  +- * ColumnarToRow (19)
                     +- BatchScan source (18)
```

with

```
(1) BatchScan target
Output [60]: [..., _file#2274]
target (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
```

This creates 33 tasks (+10 to exchange the file names) for the subquery and 33 tasks for the second join. In practice I know for sure that only 25 partitions are actually hit, not 33: some files are still read even though we know upfront that they are not needed, and the `_file IN (subquery)` filter can't prune any file at planning time because it's dynamic. On top of that, I observed that even when files should have been excluded by Spark's post-scan filter, the tasks still did not run as fast as I expected (i.e. close to 0ms).
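For context (not part of the PR itself): one way to enumerate which partition tuples actually exist in the source table is Iceberg's `partitions` metadata table. A minimal Scala sketch, with placeholder catalog/table names:

```scala
// Hedged sketch: list the (MEAS_YM, MEAS_DD, POD_bucket) tuples present in the
// source table via Iceberg's `partitions` metadata table.
// `my_catalog.db.source` is a placeholder name, not from the PR.
val sourcePartitions = spark.sql(
  """SELECT partition.MEAS_YM    AS meas_ym,
    |       partition.MEAS_DD    AS meas_dd,
    |       partition.POD_bucket AS pod_bucket
    |FROM my_catalog.db.source.partitions""".stripMargin)

// Each distinct tuple is one partition the MERGE can actually touch; this is the
// kind of list the enumerated ON condition below can be built from.
sourcePartitions.distinct().collect().foreach(println)
```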
Therefore, knowing exactly which partitions I hit beforehand, I tried to help iceberg/spark a little by enumerating the partition values that are actually hit:

```
MERGE INTO target USING (SELECT * FROM source)
ON target.`POD` = source.`POD`
  AND target.`MEAS_YM` = source.`MEAS_YM`
  AND target.`MEAS_DD` = source.`MEAS_DD`
  AND (
       (target.`meas_ym` = '202306' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (0,2,3))
    OR (target.`meas_ym` = '202306' AND target.`meas_dd` = '01')
    OR (target.`meas_ym` = '202307' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (1,3))
    OR (target.`meas_ym` = '202306' AND target.`meas_dd` = '03')
    OR (target.`meas_ym` = '202308' AND target.`meas_dd` = '01' AND system.bucket(4, target.`pod`) IN (0,1,2))
    OR (target.`meas_ym` = '202307' AND target.`meas_dd` = '03' AND system.bucket(4, target.`pod`) IN (0,1,2))
    OR (target.`meas_ym` = '202308' AND target.`meas_dd` = '03' AND system.bucket(4, target.`pod`) IN (0,3))
    OR (target.`meas_ym` = '202307' AND target.`meas_dd` = '01' AND system.bucket(4, target.`pod`) IN (0,1,2))
    OR (target.`meas_ym` = '202308' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (3))
  )
WHEN MATCHED THEN UPDATE SET ...
```

To my surprise the plan was exactly the same... Then I fixed this issue and also #9191 locally (by adding an optimizer rule to my Spark session, wired as sketched below) and the scans actually changed:

```
(1) BatchScan target
Output [60]: [..., _file#2279]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]

(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
```

With this plan I get 25 tasks (+10 for the shuffle) + 25 tasks, hitting only the minimum number of partitions.
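For reference, "adding an optimizer rule to my Spark session" above means wiring an extra rule through `SparkSessionExtensions`. A minimal sketch of the wiring only; the class name is hypothetical and the actual rule body used to work around this issue and #9191 is omitted:

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical extensions class; registers one extra optimizer rule.
class MergePruningExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { _ =>
      new Rule[LogicalPlan] {
        // Placeholder: a real rule would push the enumerated partition predicates
        // from the MERGE condition down into the target scan.
        override def apply(plan: LogicalPlan): LogicalPlan = plan
      }
    }
  }
}
```

It is enabled by passing the fully-qualified class name via `--conf spark.sql.extensions=MergePruningExtensions` when starting the session.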
----

Given the context, I think I probably highlighted 2 "bugs":

1. the full-outer join condition can also be used to prune partitions (fixed in this PR)
2. for some reason Spark is not able to correctly detect the minimum subset of hit partitions (maybe I can work on another PR for this, but I guess it's much harder and probably belongs to the Spark codebase)