This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a6b685bf1b6 [SPARK-54853][SQL] Always check `hive.exec.max.dynamic.partitions` on the spark side
8a6b685bf1b6 is described below
commit 8a6b685bf1b642d72fac2a3d870f4d26bcf22912
Author: Cheng Pan <[email protected]>
AuthorDate: Sun Dec 28 08:43:04 2025 +0800
[SPARK-54853][SQL] Always check `hive.exec.max.dynamic.partitions` on the spark side
### What changes were proposed in this pull request?
This PR makes the `hive.exec.max.dynamic.partitions` check always happen on the Spark side when performing dynamic partition writes to Hive SerDe tables, and skips the Hive-side check by setting `hive.exec.max.dynamic.partitions` to `Int.MaxValue` in the shared `Hive`'s conf. It also assigns the error condition `_LEGACY_ERROR_TEMP_2277` a proper name, `DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED`.
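As an illustration of the Spark-side pre-check described above, here is a minimal, hedged sketch (the helper name `checkMaxDynamicPartitions` is invented; the real change lives in `InsertIntoHiveTable.scala` in the diff below and throws the `DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED` condition via `QueryExecutionErrors` rather than a plain exception):
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

// Sketch only: reads the limit from the session-derived Hadoop conf and fails fast
// before the external catalog's loadDynamicPartitions is invoked.
def checkMaxDynamicPartitions(hadoopConf: Configuration, numWrittenParts: Int): Unit = {
  val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
  val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
    HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
  if (numWrittenParts > maxDynamicPartitions) {
    // Stand-in exception; the actual code raises the named error condition.
    throw new IllegalStateException(
      s"Number of dynamic partitions created is $numWrittenParts, which is more than " +
        s"$maxDynamicPartitions. To solve this try to set $maxDynamicPartitionsKey " +
        s"to at least $numWrittenParts.")
  }
}
```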
### Why are the changes needed?
SPARK-37217 partially handles `hive.exec.max.dynamic.partitions` on the Spark side, but only for the `INSERT OVERWRITE` case when performing a dynamic partition overwrite of an external Hive SerDe table. This reduces the risk of data loss but does not eliminate it: for example, when the user updates (especially increases) the session conf `hive.exec.max.dynamic.partitions`, it only takes effect on the Spark side, while the shared `Hive` still uses a static hadoop conf from `sc.newHadoopConf`, thus if the user hits [...]
Currently, the following three frequently used configs related to dynamic partition writes to Hive SerDe tables behave inconsistently:
```
-- this works
SET hive.exec.dynamic.partition=true;
-- this also works
SET hive.exec.dynamic.partition.mode=nonstrict;
-- this does not work, even though the error message suggests doing exactly that
SET hive.exec.max.dynamic.partitions=1001;
```
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Number of dynamic partitions created is 3, which is more than 2. To solve this try to set hive.exec.max.dynamic.partitions to at least 3.
  at org.apache.hadoop.hive.ql.metadata.Hive.getValidPartitionsInPath(Hive.java:1862)
  at org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1902)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:569)
  at org.apache.spark.sql.hive.client.Shim_v2_1.loadDynamicPartitions(HiveShim.scala:1110)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadDynamicPartitions$1(HiveClientImpl.scala:1013)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:237)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:236)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
  at org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:1004)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadDynamicPartitions$1(HiveExternalCatalog.scala:1051)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
  at org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:1031)
...
```
### Does this PR introduce _any_ user-facing change?
Yes. With this change, users are allowed to set `hive.exec.max.dynamic.partitions` in the session conf, e.g. by executing `SET hive.exec.max.dynamic.partitions=1001`, and they will see more consistent behavior for the `hive.exec.max.dynamic.partitions` check: it is always performed before calling the external catalog's `loadDynamicPartitions`, for both managed and external tables, and for both `INSERT INTO` and `INSERT OVERWRITE` dynamic partition writes to Hive SerDe tables.
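For illustration, a hypothetical spark-shell style sketch of the behavior described above (the table name, schema, and data are invented; the authoritative coverage is the new test in `HiveSQLInsertTestSuite.scala` in the diff below):
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SPARK-54853 illustration")  // hypothetical app name
  .enableHiveSupport()
  .getOrCreate()

// Hypothetical Hive SerDe table; TEXTFILE keeps the insert on the Hive SerDe write path.
spark.sql("CREATE TABLE t (c1 INT) PARTITIONED BY (p INT) STORED AS TEXTFILE")
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// Writing 3 dynamic partitions with a limit of 2 now fails on the Spark side with
// DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED (SQLSTATE 54054), before
// the external catalog's loadDynamicPartitions is called.
spark.sql("SET hive.exec.max.dynamic.partitions=2")
try {
  spark.sql("INSERT OVERWRITE TABLE t PARTITION (p) SELECT 1, CAST(id AS INT) FROM range(3)")
} catch {
  case e: Exception => println(e.getMessage)
}

// Raising the limit in the same session now takes effect immediately.
spark.sql("SET hive.exec.max.dynamic.partitions=3")
spark.sql("INSERT OVERWRITE TABLE t PARTITION (p) SELECT 1, CAST(id AS INT) FROM range(3)")
```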
### How was this patch tested?
A new UT is added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53624 from pan3793/SPARK-54853.
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 11 +++---
.../spark/sql/errors/QueryExecutionErrors.scala | 7 ++--
.../spark/sql/hive/client/HiveClientImpl.scala | 2 ++
.../sql/hive/execution/InsertIntoHiveTable.scala | 16 ++++-----
.../spark/sql/hive/HiveSQLInsertTestSuite.scala | 40 +++++++++++++++++++++-
5 files changed, 58 insertions(+), 18 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 9ac7ea2c205d..ac8e891c5403 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1575,6 +1575,12 @@
],
"sqlState" : "42734"
},
+ "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED" : {
+ "message" : [
+ "Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
+ ],
+ "sqlState" : "54054"
+ },
"EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
"message" : [
"Previous node emitted a row with eventTime=<emittedRowEventTime> which
is older than current_watermark_value=<currentWatermark>",
@@ -9409,11 +9415,6 @@
"<message>"
]
},
- "_LEGACY_ERROR_TEMP_2277" : {
- "message" : [
- "Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
- ]
- },
"_LEGACY_ERROR_TEMP_2330" : {
"message" : [
"Cannot change nullable column to non-nullable: <fieldName>."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 3f14eaa45cfc..13b948391622 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2484,12 +2484,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
maxDynamicPartitions: Int,
maxDynamicPartitionsKey: String): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2277",
+ errorClass = "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED",
messageParameters = Map(
- "numWrittenParts" -> numWrittenParts.toString(),
+ "numWrittenParts" -> numWrittenParts.toString,
"maxDynamicPartitionsKey" -> maxDynamicPartitionsKey,
- "maxDynamicPartitions" -> maxDynamicPartitions.toString(),
- "numWrittenParts" -> numWrittenParts.toString()),
+ "maxDynamicPartitions" -> maxDynamicPartitions.toString),
cause = null)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index be46e54d18ec..b13de8b7c6f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -1399,6 +1399,8 @@ private[hive] object HiveClientImpl extends Logging {
if ("bonecp".equalsIgnoreCase(cpType)) {
hiveConf.set("datanucleus.connectionPoolingType", "DBCP", SOURCE_SPARK)
}
+ // SPARK-54853 handles this check on the Spark side
+ hiveConf.set("hive.exec.max.dynamic.partitions", Int.MaxValue.toString, SOURCE_SPARK)
hiveConf
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b5d3fb699d62..355d4aea3191 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -144,15 +144,15 @@ case class InsertIntoHiveTable(
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
+ val numWrittenParts = writtenParts.size
+ val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+ val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
+ HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
+ if (numWrittenParts > maxDynamicPartitions) {
+ throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
+ numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
+ }
if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
- val numWrittenParts = writtenParts.size
- val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
- val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
- HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
- if (numWrittenParts > maxDynamicPartitions) {
- throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
- numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
- }
// SPARK-29295: When insert overwrite to a Hive external table partition, if the
// partition does not exist, Hive will not check if the external partition directory
// exists or not before copying files. So if users drop the partition, and then do
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala
index 4109c0a12706..178c52b469ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.hive
-import org.apache.spark.SparkThrowable
+import org.apache.hadoop.hive.conf.HiveConf
+
+import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.SQLInsertTestSuite
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -48,4 +50,40 @@ class HiveSQLInsertTestSuite extends SQLInsertTestSuite with TestHiveSingleton {
checkError(exception = exception, sqlState = None, condition = v1ErrorClass,
parameters = v1Parameters)
}
+
+ test("SPARK-54853: SET hive.exec.max.dynamic.partitions takes effect in session conf") {
+ withSQLConf(
+ HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE.key -> "false") {
+ val cols = Seq("c1", "p1")
+ val df = sql("SELECT 1, * FROM range(3)")
+ Seq(true, false).foreach { overwrite =>
+ withTable("t1") {
+ createTable("t1", cols, Seq("int", "int"), cols.takeRight(1))
+ assert(spark.table("t1").count() === 0)
+
+ spark.conf.set("hive.exec.max.dynamic.partitions", "3")
+ processInsert("t1", df, overwrite = overwrite)
+ assert(spark.table("t1").count() === 3)
+
+ spark.conf.set("hive.exec.max.dynamic.partitions", "2")
+ checkError(
+ exception = intercept[SparkException] {
+ processInsert("t1", df, overwrite = overwrite)
+ },
+ condition = "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED",
+ sqlState = Some("54054"),
+ parameters = Map(
+ "numWrittenParts" -> "3",
+ "maxDynamicPartitionsKey" ->
HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname,
+ "maxDynamicPartitions" -> "2"))
+ assert(spark.table("t1").count() === 3)
+
+ spark.conf.set("hive.exec.max.dynamic.partitions", "3")
+ processInsert("t1", df, overwrite = overwrite)
+ val expectedRowCount = if (overwrite) 3 else 6
+ assert(spark.table("t1").count() === expectedRowCount)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]