This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7b7cb9a69dc7 [SPARK-53562][PYTHON][TESTS][FOLLOW-UP] Add more tests for `maxBytesPerBatch`
7b7cb9a69dc7 is described below
commit 7b7cb9a69dc7f732dc6f67d37b410bf1cb65bf29
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Oct 10 12:03:21 2025 +0800
[SPARK-53562][PYTHON][TESTS][FOLLOW-UP] Add more tests for `maxBytesPerBatch`
### What changes were proposed in this pull request?
Add more tests for `maxBytesPerBatch` in the grouped-map batch-slicing tests for `applyInArrow` and `applyInPandas`.
### Why are the changes needed?
To make sure `maxBytesPerBatch` works as expected, both on its own and combined with `maxRecordsPerBatch`.
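For context, a minimal standalone sketch of the scenario these tests exercise; it is not part of this commit and assumes a PySpark build that already includes the `spark.sql.execution.arrow.maxBytesPerBatch` conf added by SPARK-53562. Each group handed to the grouped-map UDF may be sliced into several Arrow batches once either limit is hit, without changing the result:

```python
import pyarrow as pa
import pyarrow.compute as pc
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.getOrCreate()

# Two groups, large enough to be split into multiple Arrow batches
# on the way to the Python worker.
df = spark.range(1_000_000).select(
    (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
)

def min_max_v(table: pa.Table) -> pa.Table:
    # The UDF still sees one logical group per call; batch slicing
    # happens underneath and must not affect the output.
    return pa.Table.from_pydict(
        {
            "key": [table.column("key")[0].as_py()],
            "min": [pc.min(table.column("v")).as_py()],
            "max": [pc.max(table.column("v")).as_py()],
        }
    )

# Cap batches by row count and, with the newer conf, by estimated size in bytes.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1000)
spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", 1 << 20)  # 1 MiB

result = (
    df.groupBy("key")
    .applyInArrow(min_max_v, "key long, min long, max long")
    .sort("key")
)
result.show()
```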
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
CI.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #52567 from zhengruifeng/test_bytes.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../sql/tests/arrow/test_arrow_grouped_map.py | 52 ++++++++++++---------
.../sql/tests/pandas/test_pandas_grouped_map.py | 54 +++++++++++++---------
2 files changed, 61 insertions(+), 45 deletions(-)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
index e1cd507737cf..765bc7ba6fe1 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py
@@ -353,33 +353,41 @@ class ApplyInArrowTestsMixin:
         self.assertEqual(df2.join(df2).count(), 1)
 
     def test_arrow_batch_slicing(self):
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1000}):
-            df = self.spark.range(10000000).select(
-                (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        df = self.spark.range(10000000).select(
+            (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        )
+        cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
+        df = df.withColumns(cols)
+
+        def min_max_v(table):
+            return pa.Table.from_pydict(
+                {
+                    "key": [table.column("key")[0].as_py()],
+                    "min": [pc.min(table.column("v")).as_py()],
+                    "max": [pc.max(table.column("v")).as_py()],
+                }
             )
-            cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
-            df = df.withColumns(cols)
 
-            def min_max_v(table):
-                return pa.Table.from_pydict(
+        expected = (
+            df.groupby("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key")
+        ).collect()
+
+        int_max = 2147483647
+        for maxRecords, maxBytes in [(1000, int_max), (0, 1048576), (1000, 1048576)]:
+            with self.subTest(maxRecords=maxRecords, maxBytes=maxBytes):
+                with self.sql_conf(
                     {
-                        "key": [table.column("key")[0].as_py()],
-                        "min": [pc.min(table.column("v")).as_py()],
-                        "max": [pc.max(table.column("v")).as_py()],
+                        "spark.sql.execution.arrow.maxRecordsPerBatch": maxRecords,
+                        "spark.sql.execution.arrow.maxBytesPerBatch": maxBytes,
                     }
-                )
+                ):
+                    result = (
+                        df.groupBy("key")
+                        .applyInArrow(min_max_v, "key long, min long, max long")
+                        .sort("key")
+                    ).collect()
 
-            result = (
-                df.groupBy("key")
-                .applyInArrow(min_max_v, "key long, min long, max long")
-                .sort("key")
-            )
-            expected = (
-                df.groupby("key")
-                .agg(sf.min("v").alias("min"), sf.max("v").alias("max"))
-                .sort("key")
-            )
-            self.assertEqual(expected.collect(), result.collect())
+                    self.assertEqual(expected, result)
 
     def test_negative_and_zero_batch_size(self):
         for batch_size in [0, -1]:
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index 8756f824a56c..f81c774c0e91 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -947,33 +947,41 @@ class ApplyInPandasTestsMixin:
         self.assertEqual(row[1], 123)
 
     def test_arrow_batch_slicing(self):
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1000}):
-            df = self.spark.range(10000000).select(
-                (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        df = self.spark.range(10000000).select(
+            (sf.col("id") % 2).alias("key"), sf.col("id").alias("v")
+        )
+        cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
+        df = df.withColumns(cols)
+
+        def min_max_v(pdf):
+            return pd.DataFrame(
+                {
+                    "key": [pdf.key.iloc[0]],
+                    "min": [pdf.v.min()],
+                    "max": [pdf.v.max()],
+                }
             )
-            cols = {f"col_{i}": sf.col("v") + i for i in range(20)}
-            df = df.withColumns(cols)
 
-            def min_max_v(pdf):
-                return pd.DataFrame(
+        expected = (
+            df.groupby("key").agg(sf.min("v").alias("min"), sf.max("v").alias("max")).sort("key")
+        ).collect()
+
+        int_max = 2147483647
+        for maxRecords, maxBytes in [(1000, int_max), (0, 1048576), (1000, 1048576)]:
+            with self.subTest(maxRecords=maxRecords, maxBytes=maxBytes):
+                with self.sql_conf(
                     {
-                        "key": [pdf.key.iloc[0]],
-                        "min": [pdf.v.min()],
-                        "max": [pdf.v.max()],
+                        "spark.sql.execution.arrow.maxRecordsPerBatch": maxRecords,
+                        "spark.sql.execution.arrow.maxBytesPerBatch": maxBytes,
                     }
-                )
-
-            result = (
-                df.groupBy("key")
-                .applyInPandas(min_max_v, "key long, min long, max long")
-                .sort("key")
-            )
-            expected = (
-                df.groupby("key")
-                .agg(sf.min("v").alias("min"), sf.max("v").alias("max"))
-                .sort("key")
-            )
-            self.assertEqual(expected.collect(), result.collect())
+                ):
+                    result = (
+                        df.groupBy("key")
+                        .applyInPandas(min_max_v, "key long, min long, max long")
+                        .sort("key")
+                    ).collect()
+
+                    self.assertEqual(expected, result)
 
     def test_negative_and_zero_batch_size(self):
         for batch_size in [0, -1]: