kbjorklu opened a new issue, #10357: URL: https://github.com/apache/iceberg/issues/10357
### Apache Iceberg version

1.5.2 (latest release)

### Query engine

Spark

### Please describe the bug 🐞

The following code fails with `AssertionError: want 1000 rows, got 2000`. Setting the row group size to 10000 or disabling `partiallyClusteredDistribution` makes the check pass.

```python
from tempfile import TemporaryDirectory

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

ROWS = 1000
PARTIALLY_CLUSTERED_DISTRIBUTION = "true"  # Set to "false" to pass the test
ROW_GROUP_SIZE_BYTES = 10  # Set to 10000 to pass the test

with TemporaryDirectory() as tmpdir:
    spark = (
        SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold", -1)
        .config("spark.sql.adaptive.enabled", "false")
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.5.2")
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.test", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.test.type", "hadoop")
        .config("spark.sql.catalog.test.warehouse", tmpdir)
        .config("spark.sql.defaultCatalog", "test")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # Storage-partitioned join (SPJ) settings
        .config("spark.sql.sources.v2.bucketing.enabled", "true")
        .config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
        .config(
            "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled",
            PARTIALLY_CLUSTERED_DISTRIBUTION,
        )
        .config("spark.sql.iceberg.planning.preserve-data-grouping", "true")
        .config("spark.sql.requireAllClusterKeysForCoPartition", "false")
        .getOrCreate()
    )

    # Table a: two appends of the same 1000 ids, with tiny Parquet row groups
    spark.sql(
        f"""
        CREATE TABLE test.db.a(id long) USING iceberg
        PARTITIONED BY (bucket(1, id))
        TBLPROPERTIES('write.parquet.row-group-size-bytes' = '{ROW_GROUP_SIZE_BYTES}')
        """
    )
    df = spark.range(ROWS)
    df = df.repartition(F.expr("bucket(2, id)"))
    df.writeTo("test.db.a").append()
    df.writeTo("test.db.a").append()
    assert spark.table("test.db.a").count() == 2 * ROWS

    # Table b: one append of the same 1000 ids
    spark.sql(
        f"""
        CREATE TABLE test.db.b(id long) USING iceberg
        PARTITIONED BY (bucket(1, id))
        TBLPROPERTIES('write.parquet.row-group-size-bytes' = '{ROW_GROUP_SIZE_BYTES}')
        """
    )
    df.writeTo("test.db.b").append()
    assert spark.table("test.db.b").count() == ROWS

    # Table c: semi join of a and b, deduplicated by GROUP BY id,
    # so it should end up holding exactly ROWS rows
    spark.sql(
        f"""
        CREATE TABLE test.db.c(id long) USING iceberg
        PARTITIONED BY (bucket(1, id))
        TBLPROPERTIES('write.parquet.row-group-size-bytes' = '{ROW_GROUP_SIZE_BYTES}')
        """
    )
    spark.sql(
        f"""
        INSERT INTO test.db.c
        SELECT id FROM test.db.a
        SEMI JOIN test.db.b USING (id)
        GROUP BY id
        """
    ).explain()

    got_rows = spark.table("test.db.c").count()
    assert got_rows == ROWS, f"want {ROWS} rows, got {got_rows}"
```
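Until this is fixed, the repro above suggests two workarounds. A minimal sketch of the session-level one, assuming the `spark` session from the script above (the other option, raising `write.parquet.row-group-size-bytes` on the tables, works equally per the repro):

```python
# Workaround sketch: disable partially clustered distribution for the session
# before running the INSERT ... SELECT. With this set to "false", the
# assertion in the repro above passes.
spark.conf.set(
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled",
    "false",
)
```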