korbel-jacek opened a new issue, #10891: URL: https://github.com/apache/iceberg/issues/10891
### Apache Iceberg version

1.4.2

### Query engine

Spark

### Please describe the bug 🐞

Hi, I am trying to MERGE a small Iceberg table into a large Iceberg table, but performance is poor. Both tables are bucketed and locally sorted by the join columns so that Storage Partitioned Join kicks in and shuffling is avoided. Additionally, I use merge-on-read to make MERGE INTO operations faster, but sorting these tables during the merge still takes a lot of time: there is a Sort step before the SortMergeJoin. Is it possible to prevent this sorting step somehow? Since the data files are already written locally sorted, I assume no additional sort should be needed.

```
== Physical Plan ==
WriteDelta (14)
+- * Sort (13)
   +- Exchange (12)
      +- MergeRows (11)
         +- * Project (10)
            +- * SortMergeJoin RightOuter (9)
               :- * Sort (5)
               :  +- * Filter (4)
               :     +- * Project (3)
               :        +- * ColumnarToRow (2)
               :           +- BatchScan spark_catalog.default.customer3 (1)
               +- * Sort (8)
                  +- * ColumnarToRow (7)
                     +- BatchScan spark_catalog.default.customer4 (6)

(8) Sort [codegen id : 2]
Input [5]: [customer_id#127, name#128, country#129, order_id#130, amount#131]
Arguments: [customer_id#127 ASC NULLS FIRST, order_id#130 ASC NULLS FIRST], false, 0
```

Example code to reproduce:

```
spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.sources.v2.bucketing.enabled=true \
  --conf spark.sql.sources.v2.bucketing.push.part.values.enabled=true \
  --conf spark.sql.requireAllClusterKeysForCoPartition=false \
  --conf spark.sql.iceberg.planning.preserve-data-grouping=true \
  --conf spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true \
  --conf spark.sql.sources.v2.bucketing.pushPartValues.enabled=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.useOldFetchProtocol=true \
  --conf spark.sql.shuffle.partitions=10 \
  --conf spark.sql.adaptive.enabled=false \
  --conf spark.sql.join.preferSortMergeJoin=false \
  --conf spark.sql.bucketing.coalesceBucketsInJoin.enabled=true \
  --conf spark.sql.catalog.spark_catalog.warehouse=spark-warehouse/iceberg
```

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SparkSession, DataFrame}

// Create sample data for customers
val customerData = Seq(
  (1, "John Doe", "USA", 1, 1),
  (2, "Jane Smith", "Canada", 1, 1),
  (3, "Alice Johnson", "UK", 1, 1),
  (4, "Bob Brown", "USA", 1, 1),
  (5, "Charlie Davis", "Canada", 1, 1)
)

// Create DataFrame for customers
val customerDF = spark.createDataFrame(customerData).toDF("customer_id", "name", "country", "order_id", "amount")

// Partition both DataFrames by customer_id
val partitionedCustomerDF = customerDF.repartition(col("customer_id"))

partitionedCustomerDF.writeTo("default.customer3")
  .tableProperty("write.distribution-mode", "range")
  .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
  .using("iceberg")
  .createOrReplace()

partitionedCustomerDF.writeTo("default.customer4")
  .tableProperty("write.distribution-mode", "range")
  .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
  .using("iceberg")
  .createOrReplace()

spark.sql("TRUNCATE TABLE default.customer4")
spark.sql("TRUNCATE TABLE default.customer3")

spark.sql("ALTER TABLE default.customer4 WRITE LOCALLY ORDERED BY customer_id, order_id").show
spark.sql("ALTER TABLE default.customer3 WRITE LOCALLY ORDERED BY customer_id, order_id").show

spark.sql("ALTER TABLE default.customer3 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')")
spark.sql("ALTER TABLE default.customer4 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')")

partitionedCustomerDF.writeTo("default.customer4").append()
partitionedCustomerDF.writeTo("default.customer3").append()

spark.sql("""
  MERGE INTO default.customer3 AS target
  USING default.customer4 AS source
  ON target.customer_id = source.customer_id
     AND target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""").show()
```
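For reference, here is a rough sketch of how the setup can be double-checked; I am assuming the Iceberg `files` metadata table exposes a `sort_order_id` column and that `EXPLAIN FORMATTED` accepts MERGE INTO here, so adjust as needed:

```scala
// Verification sketch (assumptions noted above).

// 1. Check that the data files carry a non-default sort order id, i.e. they
//    were actually written under WRITE LOCALLY ORDERED BY customer_id, order_id.
spark.sql("SELECT file_path, sort_order_id FROM default.customer3.files").show(false)
spark.sql("SELECT file_path, sort_order_id FROM default.customer4.files").show(false)

// 2. Re-print the physical plan of the MERGE to see whether the Sort (8)
//    below the SortMergeJoin is still injected.
spark.sql("""
  EXPLAIN FORMATTED
  MERGE INTO default.customer3 AS target
  USING default.customer4 AS source
  ON target.customer_id = source.customer_id
     AND target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""").show(false)
```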
### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
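Additional context: my guess is that the Sort is injected because the Iceberg batch scan does not report its output ordering to Spark. If I read the Spark 3.4+ DSv2 API correctly, a scan can declare this via `SupportsReportOrdering`; below is a hypothetical sketch (not the actual Iceberg implementation) of what reporting the table's local sort order could look like:

```scala
// Hypothetical sketch, NOT actual Iceberg code: a DSv2 scan that declares
// its output ordering. If the Iceberg scan reported customer_id ASC,
// order_id ASC whenever all selected files share the table's sort order,
// the planner could elide the Sort under the SortMergeJoin.
import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.read.{Scan, SupportsReportOrdering}

trait OrderReportingScan extends Scan with SupportsReportOrdering {
  // Report the table's local sort order; a real implementation would return
  // an empty array when the selected files are not uniformly sorted.
  override def outputOrdering(): Array[SortOrder] = Array(
    Expressions.sort(Expressions.column("customer_id"), SortDirection.ASCENDING),
    Expressions.sort(Expressions.column("order_id"), SortDirection.ASCENDING)
  )
}
```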