kbjorklu opened a new issue, #10357:
URL: https://github.com/apache/iceberg/issues/10357
### Apache Iceberg version
1.5.2 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
The following code fails with `AssertionError: want 1000 rows, got 2000`. Setting
the row group size to 10000 or disabling `partiallyClusteredDistribution` makes the
check pass.
```python
from tempfile import TemporaryDirectory

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Number of ids produced by spark.range(); also the row count expected in the
# final table `c`.
ROWS = 1000
PARTIALLY_CLUSTERED_DISTRIBUTION = "true"  # Set to "false" to pass the test
ROW_GROUP_SIZE_BYTES = 10  # Set to 10000 to pass the test


def _create_bucketed_table(spark, name):
    """Create the Iceberg table ``test.db.<name>`` used by the reproduction.

    Every table has a single ``id long`` column, is partitioned by
    ``bucket(1, id)`` and uses ``ROW_GROUP_SIZE_BYTES`` as its Parquet
    row-group size.

    Args:
        spark: The active SparkSession.
        name: Table name inside the ``test.db`` namespace.
    """
    spark.sql(
        f"""
        CREATE TABLE test.db.{name}(id long)
        USING iceberg
        PARTITIONED BY (bucket(1, id))
        TBLPROPERTIES(
            'write.parquet.row-group-size-bytes' = '{ROW_GROUP_SIZE_BYTES}'
        )
        """
    )


# The Hadoop catalog `test` keeps its warehouse in a throwaway directory, so
# the reproduction leaves nothing behind.
with TemporaryDirectory() as tmpdir:
    spark = (
        SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold", -1)
        .config("spark.sql.adaptive.enabled", "false")
        .config(
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.2",
        )
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog",
        )
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.test", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.test.type", "hadoop")
        .config("spark.sql.catalog.test.warehouse", tmpdir)
        .config("spark.sql.defaultCatalog", "test")
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        .config("spark.sql.sources.v2.bucketing.enabled", "true")
        .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
        .config(
            "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled",
            PARTIALLY_CLUSTERED_DISTRIBUTION,
        )
        .config("spark.sql.iceberg.planning.preserve-data-grouping", "true")
        .config("spark.sql.requireAllClusterKeysForCoPartition", "false")
        .getOrCreate()
    )
    try:
        # Table `a` receives the same ROWS ids twice, so every id is duplicated.
        _create_bucketed_table(spark, "a")
        df = spark.range(ROWS)
        df = df.repartition(F.expr("bucket(2, id)"))
        df.writeTo("test.db.a").append()
        df.writeTo("test.db.a").append()
        assert spark.table("test.db.a").count() == 2 * ROWS

        # Table `b` receives each id exactly once.
        _create_bucketed_table(spark, "b")
        df.writeTo("test.db.b").append()
        assert spark.table("test.db.b").count() == ROWS

        # GROUP BY id should collapse the duplicates from `a`, so `c` is
        # expected to end up with exactly ROWS rows.
        _create_bucketed_table(spark, "c")
        spark.sql(
            """
            INSERT INTO test.db.c
            SELECT id
            FROM test.db.a
            SEMI JOIN test.db.b
            USING (id)
            GROUP BY id
            """
        ).explain()
        got_rows = spark.table("test.db.c").count()
        # This is the check the issue reports as failing with
        # "want 1000 rows, got 2000" under the default settings above.
        assert got_rows == ROWS, f"want {ROWS} rows, got {got_rows}"
    finally:
        # Stop the session before the temporary warehouse directory is
        # removed, even when one of the assertions above fails.
        spark.stop()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]