soumilshah1995 opened a new issue, #13916:
URL: https://github.com/apache/iceberg/issues/13916
### Query engine
Hi there, I’m trying to get storage-partitioned joins (SPJ) to kick in, but my
queries keep falling back to sort-merge joins. Could you help me out?
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id
import os,sys
# ----------------------------
# Environment
# ----------------------------
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
ICEBERG_VERSION = "1.5.0" # SPJ support
SPARK_VERSION = "3.5.4"
SUBMIT_ARGS = f"--packages
org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:{ICEBERG_VERSION}
pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
warehouse_path = "/Users/soumilshah/Desktop/warehouse"
# ----------------------------
# Spark Session
# ----------------------------
spark = SparkSession.builder \
    .appName("IcebergSPJExample") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.dev.type", "hadoop") \
    .config("spark.sql.catalog.dev.warehouse", warehouse_path) \
    .getOrCreate()
# ----------------------------
# Create two tables with the same identity partitioning on `part`
# ----------------------------
df = spark.range(0, 100_000) # simple numeric dataset
# Add partition column based on Spark partition id
df_a = df.repartition(10).withColumn("part", spark_partition_id())
df_b = df.repartition(10).withColumn("part", spark_partition_id())
# Write table A
df_a.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice1")
# Write table B
df_b.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice2")
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled",
"true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled",
"true")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", "false")
df = spark.sql("""
SELECT a.part, count(*)
FROM dev.ice1 a
JOIN dev.ice2 b
ON a.part = b.part
GROUP BY a.part
""")
df.show()
```
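For reference, most SPJ write-ups demonstrate it with Iceberg's bucket transform rather than identity partitions. A minimal sketch of that variant (the table names `dev.ice1_b` / `dev.ice2_b` are placeholders; `bucket` and `writeTo` are the standard PySpark DataFrameWriterV2 APIs):
```
from pyspark.sql.functions import bucket

# Placeholder tables bucketed on the join key with Iceberg's bucket transform;
# both tables share the same bucket spec, which is what SPJ matches on.
src = spark.range(0, 100_000)
src.writeTo("dev.ice1_b").using("iceberg").partitionedBy(bucket(10, "id")).createOrReplace()
src.writeTo("dev.ice2_b").using("iceberg").partitionedBy(bucket(10, "id")).createOrReplace()
```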
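This is how I'm checking whether SPJ actually kicked in: if it worked, the physical plan should show the join sitting directly on the two Iceberg scans with no Exchange (shuffle) in between, but I still get a SortMergeJoin (screenshot below):
```
# With SPJ there should be no Exchange between the BatchScans and the join;
# instead the plan still shows a SortMergeJoin.
df.explain(mode="formatted")
```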
<img width="1102" height="1240" alt="Image"
src="https://github.com/user-attachments/assets/e0548c7d-e63d-4577-bdd9-09d280c23a70"
/>
Refs:
- https://medium.com/expedia-group-tech/turbocharge-efficiency-slash-costs-mastering-spark-iceberg-joins-with-storage-partitioned-join-03fdc1ff75c0
- https://github.com/apache/iceberg/issues/10250
- https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit?tab=t.0#heading=h.82w8qxfl2uwl
### Question
How do I get Spark + Iceberg storage-partitioned joins (SPJ) to apply instead of falling back to sort-merge joins?