This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cfe1b399b2fb [SPARK-51559][SQL] Make max broadcast table size
configurable
cfe1b399b2fb is described below
commit cfe1b399b2fb7dfd25c477c4eff3705daac3a154
Author: Chao Sun <[email protected]>
AuthorDate: Wed Mar 19 13:55:11 2025 -0700
[SPARK-51559][SQL] Make max broadcast table size configurable
### What changes were proposed in this pull request?
Currently the maximum size for a table to be broadcast in
`BroadcastExchangeExec` is hard-coded to 8GB, and it seems there is no
particular reason why this value was picked initially. In certain scenarios, we
may want broadcast hash join to be triggered even with larger table sizes,
since otherwise it will fall back to an expensive shuffle join.
### Why are the changes needed?
This PR introduces a new configuration: `spark.sql.maxBroadcastTableSize`,
which by default still uses 8GB, so no behavior change is expected. This would
allow users to update the config when necessary to be more flexible.
### Does this PR introduce _any_ user-facing change?
No behavior change. Only a new config is added.
### How was this patch tested?
We tested this out internally, and it seems to be working well.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50327 from sunchao/SPARK-51559.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++++
.../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 8 +++-----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ca7d8ce03793..5e656b6a57c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1534,6 +1534,12 @@ object SQLConf {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString(s"${5 * 60}")
+ val MAX_BROADCAST_TABLE_SIZE = buildConf("spark.sql.maxBroadcastTableSize")
+ .doc("The maximum table size in bytes that can be broadcast in broadcast
joins.")
+ .version("4.1.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(8L << 30)
+
val INTERRUPT_ON_CANCEL = buildConf("spark.sql.execution.interruptOnCancel")
.doc("When true, all running tasks will be interrupted if one cancels a
query.")
.version("4.0.0")
@@ -6155,6 +6161,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
if (timeoutValue < 0) Long.MaxValue else timeoutValue
}
+ def maxBroadcastTableSizeInBytes: Long = getConf(MAX_BROADCAST_TABLE_SIZE)
+
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
def convertCTAS: Boolean = getConf(CONVERT_CTAS)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 2565a14cef90..c70ee637a248 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -125,7 +125,6 @@ trait BroadcastExchangeLike extends Exchange {
case class BroadcastExchangeExec(
mode: BroadcastMode,
child: SparkPlan) extends BroadcastExchangeLike {
- import BroadcastExchangeExec._
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
@@ -203,9 +202,10 @@ case class BroadcastExchangeExec(
}
longMetric("dataSize") += dataSize
- if (dataSize >= MAX_BROADCAST_TABLE_BYTES) {
+ val maxBroadcastTableSizeInBytes =
conf.maxBroadcastTableSizeInBytes
+ if (dataSize >= maxBroadcastTableSizeInBytes) {
throw
QueryExecutionErrors.cannotBroadcastTableOverMaxTableBytesError(
- MAX_BROADCAST_TABLE_BYTES, dataSize)
+ maxBroadcastTableSizeInBytes, dataSize)
}
val beforeBroadcast = System.nanoTime()
@@ -268,8 +268,6 @@ case class BroadcastExchangeExec(
}
object BroadcastExchangeExec {
- val MAX_BROADCAST_TABLE_BYTES = 8L << 30
-
private[execution] val executionContext =
ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]