ssandona opened a new issue, #13374:
URL: https://github.com/apache/iceberg/issues/13374

   ### Apache Iceberg version
   
   1.9.0
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   ## Issue description
   
   CBO does not seem to use NDVs properly on Iceberg tables.
   Specifically, join reordering is not optimized based on NDVs.
   
   Running exactly the same joins on Parquet tables (no Iceberg) populated in the same way, we see that CBO reorders the joins properly.
   So CBO join reordering works correctly on Parquet tables but not on Iceberg tables.

   Reproduction steps below:
   
   ## Reproduction steps
   
   I populate my Iceberg tables in order to have this setup (a quick verification query is sketched after the code below):
   - online_orders_ndv:
       -  100,000,000 rows
       - order_id column: 100,000,000 NDVs
   - total_returns_ndv:
       -  2,000,000 rows
       - customer column: ~20,000 NDVs
       - order_id column: ~1,000,000 NDVs
   - locations_ndv:
       - 1,000,000 rows
       - customer column: ~20,000 NDVs
       - store_addr column: 1,000,000 NDVs
   
   ```
   # Define table names and locations
   S3_BASE = "s3://mybucket/datasets/temp"
   TABLE_NAMES = {
       'online_orders': 'default.online_orders_ndv',
       'total_returns': 'default.total_returns_ndv',
       'locations': 'default.locations_ndv'
   }
   
   TABLE_LOCATIONS = {
       'online_orders': f"{S3_BASE}/online_orders_ndv",
       'total_returns': f"{S3_BASE}/total_returns_ndv",
       'locations': f"{S3_BASE}/locations_ndv"
   }
   
   # Create tables and insert data using Spark SQL
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {TABLE_NAMES['online_orders']} (
       order_id BIGINT
   ) USING iceberg
   LOCATION '{TABLE_LOCATIONS['online_orders']}'
   """)
   
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {TABLE_NAMES['total_returns']} (
       order_id BIGINT,
       customer STRING
   ) USING iceberg
   LOCATION '{TABLE_LOCATIONS['total_returns']}'
   """)
   
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {TABLE_NAMES['locations']} (
       customer STRING,
       store_addr STRING
   ) USING iceberg
   LOCATION '{TABLE_LOCATIONS['locations']}'
   """)
   
   # Insert data into online_orders (100M rows)
   spark.sql(f"""
   INSERT INTO {TABLE_NAMES['online_orders']}
   SELECT id as order_id
   FROM (
       SELECT explode(sequence(1, 100000000)) as id
   )
   """)
   
   # Insert data into total_returns (2M rows)
   # 20K customers with 100 returns each
   spark.sql(f"""
   WITH customer_base AS (
       SELECT explode(sequence(1, 20000)) as customer_id
   ),
   returns_base AS (
       SELECT 
           customer_id,
           explode(sequence(1, 100)) as return_seq
       FROM customer_base
   )
   INSERT INTO {TABLE_NAMES['total_returns']}
   SELECT 
       CASE 
           WHEN rand() < 0.5 THEN CAST(rand() * 100000000 as BIGINT)
           ELSE -1 
       END as order_id,
       concat('CUST_', customer_id) as customer
   FROM returns_base
   """)
   
   # Insert data into locations (1M rows)
   # 20K customers with 50 locations each
   spark.sql(f"""
   WITH customer_base AS (
       SELECT explode(sequence(1, 20000)) as customer_id
   ),
   location_base AS (
       SELECT 
           customer_id,
           explode(sequence(1, 50)) as store_seq
       FROM customer_base
   )
   INSERT INTO {TABLE_NAMES['locations']}
   SELECT 
       concat('CUST_', customer_id) as customer,
       concat('STORE_', customer_id, '_', store_seq) as store_addr
   FROM location_base
   """)
   ```
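
   A quick way to verify that the generated data matches the targets above (a sketch of mine, not part of the original run; `total_returns_ndv` shown, the other tables are analogous):

   ```
   # Optional verification sketch: confirm row count and NDVs of the generated data.
   spark.sql(f"""
   SELECT COUNT(*)                 AS row_count,
          COUNT(DISTINCT order_id) AS order_id_ndv,
          COUNT(DISTINCT customer) AS customer_ndv
   FROM {TABLE_NAMES['total_returns']}
   """).show()
   ```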
   
   I then want to test this join:
   
   ```
   df=spark.sql(f"""
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """)
   
   df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
   ```
   
   ## Test 1
   
   Without CBO, with default values:
   
   ```
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   
   df=spark.sql(f"""
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """)
   
   df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
   ```
   
   As a result, I see that the joins are executed in the order written in the query (which is actually the best order).

   How it was processed (a quick sanity check for these row counts is sketched after the plan):
   
   * RES1:
       * online_orders (100 million) JOIN total_returns (2 million)
       * number of output rows: 998,891 (~1 million)
   * Final res:
       * RES1 (998,891) JOIN locations_ndv (1 million)
       * number of output rows: 49,944,550
   
   Here the plan:
   
   ```
   +- == Initial Plan ==
      Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
      +- WriteFiles
         +- Project [order_id#808L, customer#810, store_addr#812]
            +- SortMergeJoin [customer#810], [customer#811], Inner
               :- Sort [customer#810 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(customer#810, 1000), ENSURE_REQUIREMENTS, [plan_id=1509]
               :     +- Project [order_id#808L, customer#810]
               :        +- SortMergeJoin [order_id#808L], [order_id#809L], Inner
               :           :- Sort [order_id#808L ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(order_id#808L, 1000), ENSURE_REQUIREMENTS, [plan_id=1501]
               :           :     +- Filter isnotnull(order_id#808L)
               :           :        +- BatchScan spark_catalog.default.online_orders_ndv[order_id#808L] spark_catalog.default.online_orders_ndv (branch=null) [filters=order_id IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
               :           +- Sort [order_id#809L ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(order_id#809L, 1000), ENSURE_REQUIREMENTS, [plan_id=1502]
               :                 +- Filter (isnotnull(order_id#809L) AND isnotnull(customer#810))
               :                    +- BatchScan spark_catalog.default.total_returns_ndv[order_id#809L, customer#810] spark_catalog.default.total_returns_ndv (branch=null) [filters=order_id IS NOT NULL, customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
               +- Sort [customer#811 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(customer#811, 1000), ENSURE_REQUIREMENTS, [plan_id=1510]
                     +- Filter isnotnull(customer#811)
                        +- BatchScan spark_catalog.default.locations_ndv[customer#811, store_addr#812] spark_catalog.default.locations_ndv (branch=null) [filters=customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
   ```
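
   As a side check on the row counts above (a sketch of mine, not output from the runs shown here), the intermediate cardinality of the first join can be confirmed directly:

   ```
   # Sanity-check sketch: cardinality of online_orders JOIN total_returns (~1M rows expected).
   spark.sql(f"""
   SELECT COUNT(*) AS res1_rows
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   """).show()
   ```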
   
   ## Test 2 - with CBO - No NDV stats collected 
   
   ```
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   spark.conf.set("spark.sql.iceberg.report-column-stats", "true")
   spark.conf.set("spark.sql.cbo.enabled", "true")
   spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
   
   df=spark.sql(f"""
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """)
   
   df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
   ```
   
   How it was processed:
   
   * RES1:
       * total_returns_ndv (2 million) JOIN locations_ndv (1 million)
       * number of output rows: 100,000,000
   * Final res:
       * RES1 (100 million) JOIN online_orders (100 million)
       * number of output rows: 49,944,550
   
   CBO only looked at the tables' row counts and (wrongly) assumed that joining the two small tables first would produce fewer output rows; see the EXPLAIN COST sketch after the plan below.
   
   Here the plan:
   
   ```
   +- == Initial Plan ==
      Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
      +- WriteFiles
         +- Project [order_id#839L, customer#841, store_addr#843]
            +- SortMergeJoin [order_id#840L], [order_id#839L], Inner
               :- Sort [order_id#840L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(order_id#840L, 1000), ENSURE_REQUIREMENTS, [plan_id=1923]
               :     +- Project [order_id#840L, customer#841, store_addr#843]
               :        +- SortMergeJoin [customer#841], [customer#842], Inner
               :           :- Sort [customer#841 ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(customer#841, 1000), ENSURE_REQUIREMENTS, [plan_id=1915]
               :           :     +- Filter (isnotnull(order_id#840L) AND isnotnull(customer#841))
               :           :        +- BatchScan spark_catalog.default.total_returns_ndv[order_id#840L, customer#841] spark_catalog.default.total_returns_ndv (branch=null) [filters=order_id IS NOT NULL, customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
               :           +- Sort [customer#842 ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(customer#842, 1000), ENSURE_REQUIREMENTS, [plan_id=1916]
               :                 +- Filter isnotnull(customer#842)
               :                    +- BatchScan spark_catalog.default.locations_ndv[customer#842, store_addr#843] spark_catalog.default.locations_ndv (branch=null) [filters=customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
               +- Sort [order_id#839L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(order_id#839L, 1000), ENSURE_REQUIREMENTS, [plan_id=1924]
                     +- Filter isnotnull(order_id#839L)
                        +- BatchScan spark_catalog.default.online_orders_ndv[order_id#839L] spark_catalog.default.online_orders_ndv (branch=null) [filters=order_id IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
   ```
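
   To see which estimates drove this choice (a sketch of mine; with CBO enabled, Spark's `EXPLAIN COST` prints `Statistics(sizeInBytes=..., rowCount=...)` on the nodes of the optimized logical plan), one can run:

   ```
   # Sketch: inspect the optimizer's cost estimates for the same query.
   # If the Iceberg NDVs were picked up, the rowCount estimate for the
   # order_id join should be ~1M rather than ~100M.
   spark.sql(f"""
   EXPLAIN COST
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """).show(truncate=False)
   ```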
   
   ## Test 3 - with CBO and NDV stats collected
   
   I collected stats for all the tables as follows:
   
   ```
   df=spark.sql(f"""
   CALL system.compute_table_stats(
   table => '{TABLE_NAMES['total_returns']}'
   )
   """
   )
   
   df=spark.sql(f"""
   CALL system.compute_table_stats(
   table => '{TABLE_NAMES['online_orders']}'
   )
   """
   )
   
   df=spark.sql(f"""
   CALL system.compute_table_stats(
   table => '{TABLE_NAMES['locations']}'
   )
   """
   )
   ```
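
   (Side note, an assumption on my part: the procedure output includes the path of the statistics file it wrote, so printing the result is a quick way to confirm a Puffin stats file was actually generated.)

   ```
   # Sketch: show the procedure output; it is expected to contain the path of the
   # generated statistics (Puffin) file. Adjust if the output schema differs.
   spark.sql(f"""
   CALL system.compute_table_stats(table => '{TABLE_NAMES['total_returns']}')
   """).show(truncate=False)
   ```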
   
   Then I re-attempted the join:
   
   ```
   spark.sparkContext.setLogLevel("DEBUG")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   spark.conf.set("spark.sql.iceberg.report-column-stats", "true")
   spark.conf.set("spark.sql.cbo.enabled", "true")
   spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
   
   df=spark.sql(f"""
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """)
   ```
   
   In the logs I see:
   
   ```
   25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for order_id#150L
   25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for order_id#151L
   25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for customer#152
   25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for customer#153
   ```
   
   and the query was executed in the same way as in Test 2 (with a non-optimal join order). It seems the NDVs were either not used or did not help CBO.
   
   Note: inspecting the DEBUG log files, I do not see any GET call made to the .stat file in S3.
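
   To double-check whether a statistics file is at least registered in the table metadata, a sketch like the following could be used (assumptions of mine: the `metadata_log_entries` metadata table exposes the metadata JSON path in its `file` column, and registered stats appear under a `statistics` key in that JSON):

   ```
   import json

   # Sketch: look for a "statistics" entry in the latest table metadata JSON.
   latest_metadata = (
       spark.sql(f"""
       SELECT file FROM {TABLE_NAMES['total_returns']}.metadata_log_entries
       ORDER BY timestamp DESC LIMIT 1
       """).collect()[0]["file"]
   )
   metadata_json = json.loads(
       "\n".join(r.value for r in spark.read.text(latest_metadata).collect())
   )
   # An empty or missing list here would mean the stats file was never registered,
   # which would be consistent with no GET calls for it showing up in the logs.
   print(metadata_json.get("statistics"))
   ```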
   
   ## Test 4 - Parquet tables (no Iceberg) + CBO + NDVs

   To confirm it's an issue with Iceberg NDVs and not with CBO, I recreated the same tables using Parquet:
   
   ```
   # Define table names and locations
   S3_BASE = "s3://mybucket/datasets/temp"
   TABLE_NAMES = {
       'online_orders': 'default.online_orders_parquet',
       'total_returns': 'default.total_returns_parquet',
       'locations': 'default.locations_parquet'
   }
   
   TABLE_LOCATIONS = {
       'online_orders': f"{S3_BASE}/online_orders_parquet",
       'total_returns': f"{S3_BASE}/total_returns_parquet",
       'locations': f"{S3_BASE}/locations_parquet"
   }
   
   # Create tables and insert data using Spark SQL
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {TABLE_NAMES['online_orders']} (
       order_id BIGINT
   ) USING parquet
   LOCATION '{TABLE_LOCATIONS['online_orders']}'
   """)
   
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {TABLE_NAMES['total_returns']} (
       order_id BIGINT,
       customer STRING
   ) USING parquet
   LOCATION '{TABLE_LOCATIONS['total_returns']}'
   """)
   
   spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {TABLE_NAMES['locations']} (
       customer STRING,
       store_addr STRING
   ) USING parquet
   LOCATION '{TABLE_LOCATIONS['locations']}'
   """)
   
   # Insert data into online_orders (100M rows)
   spark.sql(f"""
   INSERT INTO {TABLE_NAMES['online_orders']}
   SELECT id as order_id
   FROM (
       SELECT explode(sequence(1, 100000000)) as id
   )
   """)
   
   # Insert data into total_returns (2M rows)
   # 20K customers with 100 returns each
   spark.sql(f"""
   WITH customer_base AS (
       SELECT explode(sequence(1, 20000)) as customer_id
   ),
   returns_base AS (
       SELECT 
           customer_id,
           explode(sequence(1, 100)) as return_seq
       FROM customer_base
   )
   INSERT INTO {TABLE_NAMES['total_returns']}
   SELECT 
       CASE 
           WHEN rand() < 0.5 THEN CAST(rand() * 100000000 as BIGINT)
           ELSE -1 
       END as order_id,
       concat('CUST_', customer_id) as customer
   FROM returns_base
   """)
   
   # Insert data into locations (1M rows)
   # 20K customers with 50 locations each
   spark.sql(f"""
   WITH customer_base AS (
       SELECT explode(sequence(1, 20000)) as customer_id
   ),
   location_base AS (
       SELECT 
           customer_id,
           explode(sequence(1, 50)) as store_seq
       FROM customer_base
   )
   INSERT INTO {TABLE_NAMES['locations']}
   SELECT 
       concat('CUST_', customer_id) as customer,
       concat('STORE_', customer_id, '_', store_seq) as store_addr
   FROM location_base
   """)
   ```
   
   Then I computed table-level stats (only row numbers and table sizes, no NDVs):
   
   ```
   spark.sql(f"""
   ANALYZE TABLE {TABLE_NAMES['online_orders']} COMPUTE STATISTICS
   """
   ).show(truncate=False)
   
   spark.sql(f"""
   ANALYZE TABLE {TABLE_NAMES['total_returns']} COMPUTE STATISTICS
   """
   ).show(truncate=False)
   
   spark.sql(f"""
   ANALYZE TABLE {TABLE_NAMES['locations']} COMPUTE STATISTICS
   """
   ).show(truncate=False)
   ```
   
   We see the collected stats for the 3 tables:
   
   ```
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['online_orders']}
   """
   ).show(truncate=False)
   
   
   +----------------------------+----------------------------------------------------------------+-------+
   |col_name                    |data_type                                                       |comment|
   +----------------------------+----------------------------------------------------------------+-------+
   |order_id                    |bigint                                                          |NULL   |
   |                            |                                                                |       |
   |# Detailed Table Information|                                                                |       |
   |Catalog                     |spark_catalog                                                   |       |
   |Database                    |default                                                         |       |
   |Table                       |online_orders_parquet                                           |       |
   |Owner                       |hadoop                                                          |       |
   |Created Time                |Tue Jun 24 07:59:58 GMT 2025                                    |       |
   |Last Access                 |UNKNOWN                                                         |       |
   |Created By                  |Spark 3.5.4-amzn-0                                              |       |
   |Type                        |EXTERNAL                                                        |       |
   |Provider                    |parquet                                                         |       |
   |Statistics                  |400620383 bytes, 100000000 rows                                 |       |
   |Location                    |s3://mybucket/datasets/temp/online_orders_parquet               |       |
   |Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe     |       |
   |InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat   |       |
   |OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat  |       |
   +----------------------------+----------------------------------------------------------------+-------+
   
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['total_returns']}
   """
   ).show(truncate=False)
   
   
   +----------------------------+----------------------------------------------------------------+-------+
   |col_name                    |data_type                                                       |comment|
   +----------------------------+----------------------------------------------------------------+-------+
   |order_id                    |bigint                                                          |NULL   |
   |customer                    |string                                                          |NULL   |
   |                            |                                                                |       |
   |# Detailed Table Information|                                                                |       |
   |Catalog                     |spark_catalog                                                   |       |
   |Database                    |default                                                         |       |
   |Table                       |total_returns_parquet                                           |       |
   |Owner                       |hadoop                                                          |       |
   |Created Time                |Tue Jun 24 07:59:59 GMT 2025                                    |       |
   |Last Access                 |UNKNOWN                                                         |       |
   |Created By                  |Spark 3.5.4-amzn-0                                              |       |
   |Type                        |EXTERNAL                                                        |       |
   |Provider                    |parquet                                                         |       |
   |Statistics                  |7523202 bytes, 2000000 rows                                     |       |
   |Location                    |s3://mybucket/datasets/temp/total_returns_parquet               |       |
   |Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe     |       |
   |InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat   |       |
   |OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat  |       |
   +----------------------------+----------------------------------------------------------------+-------+
   
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['locations']}
   """
   ).show(truncate=False)
   
   
   
   +----------------------------+--------------------------------------------------------------+-------+
   |col_name                    |data_type                                                     |comment|
   +----------------------------+--------------------------------------------------------------+-------+
   |customer                    |string                                                        |NULL   |
   |store_addr                  |string                                                        |NULL   |
   |                            |                                                              |       |
   |# Detailed Table Information|                                                              |       |
   |Catalog                     |spark_catalog                                                 |       |
   |Database                    |default                                                       |       |
   |Table                       |locations_parquet                                             |       |
   |Owner                       |hadoop                                                        |       |
   |Created Time                |Tue Jun 24 08:00:01 GMT 2025                                  |       |
   |Last Access                 |UNKNOWN                                                       |       |
   |Created By                  |Spark 3.5.4-amzn-0                                            |       |
   |Type                        |EXTERNAL                                                      |       |
   |Provider                    |parquet                                                       |       |
   |Statistics                  |4408847 bytes, 1000000 rows                                   |       |
   |Location                    |s3://mybucket/datasets/temp/locations_parquet                 |       |
   |Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe   |       |
   |InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat |       |
   |OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat|       |
   +----------------------------+--------------------------------------------------------------+-------+
   ```
   
   I re-attempted the query:
   
   ```
   spark.conf.set("spark.sql.cbo.enabled", "true")
   spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   
   df=spark.sql(f"""
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """)
   
   df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
   ```
   
   As in Test 2, CBO reordered based only on the table sizes (wrongly / not optimal):
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
      +- WriteFiles
         +- *(9) Project [order_id#259L, customer#261, store_addr#263]
            +- *(9) SortMergeJoin [order_id#260L], [order_id#259L], Inner
               :- *(7) Sort [order_id#260L ASC NULLS FIRST], false, 0
               :  +- AQEShuffleRead coalesced
               :     +- ShuffleQueryStage 3
               :        +- Exchange hashpartitioning(order_id#260L, 1000), ENSURE_REQUIREMENTS, [plan_id=1905]
               :           +- *(6) Project [order_id#260L, customer#261, store_addr#263]
               :              +- *(6) SortMergeJoin [customer#261], [customer#262], Inner
               :                 :- *(4) Sort [customer#261 ASC NULLS FIRST], false, 0
               :                 :  +- AQEShuffleRead coalesced
               :                 :     +- ShuffleQueryStage 0
               :                 :        +- Exchange hashpartitioning(customer#261, 1000), ENSURE_REQUIREMENTS, [plan_id=1726]
               :                 :           +- *(1) Filter (isnotnull(order_id#260L) AND isnotnull(customer#261))
               :                 :              +- *(1) ColumnarToRow
               :                 :                 +- FileScan parquet spark_catalog.default.total_returns_parquet[order_id#260L,customer#261] Batched: true, DataFilters: [isnotnull(order_id#260L), isnotnull(customer#261)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/total_returns_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id), IsNotNull(customer)], ReadSchema: struct<order_id:bigint,customer:string>
               :                 +- *(5) Sort [customer#262 ASC NULLS FIRST], false, 0
               :                    +- AQEShuffleRead coalesced
               :                       +- ShuffleQueryStage 1
               :                          +- Exchange hashpartitioning(customer#262, 1000), ENSURE_REQUIREMENTS, [plan_id=1743]
               :                             +- *(2) Filter isnotnull(customer#262)
               :                                +- *(2) ColumnarToRow
               :                                   +- FileScan parquet spark_catalog.default.locations_parquet[customer#262,store_addr#263] Batched: true, DataFilters: [isnotnull(customer#262)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/locations_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer)], ReadSchema: struct<customer:string,store_addr:string>
               +- *(8) Sort [order_id#259L ASC NULLS FIRST], false, 0
                  +- AQEShuffleRead coalesced
                     +- ShuffleQueryStage 2
                        +- Exchange hashpartitioning(order_id#259L, 1000), ENSURE_REQUIREMENTS, [plan_id=1764]
                           +- *(3) Filter isnotnull(order_id#259L)
                              +- *(3) ColumnarToRow
                                 +- FileScan parquet spark_catalog.default.online_orders_parquet[order_id#259L] Batched: true, DataFilters: [isnotnull(order_id#259L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/online_orders_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema: struct<order_id:bigint>
   ```
   
   
   Then I computed column stats (including NDVs):
   
   ```
   spark.sql(f"""
    ANALYZE TABLE {TABLE_NAMES['online_orders']} COMPUTE STATISTICS FOR ALL COLUMNS
   """
   ).show(truncate=False)
   
   spark.sql(f"""
    ANALYZE TABLE {TABLE_NAMES['total_returns']} COMPUTE STATISTICS FOR ALL COLUMNS
   """
   ).show(truncate=False)
   
   spark.sql(f"""
   ANALYZE TABLE {TABLE_NAMES['locations']} COMPUTE STATISTICS FOR ALL COLUMNS
   """
   ).show(truncate=False)
   ```
   
   We can see the computed column stats:
   
   ```
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['online_orders']} order_id
   """
   ).show(truncate=False)
   
   +--------------+----------+
   |info_name     |info_value|
   +--------------+----------+
   |col_name      |order_id  |
   |data_type     |bigint    |
   |comment       |NULL      |
   |min           |1         |
   |max           |100000000 |
   |num_nulls     |0         |
   |distinct_count|100000000 |
   |avg_col_len   |8         |
   |max_col_len   |8         |
   |histogram     |NULL      |
   +--------------+----------+
   
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['total_returns']} order_id
   """
   ).show(truncate=False)
   
   +--------------+----------+
   |info_name     |info_value|
   +--------------+----------+
   |col_name      |order_id  |
   |data_type     |bigint    |
   |comment       |NULL      |
   |min           |-1        |
   |max           |99999948  |
   |num_nulls     |0         |
   |distinct_count|1063157   |
   |avg_col_len   |8         |
   |max_col_len   |8         |
   |histogram     |NULL      |
   +--------------+----------+
   
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['total_returns']} customer
   """
   ).show(truncate=False)
   
   +--------------+----------+
   |info_name     |info_value|
   +--------------+----------+
   |col_name      |customer  |
   |data_type     |string    |
   |comment       |NULL      |
   |min           |NULL      |
   |max           |NULL      |
   |num_nulls     |0         |
   |distinct_count|20274     |
   |avg_col_len   |10        |
   |max_col_len   |10        |
   |histogram     |NULL      |
   +--------------+----------+
   
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['locations']} customer
   """
   ).show(truncate=False)
   
   +--------------+----------+
   |info_name     |info_value|
   +--------------+----------+
   |col_name      |customer  |
   |data_type     |string    |
   |comment       |NULL      |
   |min           |NULL      |
   |max           |NULL      |
   |num_nulls     |0         |
   |distinct_count|20274     |
   |avg_col_len   |10        |
   |max_col_len   |10        |
   |histogram     |NULL      |
   +--------------+----------+
   
   spark.sql(f"""
   DESCRIBE EXTENDED {TABLE_NAMES['locations']} store_addr
   """
   ).show(truncate=False)
   
   +--------------+----------+
   |info_name     |info_value|
   +--------------+----------+
   |col_name      |store_addr|
   |data_type     |string    |
   |comment       |NULL      |
   |min           |NULL      |
   |max           |NULL      |
   |num_nulls     |0         |
   |distinct_count|1000000   |
   |avg_col_len   |14        |
   |max_col_len   |14        |
   |histogram     |NULL      |
   +--------------+----------+
   ```
   
   I re-attempted the query:
   
   ```
   spark.conf.set("spark.sql.cbo.enabled", "true")
   spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   
   df=spark.sql(f"""
   SELECT o.order_id, r.customer, l.store_addr
   FROM {TABLE_NAMES['online_orders']} o
   JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
   JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
   """)
   
   df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
   ```
   
   We can see that, with NDVs available, CBO properly reordered the joins:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
      +- WriteFiles
         +- *(5) Project [order_id#1167L, customer#2914, store_addr#4685]
            +- *(5) ShuffledHashJoin [customer#2914], [customer#4684], Inner, BuildLeft
               :- AQEShuffleRead coalesced
               :  +- ShuffleQueryStage 3
               :     +- Exchange hashpartitioning(customer#2914, 1000), ENSURE_REQUIREMENTS, [plan_id=2527]
               :        +- *(4) Project [order_id#1167L, customer#2914]
               :           +- *(4) ShuffledHashJoin [order_id#1167L], [order_id#2913L], Inner, BuildRight
               :              :- AQEShuffleRead coalesced
               :              :  +- ShuffleQueryStage 0
               :              :     +- Exchange hashpartitioning(order_id#1167L, 1000), ENSURE_REQUIREMENTS, [plan_id=2306]
               :              :        +- *(1) Filter isnotnull(order_id#1167L)
               :              :           +- *(1) ColumnarToRow
               :              :              +- FileScan parquet spark_catalog.default.online_orders_parquet[order_id#1167L] Batched: true, DataFilters: [isnotnull(order_id#1167L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/online_orders_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema: struct<order_id:bigint>
               :              +- AQEShuffleRead coalesced
               :                 +- ShuffleQueryStage 1
               :                    +- Exchange hashpartitioning(order_id#2913L, 1000), ENSURE_REQUIREMENTS, [plan_id=2323]
               :                       +- *(2) Filter (isnotnull(order_id#2913L) AND isnotnull(customer#2914))
               :                          +- *(2) ColumnarToRow
               :                             +- FileScan parquet spark_catalog.default.total_returns_parquet[order_id#2913L,customer#2914] Batched: true, DataFilters: [isnotnull(order_id#2913L), isnotnull(customer#2914)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/total_returns_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id), IsNotNull(customer)], ReadSchema: struct<order_id:bigint,customer:string>
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 2
                     +- Exchange hashpartitioning(customer#4684, 1000), ENSURE_REQUIREMENTS, [plan_id=2344]
                        +- *(3) Filter isnotnull(customer#4684)
                           +- *(3) ColumnarToRow
                              +- FileScan parquet spark_catalog.default.locations_parquet[customer#4684,store_addr#4685] Batched: true, DataFilters: [isnotnull(customer#4684)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/locations_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer)], ReadSchema: struct<customer:string,store_addr:string>
   ```
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [x] I cannot contribute a fix for this bug at this time

