tmnd1991 commented on PR #9233:
URL: https://github.com/apache/iceberg/pull/9233#issuecomment-1847179494

   Sure, let me add a bit of context:
   I have two tables with the exact same schema/layout, partitioned on 3 columns:
   - identity(MEAS_YM)
   - identity(MEAS_DD)
   - bucket(POD, 4)

   The source table (the small one) has strictly a subset of the partitions of the target table (the big one).
   In this example I will use a local reproducer, but keep in mind that the real table is 65 TB with 400k partitions, so every 1% improvement means a lot.
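
   For reproducibility, the layout corresponds to a DDL along these lines (a sketch: the column types and the rest of the schema are assumptions, not taken from the real tables):
   ```
   CREATE TABLE target (
     MEAS_YM string,
     MEAS_DD string,
     POD     string
     -- ... plus the remaining measure columns in the real table
   ) USING iceberg
   PARTITIONED BY (MEAS_YM, MEAS_DD, bucket(4, POD));
   ```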
   
   I started by running a merge statement as follows, taking advantage of SPJ:
   ```
   MERGE INTO target USING (SELECT * FROM source)
   ON target.MEAS_YM = source.MEAS_YM AND target.MEAS_DD = source.MEAS_DD AND target.POD = source.POD
   WHEN MATCHED THEN UPDATE SET ...
   ```
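
   For context, SPJ only kicks in with the right session settings. A minimal sketch of the flags involved (names as of Spark 3.4 / Iceberg 1.4; the exact set needed is an assumption and varies by version):
   ```
   -- Spark side: enable storage-partitioned joins and allow pushing
   -- partition values when one side is missing some partitions
   SET spark.sql.sources.v2.bucketing.enabled = true;
   SET spark.sql.sources.v2.bucketing.pushPartValues.enabled = true;
   SET spark.sql.requireAllClusterKeysForCoPartition = false;
   -- Iceberg side: report partition grouping to Spark's planner
   SET spark.sql.iceberg.planning.preserve-data-grouping = true;
   ```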
   
   This results in the following physical plan:
   ```
   == Physical Plan ==
   ReplaceData (13)
   +- * Sort (12)
      +- * Project (11)
         +- MergeRows (10)
            +- SortMergeJoin FullOuter (9)
               :- * Sort (4)
               :  +- * Project (3)
               :     +- * ColumnarToRow (2)
               :        +- BatchScan target (1)
               +- * Sort (8)
                  +- * Project (7)
                     +- * ColumnarToRow (6)
                        +- BatchScan source (5)
   ===== Subqueries =====
   
   Subquery:1 Hosting operator id = 1 Hosting Expression = _file#2274 IN subquery#2672
   * HashAggregate (26)
   +- Exchange (25)
      +- * HashAggregate (24)
         +- * Project (23)
            +- * SortMergeJoin LeftSemi (22)
               :- * Sort (17)
               :  +- * Filter (16)
               :     +- * ColumnarToRow (15)
               :        +- BatchScan target (14)
               +- * Sort (21)
                  +- * Filter (20)
                     +- * ColumnarToRow (19)
                        +- BatchScan source (18)
   ```
   
   with
   
   ```
   (1) BatchScan target
   Output [60]: [..., _file#2274]
   target (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   
   (5) BatchScan source
   Output [60]: [...]
   source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   
   (14) BatchScan target
   Output [8]: [..., _file#2590]
   target (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   
   (18) BatchScan source
   Output [7]: [...]
   source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   ```
   
   This created 33 tasks (+10 to exchange the file names) for the subquery and 33 tasks for the second join.
   In practice I know for sure that I hit only 25 partitions, not 33: some files were still read even though we know upfront that they are not needed, and the `_file IN (subquery)` filter can't prune any file at planning time because it's dynamic. On top of that, I observed that even when files should have been excluded by Spark's post-scan filter, task execution was still not as fast as I expected (i.e. close to 0 ms).
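
   For reference, the hit partition tuples can be enumerated upfront with something along these lines (an illustrative sketch, assuming Iceberg's `system.bucket` function is reachable in the session, as in the merge condition below):
   ```
   -- List the distinct partition tuples present in the source table,
   -- since its partitions are a strict subset of the target's
   SELECT DISTINCT MEAS_YM, MEAS_DD, system.bucket(4, POD) AS POD_bucket
   FROM source
   ORDER BY MEAS_YM, MEAS_DD, POD_bucket;
   ```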
   
   Therefore, knowing exactly which partitions I hit beforehand, I tried to help Iceberg/Spark a little by enumerating the partition values that are actually hit:
   
   ```
   MERGE INTO target USING (SELECT * FROM source)
   ON target.`POD` = source.`POD` AND target.`MEAS_YM` = source.`MEAS_YM` AND target.`MEAS_DD` = source.`MEAS_DD` AND (
     (target.`meas_ym` = '202306' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (0,2,3)) OR
     (target.`meas_ym` = '202306' AND target.`meas_dd` = '01') OR
     (target.`meas_ym` = '202307' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (1,3)) OR
     (target.`meas_ym` = '202306' AND target.`meas_dd` = '03') OR
     (target.`meas_ym` = '202308' AND target.`meas_dd` = '01' AND system.bucket(4, target.`pod`) IN (0,1,2)) OR
     (target.`meas_ym` = '202307' AND target.`meas_dd` = '03' AND system.bucket(4, target.`pod`) IN (0,1,2)) OR
     (target.`meas_ym` = '202308' AND target.`meas_dd` = '03' AND system.bucket(4, target.`pod`) IN (0,3)) OR
     (target.`meas_ym` = '202307' AND target.`meas_dd` = '01' AND system.bucket(4, target.`pod`) IN (0,1,2)) OR
     (target.`meas_ym` = '202308' AND target.`meas_dd` = '02' AND system.bucket(4, target.`pod`) IN (3)))
   WHEN MATCHED THEN UPDATE SET ...
   ```
   
   To my surprise, the plan was exactly the same...

   Then I fixed this issue and also #9191 locally (by adding an optimizer rule to my Spark session), and the scans actually changed:
   
   ```
   (1) BatchScan target
   Output [60]: [..., _file#2279]
   target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   
   (5) BatchScan source
   Output [60]: [...]
   source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   
   (14) BatchScan target
   Output [8]: [..., _file#2590]
   target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02' AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307' AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD) IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR (((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR ((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   
   (18) BatchScan source
   Output [7]: [...]
   source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
   ```
   
   With this plan I get 25 tasks (+10 for the shuffle) plus 25 tasks, actually hitting only the minimum number of partitions.
   
   ----
   
   Given the context, I think I have highlighted 2 "bugs":

   1. the full-outer join condition can also be used to prune partitions (fixed in this PR)
   2. for some reason Spark is not able to correctly detect the minimum subset of hit partitions (maybe I can work on another PR for this, but I guess it's much harder and probably belongs in the Spark codebase)

