This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 06674bab1db3 [SPARK-53942][SS] Support changing stateless shuffle 
partitions upon restart of streaming query
06674bab1db3 is described below

commit 06674bab1db367fd9fb770fddc95b629f239291a
Author: Jungtaek Lim <[email protected]>
AuthorDate: Sat Oct 25 17:57:36 2025 +0900

    [SPARK-53942][SS] Support changing stateless shuffle partitions upon 
restart of streaming query
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to support changing stateless shuffle partitions upon 
restart of streaming query.
    
    We don't introduce a new config or se - users can simply do the below to 
change the number of shuffle partitions:
    
    * stop the query
    * change the value of `spark.sql.shuffle.partitions`
    * restart the query to take effect
    
    Note that state partitions are still fixed and be unchanged from this. That 
said, the value of `spark.sql.shuffle.partitions` for batch 0 will be the 
number of state partitions and does not change even if the value of the config 
has changed upon restart.
    
    As an implementation detail, this PR adds a new "internal" SQL config 
`spark.sql.streaming.internal.stateStore.partitions`  to distinguish stateless 
shuffle partitions vs stateful shuffle partitions. Unlike other internal 
configs where we still expect someone (admin?) to use them, this config is NOT 
meant to be an user facing one and no one should set this up directly. We add 
this config to implement trick for compatibility, nothing else. We don't 
support compatibility of this config  [...]
    
    That said, the value of the new config is expected to be inherited from 
`spark.sql.shuffle.partitions` assuming no one will set this up directly.
    
    To support compatibility, we employ a trick into offset log - for stateful 
shuffle partitions, we refer it to 
`spark.sql.streaming.internal.stateStore.partitions` in session config, and we 
keep using `spark.sql.shuffle.partitions` in offset log. We handle rebinding 
between two configs to leave the persistent layer unchanged. This way we can 
support the query to be both upgraded/downgraded.
    
    ### Why are the changes needed?
    
    Whenever there is need to change the parallelism of the processing e.g. 
input volume being changed over time, the size of static table changed over 
time, skew in stream-static join (though AQE may help resolving this a bit), 
the only official approach to deal with this was to discard checkpoint and 
start a new query, implying they have to do full backfill. (For workloads with 
FEB sink, advanced (and adventurous) users could change the config in their 
user function, but that's arguably [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, user can change shuffle partitions for stateless operators upon 
restart, via changing the config `spark.sql.shuffle.partitions`.
    
    ### How was this patch tested?
    
    New UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52645 from 
HeartSaVioR/WIP-change-stateless-shuffle-partitions-in-streaming-query.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  20 ++-
 .../streaming/checkpointing/OffsetSeq.scala        |  90 ++++++++---
 .../streaming/runtime/IncrementalExecution.scala   |   3 +-
 .../streaming/runtime/StreamExecution.scala        |  12 +-
 .../commits/0                                      |   2 +
 .../metadata                                       |   1 +
 .../offsets/0                                      |   3 +
 .../state/0/0/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/0/_metadata/schema                     | Bin 0 -> 265 bytes
 .../state/0/1/1.delta                              | Bin 0 -> 87 bytes
 .../state/0/2/1.delta                              | Bin 0 -> 75 bytes
 .../state/0/3/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/1.delta                              | Bin 0 -> 75 bytes
 .../state/0/_metadata/metadata                     |   2 +
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 179 +++++++++++++++++++++
 15 files changed, 289 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d4ba066b2730..d29100851e57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -877,9 +877,7 @@ object SQLConf {
     .createOptional
 
   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
-    .doc("The default number of partitions to use when shuffling data for 
joins or aggregations. " +
-      "Note: For structured streaming, this configuration cannot be changed 
between query " +
-      "restarts from the same checkpoint location.")
+    .doc("The default number of partitions to use when shuffling data for 
joins or aggregations.")
     .version("1.1.0")
     .intConf
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be 
positive")
@@ -2627,6 +2625,22 @@ object SQLConf {
       .checkValue(k => k >= 0, "Must be greater than or equal to 0")
       .createWithDefault(5)
 
+  val STATEFUL_SHUFFLE_PARTITIONS_INTERNAL =
+    buildConf("spark.sql.streaming.internal.stateStore.partitions")
+      .doc("WARN: This config is used internally and is not intended to be 
user-facing. This " +
+        "config can be removed without support of compatibility in any time. " 
+
+        "DO NOT USE THIS CONFIG DIRECTLY AND USE THE CONFIG 
`spark.sql.shuffle.partitions`. " +
+        "The default number of partitions to use when shuffling data for 
stateful operations. " +
+        "If not specified, this config picks up the value of 
`spark.sql.shuffle.partitions`. " +
+        "Note: For structured streaming, this configuration cannot be changed 
between query " +
+        "restarts from the same checkpoint location.")
+      .internal()
+      .intConf
+      .checkValue(_ > 0,
+        "The value of spark.sql.streaming.internal.stateStore.partitions must 
be a positive " +
+          "integer.")
+      .createOptional
+
   val FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION =
     buildConf("spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion")
       .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index c1c3c379719a..62c903cb689a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.execution.streaming.checkpointing
 
+import scala.language.existentials
+
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{CONFIG, DEFAULT_VALUE, NEW_VALUE, 
OLD_VALUE, TIP}
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
SparkDataStream}
@@ -85,6 +88,11 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  * @param conf: Additional conf_s to be persisted across batches, e.g. number 
of shuffle partitions.
+ * CAVEAT: This does not apply the logic we handle in [[OffsetSeqMetadata]] 
object, e.g.
+ * deducing the default value of SQL config if the entry does not exist in the 
offset log,
+ * resolving the re-bind of config key (the config key in offset log is not 
same with the
+ * actual key in session), etc. If you need to get the value with applying the 
logic, use
+ * [[OffsetSeqMetadata.readValue()]].
  */
 case class OffsetSeqMetadata(
     batchWatermarkMs: Long = 0,
@@ -101,13 +109,35 @@ object OffsetSeqMetadata extends Logging {
    * log in the checkpoint position.
    */
   private val relevantSQLConfs = Seq(
-    SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, 
STREAMING_MULTIPLE_WATERMARK_POLICY,
+    STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
     FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, 
STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
     STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
     STATE_STORE_ROCKSDB_FORMAT_VERSION, 
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, 
STREAMING_STATE_STORE_ENCODING_FORMAT
   )
 
+  /**
+   * This is an extension of `relevantSQLConfs`. The characteristic is the 
same, but we persist the
+   * value of config A as config B in offset log. This exists for 
compatibility purpose e.g. if
+   * user upgrades Spark and runs a streaming query, but has to downgrade due 
to some issues.
+   *
+   * A config should be only bound to either `relevantSQLConfs` or 
`rebindSQLConfs` (key or value).
+   */
+  private val rebindSQLConfsSessionToOffsetLog: Map[ConfigEntry[_], 
ConfigEntry[_]] = {
+    Map(
+      // TODO: The proper way to handle this is to make the number of 
partitions in the state
+      //  metadata as the source of truth, but it requires major changes if we 
want to take care
+      //  of compatibility.
+      STATEFUL_SHUFFLE_PARTITIONS_INTERNAL -> SHUFFLE_PARTITIONS
+    )
+  }
+
+  /**
+   * Reversed index of `rebindSQLConfsSessionToOffsetLog`.
+   */
+  private val rebindSQLConfsOffsetLogToSession: Map[ConfigEntry[_], 
ConfigEntry[_]] =
+    rebindSQLConfsSessionToOffsetLog.map { case (k, v) => (v, k) }.toMap
+
   /**
    * Default values of relevant configurations that are used for backward 
compatibility.
    * As new configurations are added to the metadata, existing checkpoints may 
not have those
@@ -132,6 +162,20 @@ object OffsetSeqMetadata extends Logging {
     STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow"
   )
 
+  def readValue[T](metadataLog: OffsetSeqMetadata, confKey: ConfigEntry[T]): 
String = {
+    readValueOpt(metadataLog, confKey).getOrElse(confKey.defaultValueString)
+  }
+
+  def readValueOpt[T](
+      metadataLog: OffsetSeqMetadata,
+      confKey: ConfigEntry[T]): Option[String] = {
+    val actualKey = if (rebindSQLConfsSessionToOffsetLog.contains(confKey)) {
+      rebindSQLConfsSessionToOffsetLog(confKey)
+    } else confKey
+
+    
metadataLog.conf.get(actualKey.key).orElse(relevantSQLConfDefaultValues.get(actualKey.key))
+  }
+
   def apply(json: String): OffsetSeqMetadata = 
Serialization.read[OffsetSeqMetadata](json)
 
   def apply(
@@ -139,49 +183,59 @@ object OffsetSeqMetadata extends Logging {
       batchTimestampMs: Long,
       sessionConf: RuntimeConfig): OffsetSeqMetadata = {
     val confs = relevantSQLConfs.map { conf => conf.key -> 
sessionConf.get(conf.key) }.toMap
-    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs)
+    val confsFromRebind = rebindSQLConfsSessionToOffsetLog.map {
+      case (confInSession, confInOffsetLog) =>
+        confInOffsetLog.key -> sessionConf.get(confInSession.key)
+    }.toMap
+    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs++ 
confsFromRebind)
   }
 
   /** Set the SparkSession configuration with the values in the metadata */
   def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: SQLConf): Unit 
= {
-    val configs = sessionConf.getAllConfs
-    OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
-
-      metadata.conf.get(confKey) match {
+    def setOneSessionConf(confKeyInOffsetLog: String, confKeyInSession: 
String): Unit = {
+      metadata.conf.get(confKeyInOffsetLog) match {
 
         case Some(valueInMetadata) =>
           // Config value exists in the metadata, update the session config 
with this value
-          val optionalValueInSession = sessionConf.getConfString(confKey, null)
+          val optionalValueInSession = 
sessionConf.getConfString(confKeyInSession, null)
           if (optionalValueInSession != null && optionalValueInSession != 
valueInMetadata) {
-            logWarning(log"Updating the value of conf '${MDC(CONFIG, 
confKey)}' in current " +
-              log"session from '${MDC(OLD_VALUE, optionalValueInSession)}' " +
+            logWarning(log"Updating the value of conf '${MDC(CONFIG, 
confKeyInSession)}' in " +
+              log"current session from '${MDC(OLD_VALUE, 
optionalValueInSession)}' " +
               log"to '${MDC(NEW_VALUE, valueInMetadata)}'.")
           }
-          sessionConf.setConfString(confKey, valueInMetadata)
+          sessionConf.setConfString(confKeyInSession, valueInMetadata)
 
         case None =>
           // For backward compatibility, if a config was not recorded in the 
offset log,
           // then either inject a default value (if specified in 
`relevantSQLConfDefaultValues`) or
           // let the existing conf value in SparkSession prevail.
-          relevantSQLConfDefaultValues.get(confKey) match {
+          relevantSQLConfDefaultValues.get(confKeyInOffsetLog) match {
 
             case Some(defaultValue) =>
-              sessionConf.setConfString(confKey, defaultValue)
-              logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in 
the offset log, " +
-                log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'")
+              sessionConf.setConfString(confKeyInSession, defaultValue)
+              logWarning(log"Conf '${MDC(CONFIG, confKeyInSession)}' was not 
found in the offset " +
+                log"log, using default value '${MDC(DEFAULT_VALUE, 
defaultValue)}'")
 
             case None =>
-              val value = sessionConf.getConfString(confKey, null)
+              val value = sessionConf.getConfString(confKeyInSession, null)
               val valueStr = if (value != null) {
                 s" Using existing session conf value '$value'."
               } else {
                 " No value set in session conf."
               }
-              logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in 
the offset log. " +
-                log"${MDC(TIP, valueStr)}")
-
+              logWarning(log"Conf '${MDC(CONFIG, confKeyInSession)}' was not 
found in the " +
+                log"offset log. ${MDC(TIP, valueStr)}")
           }
       }
     }
+
+    OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
+      setOneSessionConf(confKey, confKey)
+    }
+
+    rebindSQLConfsOffsetLogToSession.foreach {
+      case (confInOffsetLog, confInSession) =>
+        setOneSessionConf(confInOffsetLog.key, confInSession.key)
+    }
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
index 4f41e8a8be06..cf0c297efbf0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
@@ -105,7 +105,8 @@ class IncrementalExecution(
 
   private lazy val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
-  private[sql] val numStateStores = 
offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
+  private[sql] val numStateStores = 
OffsetSeqMetadata.readValueOpt(offsetSeqMetadata,
+      SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL)
     .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
     .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
index b5e51bc8b54d..56ed0de1fcdc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
@@ -155,7 +155,7 @@ abstract class StreamExecution(
   protected def sources: Seq[SparkDataStream]
 
   /** Isolated spark session to run the batches with. */
-  protected val sparkSessionForStream: SparkSession = 
sparkSession.cloneSession()
+  protected[sql] val sparkSessionForStream: SparkSession = 
sparkSession.cloneSession()
 
   /**
    * Manages the metadata from this checkpoint location.
@@ -320,6 +320,16 @@ abstract class StreamExecution(
           
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
         }
 
+        
sparkSessionForStream.conf.get(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL) 
match {
+          case Some(_) => // no-op
+          case None =>
+            // Take the default value of `spark.sql.shuffle.partitions`.
+            val shufflePartitionValue = 
sparkSessionForStream.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+            sparkSessionForStream.conf.set(
+              SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL.key,
+              shufflePartitionValue)
+        }
+
         getLatestExecutionContext().updateStatusMessage("Initializing sources")
         // force initialization of the logical plan so that the sources can be 
created
         logicalPlan
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/commits/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/commits/0
new file mode 100644
index 000000000000..9c1e3021c3ea
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/metadata
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/metadata
new file mode 100644
index 000000000000..53635b08905f
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/metadata
@@ -0,0 +1 @@
+{"id":"295ee44f-dd99-45cf-a21d-9a760b439c45"}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/offsets/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/offsets/0
new file mode 100644
index 000000000000..dcacc16cdc23
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1760948082021,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.stateStore.encodingFormat":"unsaferow","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"mi
 [...]
+0
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/_metadata/schema
new file mode 100644
index 000000000000..864aad4c83e5
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/0/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/1/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/1/1.delta
new file mode 100644
index 000000000000..5acc883909ca
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/1/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/2/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/2/1.delta
new file mode 100644
index 000000000000..00c03b0f2aaa
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/2/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/3/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/3/1.delta
new file mode 100644
index 000000000000..635297805184
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/3/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/4/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/4/1.delta
new file mode 100644
index 000000000000..0a0f74c94403
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/4/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/_metadata/metadata
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/_metadata/metadata
new file mode 100644
index 000000000000..39ce28c9b4aa
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/state/0/_metadata/metadata
@@ -0,0 +1,2 @@
+v1
+{"operatorInfo":{"operatorId":0,"operatorName":"dedupe"},"stateStoreInfo":[{"storeName":"default","numColsPrefixKey":0,"numPartitions":5}]}
\ No newline at end of file
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b0de21a6d9e8..7adf98b79204 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset 
=> OffsetV2, ReadLi
 import org.apache.spark.sql.execution.exchange.{REQUIRED_BY_STATEFUL_OPERATOR, 
ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming._
 import 
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, 
OffsetSeqMetadata}
+import 
org.apache.spark.sql.execution.streaming.operators.stateful.StateStoreSaveExec
 import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, 
MemoryStream, MetricsReporter, StreamExecution, StreamingExecutionRelation, 
StreamingQueryWrapper}
 import org.apache.spark.sql.execution.streaming.sources.{MemorySink, 
TestForeachWriter}
 import 
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, 
RocksDBStateStoreProvider, StateStoreCheckpointLocationNotEmpty}
@@ -1479,6 +1480,184 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-53942: changing the number of stateful shuffle partitions via 
config") {
+    val stream = MemoryStream[Int]
+
+    val df = stream.toDF()
+      .groupBy("value")
+      .count()
+
+    withTempDir { checkpointDir =>
+      withSQLConf(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL.key -> 
10.toString) {
+        assert(
+          spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+            != spark.conf.get(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL).get
+        )
+
+        testStream(df, OutputMode.Update())(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(stream, 1, 2, 3),
+          ProcessAllAvailable(),
+          AssertOnQuery { q =>
+            // This also proves the path of downgrade; we use the same entry 
name to persist the
+            // stateful shuffle partitions, hence it is compatible with older 
Spark versions.
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(0).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("10"))
+
+            val stateOps = q.lastExecution.executedPlan.collect {
+              case s: StateStoreSaveExec => s
+            }
+
+            val stateStoreSave = stateOps.head
+            assert(stateStoreSave.stateInfo.get.numPartitions === 10)
+            true
+          }
+        )
+      }
+
+      // Trying to change the number of stateful shuffle partitions, which 
should be ignored.
+      withSQLConf(SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL.key -> 
3.toString) {
+        testStream(df, OutputMode.Update())(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(stream, 4, 5),
+          ProcessAllAvailable(),
+          Execute { q =>
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(1).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("10"))
+
+            val stateOps = q.lastExecution.executedPlan.collect {
+              case s: StateStoreSaveExec => s
+            }
+
+            val stateStoreSave = stateOps.head
+            // This shouldn't change to 3.
+            assert(stateStoreSave.stateInfo.get.numPartitions === 10)
+          }
+        )
+      }
+    }
+  }
+
+  test("SPARK-53942: changing the number of stateless shuffle partitions via 
config") {
+    val inputData = MemoryStream[(String, Int)]
+    val dfStream = inputData.toDF()
+      .select($"_1".as("key"), $"_2".as("value"))
+
+    val dfBatch = spark.createDataFrame(Seq(("a", "aux1"), ("b", "aux2"), 
("c", "aux3")))
+      .toDF("key", "aux")
+
+    val joined = dfStream.join(dfBatch, "key")
+
+    withTempDir { checkpointDir =>
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> 1.toString,
+        // We should disable AQE to have deterministic number of shuffle 
partitions.
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        // Also disable broadcast hash join.
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        testStream(joined)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(inputData, ("a", 1), ("b", 2)),
+          ProcessAllAvailable(),
+          Execute { q =>
+            // The value of stateful shuffle partitions in offset log follows 
the
+            // stateless shuffle partitions if it's not specified explicitly.
+            assert(
+              q.sparkSessionForStream.conf.get(
+                SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL) === Some(1))
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(0).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("1"))
+
+            val shuffles = q.lastExecution.executedPlan.collect {
+              case s: ShuffleExchangeExec => s
+            }
+
+            val shuffle = shuffles.head
+            assert(shuffle.numPartitions === 1)
+          }
+        )
+      }
+
+      // Trying to change the number of stateless shuffle partitions, which 
should be honored.
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> 5.toString,
+        // We should disable AQE to have deterministic number of shuffle 
partitions.
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        // Also disable broadcast hash join.
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        testStream(joined)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(inputData, ("c", 3)),
+          ProcessAllAvailable(),
+          Execute { q =>
+            // Changing the number of stateless shuffle partitions should not 
change the number
+            // of stateful shuffle partitions if it's available in offset log.
+            assert(
+              q.sparkSessionForStream.conf.get(
+                SQLConf.STATEFUL_SHUFFLE_PARTITIONS_INTERNAL) === Some(1))
+            assert(
+              q.offsetLog.offsetSeqMetadataForBatchId(1).get.conf
+                .get(SQLConf.SHUFFLE_PARTITIONS.key) === Some("1"))
+
+            val shuffles = q.lastExecution.executedPlan.collect {
+              case s: ShuffleExchangeExec => s
+            }
+
+            val shuffle = shuffles.head
+            assert(shuffle.numPartitions === 5)
+          }
+        )
+      }
+    }
+  }
+
+  test("SPARK-53942: stateful shuffle partitions are retained from old 
checkpoint") {
+    val input = MemoryStream[Int]
+    val df1 = input.toDF()
+      .select($"value" as Symbol("key1"), $"value" * 2 as Symbol("key2"),
+        $"value" * 3 as Symbol("value"))
+    val dedup = df1.dropDuplicates("key1", "key2")
+
+    val resourceUri = this.getClass.getResource(
+      
"/structured-streaming/checkpoint-version-4.0.1-dedup-spark-53942/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+    Utils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    input.addData(1, 1, 2, 3, 4)
+
+    // Trying to change the number of stateless shuffle partitions, which 
should be no op
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+      testStream(dedup)(
+        // scalastyle:off line.size.limit
+        /*
+          Note: The checkpoint was generated using the following input in 
Spark version 4.0.1, with
+          shuffle partitions = 5
+          AddData(inputData, 1, 1, 2, 3, 4),
+          CheckAnswer((1, 2, 3), (2, 4, 6), (3, 6, 9), (4, 8, 12))
+         */
+        // scalastyle:on line.size.limit
+        StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+        AddData(input, 2, 3, 3, 4, 5),
+        CheckAnswer((5, 10, 15)),
+        Execute { q =>
+          val shuffles = q.lastExecution.executedPlan.collect {
+            case s: ShuffleExchangeExec => s
+          }
+
+          val shuffle = shuffles.head
+          assert(shuffle.numPartitions === 5)
+        },
+        StopStream
+      )
+    }
+  }
+
   private val TEST_PROVIDERS = Seq(
     classOf[HDFSBackedStateStoreProvider].getName,
     classOf[RocksDBStateStoreProvider].getName


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to