This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a6b685bf1b6 [SPARK-54853][SQL] Always check `hive.exec.max.dynamic.partitions` on the Spark side
8a6b685bf1b6 is described below

commit 8a6b685bf1b642d72fac2a3d870f4d26bcf22912
Author: Cheng Pan <[email protected]>
AuthorDate: Sun Dec 28 08:43:04 2025 +0800

    [SPARK-54853][SQL] Always check `hive.exec.max.dynamic.partitions` on the Spark side
    
    ### What changes were proposed in this pull request?
    
    This PR makes the `hive.exec.max.dynamic.partitions` check always happen on the
    Spark side when performing dynamic partition writes to Hive SerDe tables, and skips
    the Hive-side check by setting `hive.exec.max.dynamic.partitions` to `Int.MaxValue`
    in the shared `Hive`'s conf. It also gives the error condition
    `_LEGACY_ERROR_TEMP_2277` a proper name,
    `DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED`.
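
    In short, the Spark-side check is equivalent to the following condensed sketch
    (the actual change is in `InsertIntoHiveTable.scala` in the diff below, and the
    identifiers follow that file):
    ```
    val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
    val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
      HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
    // fail fast on the Spark side, before calling the external catalog's loadDynamicPartitions
    if (writtenParts.size > maxDynamicPartitions) {
      throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
        writtenParts.size, maxDynamicPartitions, maxDynamicPartitionsKey)
    }
    ```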
    
    ### Why are the changes needed?
    
    SPARK-37217 partially handles `hive.exec.max.dynamic.partitions` on the Spark side,
    but only for the `INSERT OVERWRITE` case when performing a dynamic partition
    overwrite to an external Hive SerDe table. This reduces the data loss risk but does
    not eliminate it: e.g. when the user updates (especially increases) the session conf
    `hive.exec.max.dynamic.partitions`, it only takes effect on the Spark side, while the
    shared `Hive` still uses a static hadoop conf from `sc.newHadoopConf`, thus if the
    user hits [...]
    
    Currently, the following three frequently used configs related to dynamic partition
    overwrite for Hive SerDe tables have inconsistent behaviors:
    ```
    -- this works
    SET hive.exec.dynamic.partition=true;
    -- this also works
    SET hive.exec.dynamic.partition.mode=nonstrict;
    -- this does not work, but the error message suggests that the user do so
    SET hive.exec.max.dynamic.partitions=1001;
    ```
    
    ```
    Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Number of dynamic partitions created is 3, which is more than 2. To solve this try to set hive.exec.max.dynamic.partitions to at least 3.
            at org.apache.hadoop.hive.ql.metadata.Hive.getValidPartitionsInPath(Hive.java:1862)
            at org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1902)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:569)
            at org.apache.spark.sql.hive.client.Shim_v2_1.loadDynamicPartitions(HiveShim.scala:1110)
            at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$loadDynamicPartitions$1(HiveClientImpl.scala:1013)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
            at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
            at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:237)
            at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:236)
            at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
            at org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:1004)
            at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$loadDynamicPartitions$1(HiveExternalCatalog.scala:1051)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
            at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
            at org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:1031)
            ...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. With this change, users are allowed to set `hive.exec.max.dynamic.partitions`
    in the session conf, e.g. by executing `SET hive.exec.max.dynamic.partitions=1001`,
    and they will see more consistent behavior of the `hive.exec.max.dynamic.partitions`
    check: it is always performed before calling the external catalog's
    `loadDynamicPartitions`, for both managed and external tables, and for both
    `INSERT INTO` and `INSERT OVERWRITE` dynamic partition write operations on Hive
    SerDe tables.
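
    For illustration, a minimal usage sketch (table and column names `t`, `src`, `c1`,
    and `p` are hypothetical):
    ```
    // raise the limit in the session conf; the Spark-side check now honors this value
    spark.conf.set("hive.exec.max.dynamic.partitions", "1001")
    // both INSERT INTO and INSERT OVERWRITE dynamic partition writes to Hive SerDe
    // tables are checked against the session value before loadDynamicPartitions is called
    spark.sql("INSERT OVERWRITE TABLE t PARTITION (p) SELECT c1, p FROM src")
    ```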
    
    ### How was this patch tested?
    
    A new UT is added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53624 from pan3793/SPARK-54853.
    
    Lead-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json | 11 +++---
 .../spark/sql/errors/QueryExecutionErrors.scala    |  7 ++--
 .../spark/sql/hive/client/HiveClientImpl.scala     |  2 ++
 .../sql/hive/execution/InsertIntoHiveTable.scala   | 16 ++++-----
 .../spark/sql/hive/HiveSQLInsertTestSuite.scala    | 40 +++++++++++++++++++++-
 5 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 9ac7ea2c205d..ac8e891c5403 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1575,6 +1575,12 @@
     ],
     "sqlState" : "42734"
   },
+  "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED" : {
+    "message" : [
+      "Number of dynamic partitions created is <numWrittenParts>, which is 
more than <maxDynamicPartitions>. To solve this try to set 
<maxDynamicPartitionsKey> to at least <numWrittenParts>."
+    ],
+    "sqlState" : "54054"
+  },
   "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
     "message" : [
       "Previous node emitted a row with eventTime=<emittedRowEventTime> which 
is older than current_watermark_value=<currentWatermark>",
@@ -9409,11 +9415,6 @@
       "<message>"
     ]
   },
-  "_LEGACY_ERROR_TEMP_2277" : {
-    "message" : [
-      "Number of dynamic partitions created is <numWrittenParts>, which is 
more than <maxDynamicPartitions>. To solve this try to set 
<maxDynamicPartitionsKey> to at least <numWrittenParts>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2330" : {
     "message" : [
       "Cannot change nullable column to non-nullable: <fieldName>."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 3f14eaa45cfc..13b948391622 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2484,12 +2484,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       maxDynamicPartitions: Int,
       maxDynamicPartitionsKey: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2277",
+      errorClass = "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED",
       messageParameters = Map(
-        "numWrittenParts" -> numWrittenParts.toString(),
+        "numWrittenParts" -> numWrittenParts.toString,
         "maxDynamicPartitionsKey" -> maxDynamicPartitionsKey,
-        "maxDynamicPartitions" -> maxDynamicPartitions.toString(),
-        "numWrittenParts" -> numWrittenParts.toString()),
+        "maxDynamicPartitions" -> maxDynamicPartitions.toString),
       cause = null)
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index be46e54d18ec..b13de8b7c6f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -1399,6 +1399,8 @@ private[hive] object HiveClientImpl extends Logging {
     if ("bonecp".equalsIgnoreCase(cpType)) {
       hiveConf.set("datanucleus.connectionPoolingType", "DBCP", SOURCE_SPARK)
     }
+    // SPARK-54853 handles this check on the Spark side
+    hiveConf.set("hive.exec.max.dynamic.partitions", Int.MaxValue.toString, 
SOURCE_SPARK)
     hiveConf
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b5d3fb699d62..355d4aea3191 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -144,15 +144,15 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        val numWrittenParts = writtenParts.size
+        val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+        val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
+          HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
+        if (numWrittenParts > maxDynamicPartitions) {
+          throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
+            numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
+        }
         if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
-          val numWrittenParts = writtenParts.size
-          val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-          val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
-            HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
-          if (numWrittenParts > maxDynamicPartitions) {
-            throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
-              numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
-          }
           // SPARK-29295: When insert overwrite to a Hive external table partition, if the
           // partition does not exist, Hive will not check if the external partition directory
           // exists or not before copying files. So if users drop the partition, and then do
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala
index 4109c0a12706..178c52b469ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSQLInsertTestSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.SparkThrowable
+import org.apache.hadoop.hive.conf.HiveConf
+
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.SQLInsertTestSuite
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -48,4 +50,40 @@ class HiveSQLInsertTestSuite extends SQLInsertTestSuite with TestHiveSingleton {
     checkError(exception = exception, sqlState = None, condition = v1ErrorClass,
       parameters = v1Parameters)
   }
+
+  test("SPARK-54853: SET hive.exec.max.dynamic.partitions takes effect in 
session conf") {
+    withSQLConf(
+      HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE.key -> "false") {
+      val cols = Seq("c1", "p1")
+      val df = sql("SELECT 1, * FROM range(3)")
+      Seq(true, false).foreach { overwrite =>
+        withTable("t1") {
+          createTable("t1", cols, Seq("int", "int"), cols.takeRight(1))
+          assert(spark.table("t1").count() === 0)
+
+          spark.conf.set("hive.exec.max.dynamic.partitions", "3")
+          processInsert("t1", df, overwrite = overwrite)
+          assert(spark.table("t1").count() === 3)
+
+          spark.conf.set("hive.exec.max.dynamic.partitions", "2")
+          checkError(
+            exception = intercept[SparkException] {
+              processInsert("t1", df, overwrite = overwrite)
+            },
+            condition = "DYNAMIC_PARTITION_WRITE_PARTITION_NUM_LIMIT_EXCEEDED",
+            sqlState = Some("54054"),
+            parameters = Map(
+              "numWrittenParts" -> "3",
+              "maxDynamicPartitionsKey" -> 
HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname,
+              "maxDynamicPartitions" -> "2"))
+          assert(spark.table("t1").count() === 3)
+
+          spark.conf.set("hive.exec.max.dynamic.partitions", "3")
+          processInsert("t1", df, overwrite = overwrite)
+          val expectedRowCount = if (overwrite) 3 else 6
+          assert(spark.table("t1").count() === expectedRowCount)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
