This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4f70b6065d2d [SPARK-57199][SQL] Extract the aggregate out-of-memory error into a QueryExecutionErrors factory
4f70b6065d2d is described below
commit 4f70b6065d2de31b5509ef41035ca8a1236d8953
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Jun 2 20:57:01 2026 -0700
[SPARK-57199][SQL] Extract the aggregate out-of-memory error into a QueryExecutionErrors factory
### What changes were proposed in this pull request?
The aggregate out-of-memory error (`AGGREGATE_OUT_OF_MEMORY`) is
constructed inline in two places:
- `HashAggregateExec`, whose whole-stage codegen emits
  `throw new org.apache.spark.memory.SparkOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());`
  into every generated aggregate class.
- `TungstenAggregationIterator` (the interpreted fallback), which throws
  the same `new SparkOutOfMemoryError(...)` and needs a
  `// scalastyle:off throwerror` suppression.
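To make the codegen bullet concrete, here is a minimal sketch of how a string template splices a class name into the emitted Java, modeled on the removed `oomeClassName` lines in `HashAggregateExec`. It assumes a local stand-in class, `SketchOutOfMemoryError`, in place of `SparkOutOfMemoryError`, and a hypothetical `emitAllocationCheck` helper; neither is Spark API.

```scala
// Hypothetical stand-in for org.apache.spark.memory.SparkOutOfMemoryError,
// used because the real class is not assumed to be on the classpath.
class SketchOutOfMemoryError(val errorClass: String, val params: java.util.Map[String, String])
  extends OutOfMemoryError(errorClass)

object OldCodegenSketch {
  // Mirrors the removed `val oomeClassName = classOf[SparkOutOfMemoryError].getName`.
  val oomeClassName: String = classOf[SketchOutOfMemoryError].getName

  // Hypothetical helper: renders the allocation-failure branch the way the old
  // template did, interpolating the class name into the generated Java source.
  def emitAllocationCheck(bufferVar: String): String =
    s"""
       |if ($bufferVar == null) {
       |  // failed to allocate the first page
       |  throw new $oomeClassName("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());
       |}
     """.stripMargin
}
```

Because the class name is interpolated as a string, every generated aggregate class carries its own copy of the constructor call, the error-class literal, and the `HashMap` allocation.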
This PR adds a `QueryExecutionErrors.aggregateOutOfMemoryError()` factory
(next to the existing `cannotAcquireMemory*` OOM factories) and routes both
call sites through it. In the codegen path, the emitted Java becomes
`throw QueryExecutionErrors.aggregateOutOfMemoryError();`.
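The factory routing described above can be sketched as follows, assuming local stand-ins (`StandInOutOfMemoryError`, `StandInQueryExecutionErrors`, `CallSitesSketch`) rather than the real Spark classes. One method owns the error-class string and the empty parameter map, and both an interpreted call site and a codegen template delegate to it.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical stand-in for SparkOutOfMemoryError (Spark is not assumed on the classpath).
class StandInOutOfMemoryError(val errorClass: String, val messageParameters: JMap[String, String])
  extends OutOfMemoryError(errorClass)

// Sketch of the factory: the only place that spells out the error class and parameters.
object StandInQueryExecutionErrors {
  def aggregateOutOfMemoryError(): StandInOutOfMemoryError =
    new StandInOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new JHashMap[String, String]())
}

object CallSitesSketch {
  // Interpreted path (cf. TungstenAggregationIterator): throws the factory result.
  def interpretedLookup(buffer: AnyRef): AnyRef = {
    if (buffer == null) {
      // failed to allocate the first page
      throw StandInQueryExecutionErrors.aggregateOutOfMemoryError()
    }
    buffer
  }

  // Codegen path (cf. HashAggregateExec): the template emits a factory call
  // instead of an inline constructor call.
  def emitAllocationCheck(bufferVar: String): String =
    s"""
       |if ($bufferVar == null) {
       |  // failed to allocate the first page
       |  throw QueryExecutionErrors.aggregateOutOfMemoryError();
       |}
     """.stripMargin
}
```

The emitted Java refers to `QueryExecutionErrors` unqualified; in Spark that relies on Janino's default imports, which this standalone sketch does not model.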
### Why are the changes needed?
Sub-task of SPARK-56908 (reduce generated Java size in whole-stage
codegen). Dumping the whole-stage codegen of the TPC-DS queries shows the
inline
`throw new org.apache.spark.memory.SparkOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());`
line **445 times** across 142 of 150 generated classes; it is the single
most-repeated `throw` in the corpus. Replacing it with a factory call shrinks
each generated aggregate class and moves the error-class string and the
empty [...]
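A rough sense of the per-site reduction comes from comparing the two `throw` statements character by character. The sketch below uses the exact strings quoted in this PR and the reported count of 445; `GeneratedSizeSketch` is a hypothetical name, and the sketch measures Java source text only, not bytecode or constant-pool size, so treat it as an illustration rather than a measurement of class-file savings.

```scala
object GeneratedSizeSketch {
  // The two throw statements as they appear in the generated Java (text from this PR).
  val inlineThrow: String =
    """throw new org.apache.spark.memory.SparkOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());"""
  val factoryThrow: String =
    "throw QueryExecutionErrors.aggregateOutOfMemoryError();"

  // Occurrence count reported for the TPC-DS codegen dump.
  val occurrences: Int = 445

  // Source characters saved per call site and across all reported sites.
  val perSiteSavings: Int = inlineThrow.length - factoryThrow.length
  val totalSavings: Int = perSiteSavings * occurrences
}
```

With these inputs each site shrinks from 108 to 55 characters, a saving of 53 characters per site or 23,585 across the 445 occurrences.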
### Does this PR introduce _any_ user-facing change?
No. The same `AGGREGATE_OUT_OF_MEMORY` error with the same (empty) message
parameters is thrown; only where it is constructed changes.
### How was this patch tested?
This is a behavior-preserving refactor covered by the existing aggregate
suites (e.g. `DataFrameAggregateSuite`, whose 163 tests pass). The change was
additionally verified by re-dumping the TPC-DS whole-stage codegen: all 445
inline throws are now `QueryExecutionErrors.aggregateOutOfMemoryError()` calls,
and every generated subtree still compiles (the Janino default imports already
make `QueryExecutionErrors` available unqualified, as used by other generated
error calls such as `divideBy [...]
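The re-dump check can be approximated by counting both spellings of the throw in a codegen dump. The helper below (`CodegenDumpScan`, a hypothetical name) is a minimal sketch that assumes the dump is available as a single string; it is not the tooling used for this PR. After the change, one would expect zero inline matches and as many factory calls as there were inline throws before.

```scala
object CodegenDumpScan {
  // Counts non-overlapping occurrences of `needle` in `haystack`.
  def countOccurrences(haystack: String, needle: String): Int = {
    require(needle.nonEmpty, "needle must be non-empty")
    var count = 0
    var from = haystack.indexOf(needle)
    while (from >= 0) {
      count += 1
      from = haystack.indexOf(needle, from + needle.length)
    }
    count
  }

  // Prefix of the old inline throw and the new factory call, as quoted in this PR.
  val inlinePattern: String =
    "new org.apache.spark.memory.SparkOutOfMemoryError(\"AGGREGATE_OUT_OF_MEMORY\""
  val factoryPattern: String = "QueryExecutionErrors.aggregateOutOfMemoryError()"

  // Returns (remaining inline throws, factory calls) for one dumped source.
  def summarize(dump: String): (Int, Int) =
    (countOccurrences(dump, inlinePattern), countOccurrences(dump, factoryPattern))
}
```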
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)
Closes #56256 from gengliangwang/spark-agg-oom-factory.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | 6 ++++++
.../apache/spark/sql/execution/aggregate/HashAggregateExec.scala | 5 +----
.../sql/execution/aggregate/TungstenAggregationIterator.scala | 8 ++------
3 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 6cbc2eaaabb6..48edb6e38126 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1213,6 +1213,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
new java.util.HashMap[String, String]())
}
+ def aggregateOutOfMemoryError(): SparkOutOfMemoryError = {
+ new SparkOutOfMemoryError(
+ "AGGREGATE_OUT_OF_MEMORY",
+ new java.util.HashMap[String, String]())
+ }
+
def cannotAcquireMemoryForWindowAggregateError(
requestedBytes: Long,
receivedBytes: Long): SparkOutOfMemoryError = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 7b3f9ec9951e..e1bfb50634d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable
import org.apache.spark.TaskContext
import org.apache.spark.internal.LogKeys.CONFIG
-import org.apache.spark.memory.SparkOutOfMemoryError
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -660,8 +659,6 @@ case class HashAggregateExec(
case _ => ("true", "", "")
}
- val oomeClassName = classOf[SparkOutOfMemoryError].getName
-
val findOrInsertRegularHashMap: String =
s"""
|// generate grouping key
@@ -687,7 +684,7 @@ case class HashAggregateExec(
| $unsafeRowKeys, $unsafeRowKeyHash);
| if ($unsafeRowBuffer == null) {
| // failed to allocate the first page
- | throw new $oomeClassName("AGGREGATE_OUT_OF_MEMORY", new java.util.HashMap());
+ | throw QueryExecutionErrors.aggregateOutOfMemoryError();
| }
|}
""".stripMargin
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 073e5929025b..00d18a2f79a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -17,16 +17,14 @@
package org.apache.spark.sql.execution.aggregate
-import java.util
-
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.SparkOutOfMemoryError
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnsafeKVExternalSorter}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.unsafe.KVIterator
@@ -211,9 +209,7 @@ class TungstenAggregationIterator(
buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
if (buffer == null) {
// failed to allocate the first page
- // scalastyle:off throwerror
- throw new SparkOutOfMemoryError("AGGREGATE_OUT_OF_MEMORY", new util.HashMap())
- // scalastyle:on throwerror
+ throw QueryExecutionErrors.aggregateOutOfMemoryError()
}
}
processRow(buffer, newInput)