soumilshah1995 commented on issue #10250:
URL: https://github.com/apache/iceberg/issues/10250#issuecomment-3218114973
Hi there, I tried the above example and it's still doing a sort merge join:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id
import os
import sys

# ----------------------------
# Environment
# ----------------------------
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

ICEBERG_VERSION = "1.5.0"  # SPJ support
SPARK_VERSION = "3.4"
SUBMIT_ARGS = (
    f"--packages org.apache.iceberg:iceberg-spark-runtime-"
    f"{SPARK_VERSION}_2.12:{ICEBERG_VERSION} pyspark-shell"
)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ["PYSPARK_PYTHON"] = sys.executable
warehouse_path = "/Users/soumilshah/Desktop/warehouse"
# ----------------------------
# Spark Session
# ----------------------------
spark = SparkSession.builder \
    .appName("IcebergSPJExample") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.dev", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.dev.type", "hadoop") \
    .config("spark.sql.catalog.dev.warehouse", warehouse_path) \
    .config("spark.sql.join.preferSortMergeJoin", "false") \
    .config("spark.sql.sources.v2.bucketing.enabled", "true") \
    .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true") \
    .config("spark.sql.iceberg.planning.preserve-data-grouping", "true") \
    .config("spark.sql.requireAllClusterKeysForCoPartition", "false") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .config("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true") \
    .getOrCreate()
# ----------------------------
# Create two tables with the same identity partitioning (column "part")
# ----------------------------
df = spark.range(0, 100_000) # simple numeric dataset
# Add partition column based on Spark partition id
df_a = df.repartition(10).withColumn("part", spark_partition_id())
df_b = df.repartition(10).withColumn("part", spark_partition_id())
# Write table A
df_a.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice1")
# Write table B
df_b.write.partitionBy("part").format("iceberg").mode("overwrite").saveAsTable("dev.ice2")
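# NOTE: both tables share the same identity partition spec on "part", and the
# join below includes a.part = b.part, so the partition column is covered by
# the join keys (my understanding of what SPJ needs in order to kick in).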
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
spark.sql("SET spark.sql.join.preferSortMergeJoin=false")
spark.sql("SET `spark.sql.sources.v2.bucketing.enabled`=true")
spark.sql("SET `spark.sql.sources.v2.bucketing.pushPartValues.enabled`=true")
spark.sql("SET `spark.sql.iceberg.planning.preserve-data-grouping`=true")
spark.sql("SET `spark.sql.requireAllClusterKeysForCoPartition`=false")
spark.sql("SET
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled`=true")
spark.sql("""
SELECT a.id AS id1, b.id AS id2
FROM dev.ice1 a
JOIN dev.ice2 b
ON a.id = b.id AND a.part = b.part
LIMIT 20
""").show()
```
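To double-check, I'm inspecting the physical plan like this (a minimal sketch reusing the session and tables above). My understanding is that when SPJ applies, the `SortMergeJoin` node itself can remain, but the shuffle `Exchange` nodes above its inputs should disappear:

```
# Print the physical plan for the same join (reuses the session/tables above).
# If SPJ is working, there should be no Exchange (shuffle) on either join input.
plan_df = spark.sql("""
    SELECT a.id AS id1, b.id AS id2
    FROM dev.ice1 a
    JOIN dev.ice2 b
    ON a.id = b.id AND a.part = b.part
""")
plan_df.explain(mode="formatted")
```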
<img width="551" height="620" alt="Image" src="https://github.com/user-attachments/assets/d1560a63-0cd6-4a41-9034-f6424f6c96a2" />
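In case it helps with triage, here is how I'm confirming that both tables ended up with identical partition layouts, using Iceberg's `partitions` metadata table (a sketch; only the two tables created above are assumed):

```
# Compare partition layouts of the two tables via Iceberg's metadata tables.
spark.sql("SELECT partition, record_count, file_count FROM dev.ice1.partitions").show()
spark.sql("SELECT partition, record_count, file_count FROM dev.ice2.partitions").show()
```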