korbel-jacek opened a new issue, #10891:
URL: https://github.com/apache/iceberg/issues/10891

   ### Apache Iceberg version
   
   1.4.2
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   Hi, I am trying to MERGE a small Iceberg table into a large Iceberg table, but the performance is poor.
   Both tables are bucketed by the join keys (`customer_id`, `order_id`) and written locally ordered by those columns so that Storage Partitioned Join can avoid shuffling. I also use merge-on-read to make MERGE INTO faster, yet most of the time is still spent sorting both tables during the merge: there is a Sort step on each side of the SortMergeJoin. Since the data files are already locally sorted on the join keys, I assume this extra sorting is unnecessary. Is there a way to prevent it?
   ```
   == Physical Plan ==
   WriteDelta (14)
   +- * Sort (13)
      +- Exchange (12)
         +- MergeRows (11)
            +- * Project (10)
               +- * SortMergeJoin RightOuter (9)
                  :- * Sort (5)
                  :  +- * Filter (4)
                  :     +- * Project (3)
                  :        +- * ColumnarToRow (2)
                  :           +- BatchScan spark_catalog.default.customer3 (1)
                  +- * Sort (8)
                     +- * ColumnarToRow (7)
                        +- BatchScan spark_catalog.default.customer4 (6)
   
   (8) Sort [codegen id : 2]
   Input [5]: [customer_id#127, name#128, country#129, order_id#130, amount#131]
   Arguments: [customer_id#127 ASC NULLS FIRST, order_id#130 ASC NULLS FIRST], false, 0
   ```
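   
   For the write-side Sort (13) and Exchange (12) above MergeRows, I suspect the cause is the table's requested write distribution (`write.distribution-mode` is `range` in my setup) rather than the join itself. A sketch of what I assume would relax that, shown here only as an untested idea:
   
   ```scala
   // Assumption: with the merge-specific distribution mode set to 'none',
   // Iceberg no longer requests a range distribution for MERGE writes, which
   // I would expect to remove Exchange (12) / Sort (13) from the plan.
   spark.sql("""
     ALTER TABLE default.customer3 SET TBLPROPERTIES (
       'write.merge.distribution-mode' = 'none'
     )
   """)
   ```
   
   Even then, Sort (5) and Sort (8) under the SortMergeJoin would remain, and those are the steps I would like to avoid.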
   
   
![image](https://github.com/user-attachments/assets/8040be81-2fc9-4116-a52f-7b879731f6d3)
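   
   To rule out the data files simply not being sorted, the `files` metadata table records the sort order each file was written with (as far as I know, `sort_order_id = 0` is the unsorted default, so a non-zero id should confirm the local sort applied):
   
   ```scala
   // Sanity check: list data files with the sort order id they were written
   // under; files appended after the WRITE LOCALLY ORDERED clause should
   // carry a non-zero sort_order_id.
   spark.sql("SELECT file_path, sort_order_id FROM default.customer3.files").show(false)
   ```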
   
   Example code to reproduce:
   ```
   spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2 \
     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
     --conf spark.sql.catalog.spark_catalog.type=hadoop \
     --conf spark.sql.sources.v2.bucketing.enabled=true \
     --conf spark.sql.sources.v2.bucketing.push.part.values.enabled=true \
     --conf spark.sql.requireAllClusterKeysForCoPartition=false \
     --conf spark.sql.iceberg.planning.preserve-data-grouping=true \
     --conf spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true \
     --conf spark.sql.sources.v2.bucketing.pushPartValues.enabled=true \
     --conf spark.dynamicAllocation.enabled=false \
     --conf spark.shuffle.useOldFetchProtocol=true \
     --conf spark.sql.shuffle.partitions=10 \
     --conf spark.sql.adaptive.enabled=false \
     --conf spark.sql.join.preferSortMergeJoin=false \
     --conf spark.sql.bucketing.coalesceBucketsInJoin.enabled=true \
     --conf spark.sql.catalog.spark_catalog.warehouse=spark-warehouse/iceberg
   ```
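   
   Since several of these keys look nearly identical, a quick check in the shell that the SPJ-related settings actually took effect (assuming they were all passed via `--conf` as above):
   
   ```scala
   // Print the effective values of the SPJ-related settings in this session.
   Seq(
     "spark.sql.sources.v2.bucketing.enabled",
     "spark.sql.sources.v2.bucketing.pushPartValues.enabled",
     "spark.sql.iceberg.planning.preserve-data-grouping"
   ).foreach { k =>
     println(k + " = " + spark.conf.get(k, "<not set>"))
   }
   ```
   
   The table setup and MERGE below then run in the same session: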
   ```
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.{SparkSession, DataFrame}
   // Create sample data for customers
   val customerData = Seq(
     (1, "John Doe", "USA",1,1),
     (2, "Jane Smith", "Canada",1,1),
     (3, "Alice Johnson", "UK",1,1),
     (4, "Bob Brown", "USA",1,1),
     (5, "Charlie Davis", "Canada",1,1)
   )
   
   // Create DataFrame for customers
   val customerDF = spark.createDataFrame(customerData).toDF("customer_id", "name", "country", "order_id", "amount")
   
   // Repartition the DataFrame by customer_id
   val partitionedCustomerDF = customerDF.repartition(col("customer_id"))
   
   
   
   partitionedCustomerDF.writeTo("default.customer3")
     .tableProperty("write.distribution-mode", "range")
     .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
     .using("iceberg")
     .createOrReplace()
   
   partitionedCustomerDF.writeTo("default.customer4")
     .tableProperty("write.distribution-mode", "range")
     .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
     .using("iceberg")
     .createOrReplace()
   spark.sql("TRUNCATE table default.customer4")
   spark.sql("TRUNCATE table default.customer3")
   
   spark.sql("ALTER TABLE default.customer4 WRITE LOCALLY ORDERED BY 
customer_id, order_id").show
   spark.sql("ALTER TABLE default.customer3 WRITE LOCALLY ORDERED BY 
customer_id, order_id").show
   
   spark.sql("ALTER TABLE default.customer3 SET TBLPROPERTIES 
('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')");
   spark.sql("ALTER TABLE default.customer4 SET TBLPROPERTIES 
('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')");
   
   partitionedCustomerDF.writeTo("default.customer4").append()
   partitionedCustomerDF.writeTo("default.customer3").append()
   
   spark.sql("""
   MERGE INTO default.customer3 AS target
   USING default.customer4 AS source
   ON target.customer_id = source.customer_id AND target.order_id = source.order_id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
   """).show()
   ```
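   
   The MERGE plan above already shows SPJ itself working (there is no Exchange under the SortMergeJoin), and the same can be confirmed on a plain join over the merge keys:
   
   ```scala
   // This equi-join mirrors the MERGE condition; with SPJ active there is no
   // Exchange between the two BatchScans, yet Spark still adds a Sort per side.
   spark.sql("""
     SELECT *
     FROM default.customer3 t
     JOIN default.customer4 s
       ON t.customer_id = s.customer_id AND t.order_id = s.order_id
   """).explain()
   ```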
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time

