ssandona opened a new issue, #13374: URL: https://github.com/apache/iceberg/issues/13374
### Apache Iceberg version

1.9.0

### Query engine

Spark

### Please describe the bug 🐞

## Issue description

CBO does not seem to use NDVs properly on Iceberg tables; in particular, join reordering is not driven by NDVs. Running exactly the same joins on plain Parquet tables (no Iceberg), populated in the same way, CBO reorders the joins correctly. So CBO join reordering works on Parquet tables but not on Iceberg tables. Reproduction steps below.

## Reproduction steps

I populate my Iceberg tables so that I end up with this setup:

- online_orders_ndv:
  - 100,000,000 rows
  - order_id column: 100,000,000 NDVs
- total_returns_ndv:
  - 2,000,000 rows
  - customer column: ~20,000 NDVs
  - order_id column: ~1,000,000 NDVs
- locations_ndv:
  - 1,000,000 rows
  - customer column: ~20,000 NDVs
  - store_addr column: 1,000,000 NDVs

```
# Define table names and locations
S3_BASE = "s3://mybucket/datasets/temp"
TABLE_NAMES = {
    'online_orders': 'default.online_orders_ndv',
    'total_returns': 'default.total_returns_ndv',
    'locations': 'default.locations_ndv'
}
TABLE_LOCATIONS = {
    'online_orders': f"{S3_BASE}/online_orders_ndv",
    'total_returns': f"{S3_BASE}/total_returns_ndv",
    'locations': f"{S3_BASE}/locations_ndv"
}

# Create tables and insert data using Spark SQL
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['online_orders']} (
    order_id BIGINT
)
USING iceberg
LOCATION '{TABLE_LOCATIONS['online_orders']}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['total_returns']} (
    order_id BIGINT,
    customer STRING
)
USING iceberg
LOCATION '{TABLE_LOCATIONS['total_returns']}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['locations']} (
    customer STRING,
    store_addr STRING
)
USING iceberg
LOCATION '{TABLE_LOCATIONS['locations']}'
""")

# Insert data into online_orders (100M rows)
spark.sql(f"""
INSERT INTO {TABLE_NAMES['online_orders']}
SELECT id as order_id
FROM (
    SELECT explode(sequence(1, 100000000)) as id
)
""")

# Insert data into total_returns (2M rows)
# 20K customers with 100 returns each
spark.sql(f"""
WITH customer_base AS (
    SELECT explode(sequence(1, 20000)) as customer_id
),
returns_base AS (
    SELECT customer_id, explode(sequence(1, 100)) as return_seq
    FROM customer_base
)
INSERT INTO {TABLE_NAMES['total_returns']}
SELECT
    CASE WHEN rand() < 0.5 THEN CAST(rand() * 100000000 as BIGINT) ELSE -1 END as order_id,
    concat('CUST_', customer_id) as customer
FROM returns_base
""")

# Insert data into locations (1M rows)
# 20K customers with 50 locations each
spark.sql(f"""
WITH customer_base AS (
    SELECT explode(sequence(1, 20000)) as customer_id
),
location_base AS (
    SELECT customer_id, explode(sequence(1, 50)) as store_seq
    FROM customer_base
)
INSERT INTO {TABLE_NAMES['locations']}
SELECT
    concat('CUST_', customer_id) as customer,
    concat('STORE_', customer_id, '_', store_seq) as store_addr
FROM location_base
""")
```

I then want to test this join:

```
df = spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```

## Test 1 - Without CBO, with default values

```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```

As a result, the joins are executed in the order written in the query (which is actually the best order).

How it was processed:

- RES1:
  - online_orders (100 million) JOIN total_returns (2 million)
  - number of output rows: 998,891 (~1 million)
- Final result:
  - RES1 (998,891) JOIN locations_ndv (1 million)
  - number of output rows: 49,944,550

Here is the plan:

```
+- == Initial Plan ==
   Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
   +- WriteFiles
      +- Project [order_id#808L, customer#810, store_addr#812]
         +- SortMergeJoin [customer#810], [customer#811], Inner
            :- Sort [customer#810 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(customer#810, 1000), ENSURE_REQUIREMENTS, [plan_id=1509]
            :     +- Project [order_id#808L, customer#810]
            :        +- SortMergeJoin [order_id#808L], [order_id#809L], Inner
            :           :- Sort [order_id#808L ASC NULLS FIRST], false, 0
            :           :  +- Exchange hashpartitioning(order_id#808L, 1000), ENSURE_REQUIREMENTS, [plan_id=1501]
            :           :     +- Filter isnotnull(order_id#808L)
            :           :        +- BatchScan spark_catalog.default.online_orders_ndv[order_id#808L] spark_catalog.default.online_orders_ndv (branch=null) [filters=order_id IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
            :           +- Sort [order_id#809L ASC NULLS FIRST], false, 0
            :              +- Exchange hashpartitioning(order_id#809L, 1000), ENSURE_REQUIREMENTS, [plan_id=1502]
            :                 +- Filter (isnotnull(order_id#809L) AND isnotnull(customer#810))
            :                    +- BatchScan spark_catalog.default.total_returns_ndv[order_id#809L, customer#810] spark_catalog.default.total_returns_ndv (branch=null) [filters=order_id IS NOT NULL, customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
            +- Sort [customer#811 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(customer#811, 1000), ENSURE_REQUIREMENTS, [plan_id=1510]
                  +- Filter isnotnull(customer#811)
                     +- BatchScan spark_catalog.default.locations_ndv[customer#811, store_addr#812] spark_catalog.default.locations_ndv (branch=null) [filters=customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
```

## Test 2 - with CBO - No NDV stats collected

```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.iceberg.report-column-stats", "true")
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

df = spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```

How it was processed:

- RES1:
  - total_returns_ndv (2 million) JOIN locations_ndv (1 million)
  - number of output rows: 100,000,000
- Final result:
  - RES1 (100 million) JOIN online_orders (100 million)
  - number of output rows: 49,944,550

CBO only looked at the row counts of the tables and assumed (wrongly) that joining the two small tables first would produce fewer output rows.
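For context, here is a back-of-the-envelope sketch of the cardinality arithmetic an optimizer can do once NDVs are available. It uses the classic equi-join estimate |L JOIN R| ≈ |L| * |R| / max(ndv(L.key), ndv(R.key)) with the row counts and NDVs from the setup above. This is only an illustration under that simplified formula (it does not propagate column stats through intermediate results the way Spark does), not Spark's actual code path:

```
# Illustrative arithmetic only, using the table sizes and NDVs from the setup above.
rows = {"online_orders": 100_000_000, "total_returns": 2_000_000, "locations": 1_000_000}
ndv = {
    ("online_orders", "order_id"): 100_000_000,
    ("total_returns", "order_id"): 1_000_000,
    ("total_returns", "customer"): 20_000,
    ("locations", "customer"): 20_000,
}

def join_estimate(rows_left, rows_right, ndv_left, ndv_right):
    # Classic equi-join estimate: |L JOIN R| ~= |L| * |R| / max(ndv_left, ndv_right)
    return rows_left * rows_right / max(ndv_left, ndv_right)

# Join order as written in the query (Test 1): orders -> returns -> locations
step1 = join_estimate(rows["online_orders"], rows["total_returns"],
                      ndv[("online_orders", "order_id")], ndv[("total_returns", "order_id")])
final1 = join_estimate(step1, rows["locations"],
                       ndv[("total_returns", "customer")], ndv[("locations", "customer")])

# Join order picked by CBO without NDVs (Test 2): returns -> locations -> orders
step2 = join_estimate(rows["total_returns"], rows["locations"],
                      ndv[("total_returns", "customer")], ndv[("locations", "customer")])
final2 = join_estimate(step2, rows["online_orders"],
                       ndv[("total_returns", "order_id")], ndv[("online_orders", "order_id")])

print(f"query order:      intermediate ~{step1:,.0f}, final ~{final1:,.0f}")
print(f"CBO-chosen order: intermediate ~{step2:,.0f}, final ~{final2:,.0f}")
# query order:      intermediate ~2,000,000,   final ~100,000,000
# CBO-chosen order: intermediate ~100,000,000, final ~100,000,000
```

With NDVs the first order is clearly cheaper (a ~2M-row intermediate result instead of ~100M rows before the big join), which matches what Test 1 produced; without NDVs the optimizer only sees the base row counts and prefers joining the two small tables first.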
Here is the Test 2 plan:

```
+- == Initial Plan ==
   Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
   +- WriteFiles
      +- Project [order_id#839L, customer#841, store_addr#843]
         +- SortMergeJoin [order_id#840L], [order_id#839L], Inner
            :- Sort [order_id#840L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(order_id#840L, 1000), ENSURE_REQUIREMENTS, [plan_id=1923]
            :     +- Project [order_id#840L, customer#841, store_addr#843]
            :        +- SortMergeJoin [customer#841], [customer#842], Inner
            :           :- Sort [customer#841 ASC NULLS FIRST], false, 0
            :           :  +- Exchange hashpartitioning(customer#841, 1000), ENSURE_REQUIREMENTS, [plan_id=1915]
            :           :     +- Filter (isnotnull(order_id#840L) AND isnotnull(customer#841))
            :           :        +- BatchScan spark_catalog.default.total_returns_ndv[order_id#840L, customer#841] spark_catalog.default.total_returns_ndv (branch=null) [filters=order_id IS NOT NULL, customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
            :           +- Sort [customer#842 ASC NULLS FIRST], false, 0
            :              +- Exchange hashpartitioning(customer#842, 1000), ENSURE_REQUIREMENTS, [plan_id=1916]
            :                 +- Filter isnotnull(customer#842)
            :                    +- BatchScan spark_catalog.default.locations_ndv[customer#842, store_addr#843] spark_catalog.default.locations_ndv (branch=null) [filters=customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
            +- Sort [order_id#839L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(order_id#839L, 1000), ENSURE_REQUIREMENTS, [plan_id=1924]
                  +- Filter isnotnull(order_id#839L)
                     +- BatchScan spark_catalog.default.online_orders_ndv[order_id#839L] spark_catalog.default.online_orders_ndv (branch=null) [filters=order_id IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
```

## Test 3 - with CBO and NDV stats collected

I collected stats for all the tables like this:

```
df = spark.sql(f"""
CALL system.compute_table_stats(
    table => '{TABLE_NAMES['total_returns']}'
)
""")

df = spark.sql(f"""
CALL system.compute_table_stats(
    table => '{TABLE_NAMES['online_orders']}'
)
""")

df = spark.sql(f"""
CALL system.compute_table_stats(
    table => '{TABLE_NAMES['locations']}'
)
""")
```

Then I re-attempted the join:

```
spark.sparkContext.setLogLevel("DEBUG")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.iceberg.report-column-stats", "true")
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

df = spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
```

In the logs I see:

```
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for order_id#150L
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for order_id#151L
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for customer#152
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for customer#153
```

and the query was executed the same way as in Test 2 (with a non-optimal join order). The NDVs were apparently either not used or did not help CBO.

Note: inspecting the DEBUG log files, I do not see any GET call made to the .stat file in S3.
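One additional way to check whether the statistics reach Spark's optimizer at all (a sketch, not verified on this exact setup) would be `EXPLAIN COST`, which prints the optimized logical plan together with the `Statistics` (sizeInBytes and, when the optimizer has them, rowCount) for each node:

```
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.iceberg.report-column-stats", "true")

# EXPLAIN COST prints the optimized logical plan and the Statistics
# (sizeInBytes, and rowCount when available) that CBO sees for each node.
# If rowCount never shows up for the Iceberg relations, the collected
# statistics are not reaching the optimizer in the first place.
spark.sql(f"""
EXPLAIN COST
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""").show(truncate=False)
```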
## Test 4 - Parquet tables (no iceberg) + CBO + NDVs

To confirm it's an issue with Iceberg NDVs and not with CBO, I recreated the same tables with Parquet:

```
# Define table names and locations
S3_BASE = "s3://mybucket/datasets/temp"
TABLE_NAMES = {
    'online_orders': 'default.online_orders_parquet',
    'total_returns': 'default.total_returns_parquet',
    'locations': 'default.locations_parquet'
}
TABLE_LOCATIONS = {
    'online_orders': f"{S3_BASE}/online_orders_parquet",
    'total_returns': f"{S3_BASE}/total_returns_parquet",
    'locations': f"{S3_BASE}/locations_parquet"
}

# Create tables and insert data using Spark SQL
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['online_orders']} (
    order_id BIGINT
)
USING parquet
LOCATION '{TABLE_LOCATIONS['online_orders']}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['total_returns']} (
    order_id BIGINT,
    customer STRING
)
USING parquet
LOCATION '{TABLE_LOCATIONS['total_returns']}'
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['locations']} (
    customer STRING,
    store_addr STRING
)
USING parquet
LOCATION '{TABLE_LOCATIONS['locations']}'
""")

# Insert data into online_orders (100M rows)
spark.sql(f"""
INSERT INTO {TABLE_NAMES['online_orders']}
SELECT id as order_id
FROM (
    SELECT explode(sequence(1, 100000000)) as id
)
""")

# Insert data into total_returns (2M rows)
# 20K customers with 100 returns each
spark.sql(f"""
WITH customer_base AS (
    SELECT explode(sequence(1, 20000)) as customer_id
),
returns_base AS (
    SELECT customer_id, explode(sequence(1, 100)) as return_seq
    FROM customer_base
)
INSERT INTO {TABLE_NAMES['total_returns']}
SELECT
    CASE WHEN rand() < 0.5 THEN CAST(rand() * 100000000 as BIGINT) ELSE -1 END as order_id,
    concat('CUST_', customer_id) as customer
FROM returns_base
""")

# Insert data into locations (1M rows)
# 20K customers with 50 locations each
spark.sql(f"""
WITH customer_base AS (
    SELECT explode(sequence(1, 20000)) as customer_id
),
location_base AS (
    SELECT customer_id, explode(sequence(1, 50)) as store_seq
    FROM customer_base
)
INSERT INTO {TABLE_NAMES['locations']}
SELECT
    concat('CUST_', customer_id) as customer,
    concat('STORE_', customer_id, '_', store_seq) as store_addr
FROM location_base
""")
```

Then I computed table-level statistics (only row counts and table sizes, no NDVs):

```
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['online_orders']} COMPUTE STATISTICS
""").show(truncate=False)

spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['total_returns']} COMPUTE STATISTICS
""").show(truncate=False)

spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['locations']} COMPUTE STATISTICS
""").show(truncate=False)
```

We see the collected stats for the 3 tables:

```
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['online_orders']}
""").show(truncate=False)

+----------------------------+----------------------------------------------------------------+-------+
|col_name                    |data_type                                                       |comment|
+----------------------------+----------------------------------------------------------------+-------+
|order_id                    |bigint                                                          |NULL   |
|                            |                                                                |       |
|# Detailed Table Information|                                                                |       |
|Catalog                     |spark_catalog                                                   |       |
|Database                    |default                                                         |       |
|Table                       |online_orders_parquet                                           |       |
|Owner                       |hadoop                                                          |       |
|Created Time                |Tue Jun 24 07:59:58 GMT 2025                                    |       |
|Last Access                 |UNKNOWN                                                         |       |
|Created By                  |Spark 3.5.4-amzn-0                                              |       |
|Type                        |EXTERNAL                                                        |       |
|Provider                    |parquet                                                         |       |
|Statistics                  |400620383 bytes, 100000000 rows                                 |       |
|Location                    |s3://mybucket/datasets/temp/online_orders_parquet               |       |
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe     |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat   |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat  |       |
+----------------------------+----------------------------------------------------------------+-------+

spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['total_returns']}
""").show(truncate=False)

+----------------------------+----------------------------------------------------------------+-------+
|col_name                    |data_type                                                       |comment|
+----------------------------+----------------------------------------------------------------+-------+
|order_id                    |bigint                                                          |NULL   |
|customer                    |string                                                          |NULL   |
|                            |                                                                |       |
|# Detailed Table Information|                                                                |       |
|Catalog                     |spark_catalog                                                   |       |
|Database                    |default                                                         |       |
|Table                       |total_returns_parquet                                           |       |
|Owner                       |hadoop                                                          |       |
|Created Time                |Tue Jun 24 07:59:59 GMT 2025                                    |       |
|Last Access                 |UNKNOWN                                                         |       |
|Created By                  |Spark 3.5.4-amzn-0                                              |       |
|Type                        |EXTERNAL                                                        |       |
|Provider                    |parquet                                                         |       |
|Statistics                  |7523202 bytes, 2000000 rows                                     |       |
|Location                    |s3://mybucket/datasets/temp/total_returns_parquet               |       |
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe     |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat   |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat  |       |
+----------------------------+----------------------------------------------------------------+-------+

spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['locations']}
""").show(truncate=False)

+----------------------------+--------------------------------------------------------------+-------+
|col_name                    |data_type                                                     |comment|
+----------------------------+--------------------------------------------------------------+-------+
|customer                    |string                                                        |NULL   |
|store_addr                  |string                                                        |NULL   |
|                            |                                                              |       |
|# Detailed Table Information|                                                              |       |
|Catalog                     |spark_catalog                                                 |       |
|Database                    |default                                                       |       |
|Table                       |locations_parquet                                             |       |
|Owner                       |hadoop                                                        |       |
|Created Time                |Tue Jun 24 08:00:01 GMT 2025                                  |       |
|Last Access                 |UNKNOWN                                                       |       |
|Created By                  |Spark 3.5.4-amzn-0                                            |       |
|Type                        |EXTERNAL                                                      |       |
|Provider                    |parquet                                                       |       |
|Statistics                  |4408847 bytes, 1000000 rows                                   |       |
|Location                    |s3://mybucket/datasets/temp/locations_parquet                 |       |
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe   |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat|       |
+----------------------------+--------------------------------------------------------------+-------+
```

Re-attempted the query:

```
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```

As in Test 2, CBO reordered based only on the table sizes (a wrong/non-optimal order):

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
   +- WriteFiles
      +- *(9) Project [order_id#259L, customer#261, store_addr#263]
         +- *(9) SortMergeJoin [order_id#260L], [order_id#259L], Inner
            :- *(7) Sort [order_id#260L ASC NULLS FIRST], false, 0
            :  +- AQEShuffleRead coalesced
            :     +- ShuffleQueryStage 3
            :        +- Exchange hashpartitioning(order_id#260L, 1000), ENSURE_REQUIREMENTS, [plan_id=1905]
            :           +- *(6) Project [order_id#260L, customer#261, store_addr#263]
            :              +- *(6) SortMergeJoin [customer#261], [customer#262], Inner
            :                 :- *(4) Sort [customer#261 ASC NULLS FIRST], false, 0
            :                 :  +- AQEShuffleRead coalesced
            :                 :     +- ShuffleQueryStage 0
            :                 :        +- Exchange hashpartitioning(customer#261, 1000), ENSURE_REQUIREMENTS, [plan_id=1726]
            :                 :           +- *(1) Filter (isnotnull(order_id#260L) AND isnotnull(customer#261))
            :                 :              +- *(1) ColumnarToRow
            :                 :                 +- FileScan parquet spark_catalog.default.total_returns_parquet[order_id#260L,customer#261] Batched: true, DataFilters: [isnotnull(order_id#260L), isnotnull(customer#261)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/total_returns_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id), IsNotNull(customer)], ReadSchema: struct<order_id:bigint,customer:string>
            :                 +- *(5) Sort [customer#262 ASC NULLS FIRST], false, 0
            :                    +- AQEShuffleRead coalesced
            :                       +- ShuffleQueryStage 1
            :                          +- Exchange hashpartitioning(customer#262, 1000), ENSURE_REQUIREMENTS, [plan_id=1743]
            :                             +- *(2) Filter isnotnull(customer#262)
            :                                +- *(2) ColumnarToRow
            :                                   +- FileScan parquet spark_catalog.default.locations_parquet[customer#262,store_addr#263] Batched: true, DataFilters: [isnotnull(customer#262)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/locations_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer)], ReadSchema: struct<customer:string,store_addr:string>
            +- *(8) Sort [order_id#259L ASC NULLS FIRST], false, 0
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 2
                     +- Exchange hashpartitioning(order_id#259L, 1000), ENSURE_REQUIREMENTS, [plan_id=1764]
                        +- *(3) Filter isnotnull(order_id#259L)
                           +- *(3) ColumnarToRow
                              +- FileScan parquet spark_catalog.default.online_orders_parquet[order_id#259L] Batched: true, DataFilters: [isnotnull(order_id#259L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/online_orders_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema: struct<order_id:bigint>
```

Then I computed column stats (including NDVs):

```
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['online_orders']} COMPUTE STATISTICS FOR ALL COLUMNS
""").show(truncate=False)

spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['total_returns']} COMPUTE STATISTICS FOR ALL COLUMNS
""").show(truncate=False)

spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['locations']} COMPUTE STATISTICS FOR ALL COLUMNS
""").show(truncate=False)
```

We can see the computed column stats:

```
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['online_orders']} order_id
""").show(truncate=False)

+--------------+----------+
|info_name     |info_value|
+--------------+----------+
|col_name      |order_id  |
|data_type     |bigint    |
|comment       |NULL      |
|min           |1         |
|max           |100000000 |
|num_nulls     |0         |
|distinct_count|100000000 |
|avg_col_len   |8         |
|max_col_len   |8         |
|histogram     |NULL      |
+--------------+----------+

spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['total_returns']} order_id
""").show(truncate=False)

+--------------+----------+
|info_name     |info_value|
+--------------+----------+
|col_name      |order_id  |
|data_type     |bigint    |
|comment       |NULL      |
|min           |-1        |
|max           |99999948  |
|num_nulls     |0         |
|distinct_count|1063157   |
|avg_col_len   |8         |
|max_col_len   |8         |
|histogram     |NULL      |
+--------------+----------+

spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['total_returns']} customer
""").show(truncate=False)
+--------------+----------+
|info_name     |info_value|
+--------------+----------+
|col_name      |customer  |
|data_type     |string    |
|comment       |NULL      |
|min           |NULL      |
|max           |NULL      |
|num_nulls     |0         |
|distinct_count|20274     |
|avg_col_len   |10        |
|max_col_len   |10        |
|histogram     |NULL      |
+--------------+----------+

spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['locations']} customer
""").show(truncate=False)

+--------------+----------+
|info_name     |info_value|
+--------------+----------+
|col_name      |customer  |
|data_type     |string    |
|comment       |NULL      |
|min           |NULL      |
|max           |NULL      |
|num_nulls     |0         |
|distinct_count|20274     |
|avg_col_len   |10        |
|max_col_len   |10        |
|histogram     |NULL      |
+--------------+----------+

spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['locations']} store_addr
""").show(truncate=False)

+--------------+----------+
|info_name     |info_value|
+--------------+----------+
|col_name      |store_addr|
|data_type     |string    |
|comment       |NULL      |
|min           |NULL      |
|max           |NULL      |
|num_nulls     |0         |
|distinct_count|1000000   |
|avg_col_len   |14        |
|max_col_len   |14        |
|histogram     |NULL      |
+--------------+----------+
```

Re-attempted the query:

```
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```

We can see that, with NDVs available, CBO properly reordered the joins:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test, false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id, customer, store_addr]
   +- WriteFiles
      +- *(5) Project [order_id#1167L, customer#2914, store_addr#4685]
         +- *(5) ShuffledHashJoin [customer#2914], [customer#4684], Inner, BuildLeft
            :- AQEShuffleRead coalesced
            :  +- ShuffleQueryStage 3
            :     +- Exchange hashpartitioning(customer#2914, 1000), ENSURE_REQUIREMENTS, [plan_id=2527]
            :        +- *(4) Project [order_id#1167L, customer#2914]
            :           +- *(4) ShuffledHashJoin [order_id#1167L], [order_id#2913L], Inner, BuildRight
            :              :- AQEShuffleRead coalesced
            :              :  +- ShuffleQueryStage 0
            :              :     +- Exchange hashpartitioning(order_id#1167L, 1000), ENSURE_REQUIREMENTS, [plan_id=2306]
            :              :        +- *(1) Filter isnotnull(order_id#1167L)
            :              :           +- *(1) ColumnarToRow
            :              :              +- FileScan parquet spark_catalog.default.online_orders_parquet[order_id#1167L] Batched: true, DataFilters: [isnotnull(order_id#1167L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/online_orders_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema: struct<order_id:bigint>
            :              +- AQEShuffleRead coalesced
            :                 +- ShuffleQueryStage 1
            :                    +- Exchange hashpartitioning(order_id#2913L, 1000), ENSURE_REQUIREMENTS, [plan_id=2323]
            :                       +- *(2) Filter (isnotnull(order_id#2913L) AND isnotnull(customer#2914))
            :                          +- *(2) ColumnarToRow
            :                             +- FileScan parquet spark_catalog.default.total_returns_parquet[order_id#2913L,customer#2914] Batched: true, DataFilters: [isnotnull(order_id#2913L), isnotnull(customer#2914)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/total_returns_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(order_id), IsNotNull(customer)], ReadSchema: struct<order_id:bigint,customer:string>
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 2
                  +- Exchange hashpartitioning(customer#4684, 1000), ENSURE_REQUIREMENTS, [plan_id=2344]
                     +- *(3) Filter isnotnull(customer#4684)
                        +- *(3) ColumnarToRow
                           +- FileScan parquet spark_catalog.default.locations_parquet[customer#4684,store_addr#4685] Batched: true, DataFilters: [isnotnull(customer#4684)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/locations_parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer)], ReadSchema: struct<customer:string,store_addr:string>
```

### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [x] I cannot contribute a fix for this bug at this time