soumilshah1995 opened a new issue, #13916:
URL: https://github.com/apache/iceberg/issues/13916

   ### Query engine
   
   Spark
   
   Hi there, I’m trying to get storage-partitioned joins (SPJ) to kick in, but my queries keep falling back to sort-merge joins. Could you help me out?
   
   ```
   
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import spark_partition_id
   import os,sys
   
   # ----------------------------
   # Environment
   # ----------------------------
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   
   ICEBERG_VERSION = "1.5.0"  # SPJ support
   SPARK_VERSION = "3.5.4"
   
   # The runtime artifact must match the Spark minor version (3.5 here, not 3.4)
   SUBMIT_ARGS = f"--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{ICEBERG_VERSION} pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ["PYSPARK_PYTHON"] = sys.executable
   
   warehouse_path = "/Users/soumilshah/Desktop/warehouse"
   
   # ----------------------------
   # Spark Session
   # ----------------------------
   spark = SparkSession.builder \
       .appName("IcebergSPJExample") \
       .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
       .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
       .config("spark.sql.catalog.dev.type", "hadoop") \
       .config("spark.sql.catalog.dev.warehouse", warehouse_path) \
       .getOrCreate()
   
   # ----------------------------
   # Create two tables with identical identity partitioning on "part"
   # ----------------------------
   df = spark.range(0, 100_000)  # simple numeric dataset
   
   # Add partition column based on Spark partition id
   df_a = df.repartition(10).withColumn("part", spark_partition_id())
   df_b = df.repartition(10).withColumn("part", spark_partition_id())
   
   # Write table A
   df_a.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice1")
   
   # Write table B
   df_b.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice2")
   
   spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
   spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
   spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", 
"true")
   spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
   
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled",
 "true")
   spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
   spark.conf.set("spark.sql.adaptive.enabled", "false")
   
   
   df = spark.sql("""
   SELECT a.part, count(*)
   FROM dev.ice1 a
   JOIN dev.ice2 b
   ON a.part = b.part
   GROUP BY a.part
   """)
   
   df.show()
   
   ```
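   
   For context, this is how I check whether SPJ actually kicked in (a minimal check on my side): with SPJ I would expect the join to have no `Exchange` (shuffle) beneath it, while the fallback shows `SortMergeJoin` fed by `Exchange hashpartitioning` on both sides.
   
   ```
   # Print the physical plan; look for Exchange nodes feeding the join.
   df.explain(mode="formatted")
   ```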
   
   <img width="1102" height="1240" alt="Image" 
src="https://github.com/user-attachments/assets/e0548c7d-e63d-4577-bdd9-09d280c23a70";
 />
   
   
   Refs:
   
   - https://medium.com/expedia-group-tech/turbocharge-efficiency-slash-costs-mastering-spark-iceberg-joins-with-storage-partitioned-join-03fdc1ff75c0
   - https://github.com/apache/iceberg/issues/10250
   - https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit?tab=t.0#heading=h.82w8qxfl2uwl
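   
   Worth noting: the Expedia article above demonstrates SPJ with a bucket transform on the join key rather than an identity partition. A minimal sketch of that variant (the `_bucketed` table names and the bucket count of 8 are mine, purely illustrative):
   
   ```
   # Illustrative only: SPJ with a bucket transform on the join key,
   # as in the referenced article (table names/bucket count assumed).
   spark.sql("""
   CREATE TABLE dev.ice1_bucketed (id BIGINT)
   USING iceberg
   PARTITIONED BY (bucket(8, id))
   """)
   
   spark.sql("""
   CREATE TABLE dev.ice2_bucketed (id BIGINT)
   USING iceberg
   PARTITIONED BY (bucket(8, id))
   """)
   
   # Both tables share the same bucket transform on the join key, so a
   # join on id should be eligible for SPJ under the same session configs.
   spark.sql("SELECT a.id FROM dev.ice1_bucketed a JOIN dev.ice2_bucketed b ON a.id = b.id").explain()
   ```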
   
   
   ### Question
   
   Why does this Spark + Iceberg setup still plan a sort-merge join instead of a storage-partitioned join, and what am I missing in the configuration?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

