This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e0066dc78 [SPARK-42779][SQL] Allow V2 writes to indicate advisory 
shuffle partition size
59e0066dc78 is described below

commit 59e0066dc78c6300550bde90770e3ba929ea0741
Author: aokolnychyi <[email protected]>
AuthorDate: Fri Mar 17 13:10:22 2023 -0700

    [SPARK-42779][SQL] Allow V2 writes to indicate advisory shuffle partition 
size
    
    ### What changes were proposed in this pull request?
    
    This PR adds an API for data sources to indicate the advisory partition 
size for V2 writes.
    
    ### Why are the changes needed?
    
    Data sources have an API to request a particular distribution and ordering 
of data for V2 writes. If AQE is enabled, the default session advisory 
partition size (64MB) will be used as target. Unfortunately, this default value 
is still suboptimal and can lead to small files because the written data can be 
compressed nicely using columnar file formats. Spark should allow data sources 
to indicate the advisory shuffle partition size, just like it lets data sources 
request a particular num [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. However, the changes are backward compatible.
    
    ### How was this patch tested?
    
    This PR extends the existing tests for V2 write distribution and ordering.
    
    Closes #40421 from aokolnychyi/spark-42779.
    
    Lead-authored-by: aokolnychyi <[email protected]>
    Co-authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 core/src/main/resources/error/error-classes.json   |  27 ++++-
 .../write/RequiresDistributionAndOrdering.java     |  23 +++-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   9 +-
 .../plans/logical/basicLogicalOperators.scala      |  14 ++-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  14 ++-
 .../sql/connector/catalog/InMemoryBaseTable.scala  |   5 +
 .../sql/connector/catalog/InMemoryTable.scala      |   4 +-
 .../connector/catalog/InMemoryTableCatalog.scala   |   6 +-
 .../spark/sql/execution/SparkStrategies.scala      |   8 +-
 .../spark/sql/execution/adaptive/AQEUtils.scala    |   2 +-
 .../adaptive/CoalesceShufflePartitions.scala       |  30 +++--
 .../OptimizeSkewInRebalancePartitions.scala        |   3 +-
 .../sql/execution/adaptive/QueryStageExec.scala    |   2 +
 .../v2/DistributionAndOrderingUtils.scala          |  13 ++-
 .../execution/exchange/EnsureRequirements.scala    |   5 +-
 .../execution/exchange/ShuffleExchangeExec.scala   |   8 +-
 .../org/apache/spark/sql/CTEInlineSuite.scala      |   2 +-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  |   2 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |   2 +-
 .../spark/sql/SparkSessionExtensionSuite.scala     |   1 +
 .../connector/KeyGroupedPartitioningSuite.scala    |   2 +-
 .../WriteDistributionAndOrderingSuite.scala        | 127 ++++++++++++++++-----
 .../apache/spark/sql/execution/PlannerSuite.scala  |   4 +-
 .../exchange/EnsureRequirementsSuite.scala         |  90 +++++++--------
 .../spark/sql/streaming/StreamingJoinSuite.scala   |   4 +-
 26 files changed, 290 insertions(+), 119 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 34026083bb9..04337478223 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1063,6 +1063,28 @@
     ],
     "sqlState" : "42903"
   },
+  "INVALID_WRITE_DISTRIBUTION" : {
+    "message" : [
+      "The requested write distribution is invalid."
+    ],
+    "subClass" : {
+      "PARTITION_NUM_AND_SIZE" : {
+        "message" : [
+          "The partition number and advisory partition size can't be specified 
at the same time."
+        ]
+      },
+      "PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION" : {
+        "message" : [
+          "The number of partitions can't be specified with unspecified 
distribution."
+        ]
+      },
+      "PARTITION_SIZE_WITH_UNSPECIFIED_DISTRIBUTION" : {
+        "message" : [
+          "The advisory partition size can't be specified with unspecified 
distribution."
+        ]
+      }
+    }
+  },
   "LOCATION_ALREADY_EXISTS" : {
     "message" : [
       "Cannot name the managed table as <identifier>, as its associated 
location <location> already exists. Please pick a different table name, or 
remove the existing location first."
@@ -2931,11 +2953,6 @@
       "Unsupported data type <dataType>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1178" : {
-    "message" : [
-      "The number of partitions can't be specified with unspecified 
distribution. Invalid writer requirements detected."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1181" : {
     "message" : [
       "Stream-stream join without equality predicate is not supported."
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
index a3d08338c7a..2adfe75f7d8 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
@@ -66,12 +66,33 @@ public interface RequiresDistributionAndOrdering extends 
Write {
    * <p>
    * Note that Spark doesn't support the number of partitions on {@link 
UnspecifiedDistribution},
    * the query will fail if the number of partitions are provided but the 
distribution is
-   * unspecified.
+   * unspecified. Data sources may either request a particular number of 
partitions or
+   * a preferred partition size via {@link #advisoryPartitionSizeInBytes}, not 
both.
    *
    * @return the required number of partitions, any value less than 1 mean no 
requirement.
    */
   default int requiredNumPartitions() { return 0; }
 
+  /**
+   * Returns the advisory (not guaranteed) shuffle partition size in bytes for 
this write.
+   * <p>
+   * Implementations may override this to indicate the preferable partition 
size in shuffles
+   * performed to satisfy the requested distribution. Note that Spark doesn't 
support setting
+   * the advisory partition size for {@link UnspecifiedDistribution}, the 
query will fail if
+   * the advisory partition size is set but the distribution is unspecified. 
Data sources may
+   * either request a particular number of partitions via {@link 
#requiredNumPartitions()} or
+   * a preferred partition size, not both.
+   * <p>
+   * Data sources should be careful with large advisory sizes as it will 
impact the writing
+   * parallelism and may degrade the overall job performance.
+   * <p>
+   * Note this value only acts like a guidance and Spark does not guarantee 
the actual and advisory
+   * shuffle partition sizes will match. Ignored if the adaptive execution is 
disabled.
+   *
+   * @return the advisory partition size, any value less than 1 means no 
preference.
+   */
+  default long advisoryPartitionSizeInBytes() { return 0; }
+
   /**
    * Returns the ordering required by this write.
    * <p>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ddd26c2efe1..d069d639a4a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1743,7 +1743,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       // table `t` even if there is a Project node between the table scan node 
and Sort node.
       // We also need to propagate the missing attributes from the descendant 
node to the current
       // node, and project them way at the end via an extra Project.
-      case r @ RepartitionByExpression(partitionExprs, child, _)
+      case r @ RepartitionByExpression(partitionExprs, child, _, _)
         if !r.resolved || r.missingInput.nonEmpty =>
         val resolvedNoOuter = 
partitionExprs.map(resolveExpressionByPlanChildren(_, r))
         val (newPartitionExprs, newChild) = 
resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a0d49c29470..c628fa0f46b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1202,15 +1202,16 @@ object CollapseRepartition extends Rule[LogicalPlan] {
     }
     // Case 2: When a RepartitionByExpression has a child of global Sort, 
Repartition or
     // RepartitionByExpression we can remove the child.
-    case r @ RepartitionByExpression(_, child @ (Sort(_, true, _) | _: 
RepartitionOperation), _) =>
+    case r @ RepartitionByExpression(
+        _, child @ (Sort(_, true, _) | _: RepartitionOperation), _, _) =>
       r.withNewChildren(child.children)
     // Case 3: When a RebalancePartitions has a child of local or global Sort, 
Repartition or
     // RepartitionByExpression we can remove the child.
-    case r @ RebalancePartitions(_, child @ (_: Sort | _: 
RepartitionOperation), _) =>
+    case r @ RebalancePartitions(_, child @ (_: Sort | _: 
RepartitionOperation), _, _) =>
       r.withNewChildren(child.children)
     // Case 4: When a RebalancePartitions has a child of RebalancePartitions 
we can remove the
     // child.
-    case r @ RebalancePartitions(_, child: RebalancePartitions, _) =>
+    case r @ RebalancePartitions(_, child: RebalancePartitions, _, _) =>
       r.withNewChildren(child.children)
   }
 }
@@ -1222,7 +1223,7 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 object OptimizeRepartition extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformWithPruning(
     _.containsPattern(REPARTITION_OPERATION), ruleId) {
-    case r @ RepartitionByExpression(partitionExpressions, _, numPartitions)
+    case r @ RepartitionByExpression(partitionExpressions, _, numPartitions, _)
       if partitionExpressions.nonEmpty && 
partitionExpressions.forall(_.foldable) &&
         numPartitions.isEmpty =>
       r.copy(optNumPartitions = Some(1))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 26be405601a..cdf48dd265f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1790,6 +1790,8 @@ trait HasPartitionExpressions extends SQLConfHelper {
 
   def optNumPartitions: Option[Int]
 
+  def optAdvisoryPartitionSize: Option[Long]
+
   protected def partitioning: Partitioning = if (partitionExpressions.isEmpty) 
{
     RoundRobinPartitioning(numPartitions)
   } else {
@@ -1820,7 +1822,11 @@ trait HasPartitionExpressions extends SQLConfHelper {
 case class RepartitionByExpression(
     partitionExpressions: Seq[Expression],
     child: LogicalPlan,
-    optNumPartitions: Option[Int]) extends RepartitionOperation with 
HasPartitionExpressions {
+    optNumPartitions: Option[Int],
+    optAdvisoryPartitionSize: Option[Long] = None)
+  extends RepartitionOperation with HasPartitionExpressions {
+
+  require(optNumPartitions.isEmpty || optAdvisoryPartitionSize.isEmpty)
 
   override val partitioning: Partitioning = {
     if (numPartitions == 1) {
@@ -1857,7 +1863,11 @@ object RepartitionByExpression {
 case class RebalancePartitions(
     partitionExpressions: Seq[Expression],
     child: LogicalPlan,
-    optNumPartitions: Option[Int] = None) extends UnaryNode with 
HasPartitionExpressions {
+    optNumPartitions: Option[Int] = None,
+    optAdvisoryPartitionSize: Option[Long] = None) extends UnaryNode with 
HasPartitionExpressions {
+
+  require(optNumPartitions.isEmpty || optAdvisoryPartitionSize.isEmpty)
+
   override def maxRows: Option[Long] = child.maxRows
   override def output: Seq[Attribute] = child.output
   override val nodePatterns: Seq[TreePattern] = Seq(REBALANCE_PARTITIONS)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 497a7be1420..f58f9a92928 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1803,7 +1803,19 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 
   def numberOfPartitionsNotAllowedWithUnspecifiedDistributionError(): 
Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1178",
+      errorClass = 
"INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION",
+      messageParameters = Map.empty)
+  }
+
+  def partitionSizeNotAllowedWithUnspecifiedDistributionError(): Throwable = {
+    new AnalysisException(
+      errorClass = 
"INVALID_WRITE_DISTRIBUTION.PARTITION_SIZE_WITH_UNSPECIFIED_DISTRIBUTION",
+      messageParameters = Map.empty)
+  }
+
+  def numberAndSizeOfPartitionsNotAllowedTogether(): Throwable = {
+    new AnalysisException(
+      errorClass = "INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_AND_SIZE",
       messageParameters = Map.empty)
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index cd7d80a8296..4c62ca35b1e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -55,6 +55,7 @@ abstract class InMemoryBaseTable(
     val distribution: Distribution = Distributions.unspecified(),
     val ordering: Array[SortOrder] = Array.empty,
     val numPartitions: Option[Int] = None,
+    val advisoryPartitionSize: Option[Long] = None,
     val isDistributionStrictlyRequired: Boolean = true,
     val numRowsPerSplit: Int = Int.MaxValue)
   extends Table with SupportsRead with SupportsWrite with 
SupportsMetadataColumns {
@@ -450,6 +451,10 @@ abstract class InMemoryBaseTable(
         numPartitions.getOrElse(0)
       }
 
+      override def advisoryPartitionSizeInBytes(): Long = {
+        advisoryPartitionSize.getOrElse(0)
+      }
+
       override def toBatch: BatchWrite = writer
 
       override def toStreaming: StreamingWrite = streamingWriter match {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 318248dae05..d71bf1aeecd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -39,10 +39,12 @@ class InMemoryTable(
     distribution: Distribution = Distributions.unspecified(),
     ordering: Array[SortOrder] = Array.empty,
     numPartitions: Option[Int] = None,
+    advisoryPartitionSize: Option[Long] = None,
     isDistributionStrictlyRequired: Boolean = true,
     override val numRowsPerSplit: Int = Int.MaxValue)
   extends InMemoryBaseTable(name, schema, partitioning, properties, 
distribution,
-    ordering, numPartitions, isDistributionStrictlyRequired, numRowsPerSplit) 
with SupportsDelete {
+    ordering, numPartitions, advisoryPartitionSize, 
isDistributionStrictlyRequired,
+    numRowsPerSplit) with SupportsDelete {
 
   override def canDeleteWhere(filters: Array[Filter]): Boolean = {
     InMemoryTable.supportsFilters(filters)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 8a744c1c198..a6da7308a25 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -91,7 +91,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     createTable(ident, schema, partitions, properties, 
Distributions.unspecified(),
-      Array.empty, None)
+      Array.empty, None, None)
   }
 
   override def createTable(
@@ -111,6 +111,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       distribution: Distribution,
       ordering: Array[SortOrder],
       requiredNumPartitions: Option[Int],
+      advisoryPartitionSize: Option[Long],
       distributionStrictlyRequired: Boolean = true,
       numRowsPerSplit: Int = Int.MaxValue): Table = {
     if (tables.containsKey(ident)) {
@@ -121,7 +122,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 
     val tableName = s"$name.${ident.quoted}"
     val table = new InMemoryTable(tableName, schema, partitions, properties, 
distribution,
-      ordering, requiredNumPartitions, distributionStrictlyRequired, 
numRowsPerSplit)
+      ordering, requiredNumPartitions, advisoryPartitionSize, 
distributionStrictlyRequired,
+      numRowsPerSplit)
     tables.put(ident, table)
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 47b3fa2c684..ddf1213cfed 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -893,14 +893,18 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         } else {
           REPARTITION_BY_NUM
         }
-        exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), 
shuffleOrigin) :: Nil
+        exchange.ShuffleExchangeExec(
+          r.partitioning, planLater(r.child),
+          shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
       case r: logical.RebalancePartitions =>
         val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
           REBALANCE_PARTITIONS_BY_NONE
         } else {
           REBALANCE_PARTITIONS_BY_COL
         }
-        exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), 
shuffleOrigin) :: Nil
+        exchange.ShuffleExchangeExec(
+          r.partitioning, planLater(r.child),
+          shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => 
ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, 
r.outputOrdering) :: Nil
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
index 1a0836ed752..578e0acd805 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
@@ -30,7 +30,7 @@ object AQEUtils {
     // Project/Filter/LocalSort/CollectMetrics.
     // Note: we only care about `HashPartitioning` as `EnsureRequirements` can 
only optimize out
     // user-specified repartition with `HashPartitioning`.
-    case ShuffleExchangeExec(h: HashPartitioning, _, shuffleOrigin)
+    case ShuffleExchangeExec(h: HashPartitioning, _, shuffleOrigin, _)
         if shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == 
REPARTITION_BY_NUM =>
       val numPartitions = if (shuffleOrigin == REPARTITION_BY_NUM) {
         Some(h.numPartitions)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index dfc7e23c82d..6cca562b6ab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -64,16 +64,6 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
         1
       }
     }
-    val advisoryTargetSize = 
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
-    val minPartitionSize = if (Utils.isTesting) {
-      // In the tests, we usually set the target size to a very small value 
that is even smaller
-      // than the default value of the min partition size. Here we also adjust 
the min partition
-      // size to be not larger than 20% of the target size, so that the tests 
don't need to set
-      // both configs all the time to check the coalescing behavior.
-      
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize
 / 5)
-    } else {
-      conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
-    }
 
     // Sub-plans under the Union operator can be coalesced independently, so 
we can divide them
     // into independent "coalesce groups", and all shuffle stages within each 
group have to be
@@ -100,6 +90,17 @@ case class CoalesceShufflePartitions(session: SparkSession) 
extends AQEShuffleRe
     val specsMap = mutable.HashMap.empty[Int, Seq[ShufflePartitionSpec]]
     // Coalesce partitions for each coalesce group independently.
     coalesceGroups.zip(minNumPartitionsByGroup).foreach { case (shuffleStages, 
minNumPartitions) =>
+      val advisoryTargetSize = advisoryPartitionSize(shuffleStages)
+      val minPartitionSize = if (Utils.isTesting) {
+        // In the tests, we usually set the target size to a very small value 
that is even smaller
+        // than the default value of the min partition size. Here we also 
adjust the min partition
+        // size to be not larger than 20% of the target size, so that the 
tests don't need to set
+        // both configs all the time to check the coalescing behavior.
+        
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize
 / 5)
+      } else {
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
+      }
+
       val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         shuffleStages.map(_.shuffleStage.mapStats),
         shuffleStages.map(_.partitionSpecs),
@@ -121,6 +122,15 @@ case class CoalesceShufflePartitions(session: 
SparkSession) extends AQEShuffleRe
     }
   }
 
+  private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]): 
Long = {
+    val advisorySizes = 
shuffleStages.flatMap(_.shuffleStage.advisoryPartitionSize).toSet
+    if (advisorySizes.size == 1) {
+      advisorySizes.head
+    } else {
+      conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    }
+  }
+
   /**
    * Gather all coalesce-able groups such that the shuffle stages in each 
child of a Union operator
    * are in their independent groups if:
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
index b34ab3e380b..abd096b9c7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
@@ -69,7 +69,8 @@ object OptimizeSkewInRebalancePartitions extends 
AQEShuffleReadRule {
   }
 
   private def tryOptimizeSkewedPartitions(shuffle: ShuffleQueryStageExec): 
SparkPlan = {
-    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    val defaultAdvisorySize = 
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    val advisorySize = 
shuffle.advisoryPartitionSize.getOrElse(defaultAdvisorySize)
     val mapStats = shuffle.mapStats
     if (mapStats.isEmpty ||
       mapStats.get.bytesByPartitionId.forall(_ <= advisorySize)) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 8a2abadd19e..97a4bd617e9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -180,6 +180,8 @@ case class ShuffleQueryStageExec(
       throw new IllegalStateException(s"wrong plan for shuffle stage:\n 
${plan.treeString}")
   }
 
+  @transient val advisoryPartitionSize: Option[Long] = 
shuffle.advisoryPartitionSize
+
   @transient private lazy val shuffleFuture = shuffle.submitShuffleJob
 
   override protected def doMaterialize(): Future[Any] = shuffleFuture
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index b0b0d7bbc2d..9b1155ef698 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -36,6 +36,7 @@ object DistributionAndOrderingUtils {
       funCatalogOpt: Option[FunctionCatalog]): LogicalPlan = write match {
     case write: RequiresDistributionAndOrdering =>
       val numPartitions = write.requiredNumPartitions()
+      val partitionSize = write.advisoryPartitionSizeInBytes()
 
       val distribution = write.requiredDistribution match {
         case d: OrderedDistribution =>
@@ -49,17 +50,25 @@ object DistributionAndOrderingUtils {
 
       val queryWithDistribution = if (distribution.nonEmpty) {
         val optNumPartitions = if (numPartitions > 0) Some(numPartitions) else 
None
+        val optPartitionSize = if (partitionSize > 0) Some(partitionSize) else 
None
+
+        if (optNumPartitions.isDefined && optPartitionSize.isDefined) {
+          throw 
QueryCompilationErrors.numberAndSizeOfPartitionsNotAllowedTogether()
+        }
+
         // the conversion to catalyst expressions above produces SortOrder 
expressions
         // for OrderedDistribution and generic expressions for 
ClusteredDistribution
         // this allows RebalancePartitions/RepartitionByExpression to pick 
either
         // range or hash partitioning
         if (write.distributionStrictlyRequired()) {
-          RepartitionByExpression(distribution, query, optNumPartitions)
+          RepartitionByExpression(distribution, query, optNumPartitions, 
optPartitionSize)
         } else {
-          RebalancePartitions(distribution, query, optNumPartitions)
+          RebalancePartitions(distribution, query, optNumPartitions, 
optPartitionSize)
         }
       } else if (numPartitions > 0) {
         throw 
QueryCompilationErrors.numberOfPartitionsNotAllowedWithUnspecifiedDistributionError()
+      } else if (partitionSize > 0) {
+        throw 
QueryCompilationErrors.partitionSizeNotAllowedWithUnspecifiedDistributionError()
       } else {
         query
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index bc90a869fd9..457a9e0a868 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -188,7 +188,8 @@ case class EnsureRequirements(
             }
 
             child match {
-              case ShuffleExchangeExec(_, c, so) => 
ShuffleExchangeExec(newPartitioning, c, so)
+              case ShuffleExchangeExec(_, c, so, ps) =>
+                ShuffleExchangeExec(newPartitioning, c, so, ps)
               case _ => ShuffleExchangeExec(newPartitioning, child)
             }
           }
@@ -578,7 +579,7 @@ case class EnsureRequirements(
 
   def apply(plan: SparkPlan): SparkPlan = {
     val newPlan = plan.transformUp {
-      case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, 
shuffleOrigin)
+      case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, 
shuffleOrigin, _)
           if optimizeOutRepartition &&
             (shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin == 
REPARTITION_BY_NUM) =>
         def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean 
= {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 806a048b244..8d967458ad7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -55,6 +55,11 @@ trait ShuffleExchangeLike extends Exchange {
    */
   def numPartitions: Int
 
+  /**
+   * Returns the advisory partition size.
+   */
+  def advisoryPartitionSize: Option[Long]
+
   /**
    * The origin of this shuffle operator.
    */
@@ -115,7 +120,8 @@ case object REBALANCE_PARTITIONS_BY_COL extends 
ShuffleOrigin
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
+    shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS,
+    advisoryPartitionSize: Option[Long] = None)
   extends ShuffleExchangeLike {
 
   private lazy val writeMetrics =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index c7c09bf7c79..eb141eaf3a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -44,7 +44,7 @@ abstract class CTEInlineSuiteBase
       checkAnswer(df, Nil)
 
       val r = df.queryExecution.optimizedPlan.find {
-        case RepartitionByExpression(p, _, None) => p.isEmpty
+        case RepartitionByExpression(p, _, None, _) => p.isEmpty
         case _ => false
       }
       assert(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index eef5a32e3ef..1ee9fd6a6a8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1183,7 +1183,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     def isShuffleExecByRequirement(
         plan: ShuffleExchangeExec,
         desiredClusterColumns: Seq[String]): Boolean = plan match {
-      case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) =>
+      case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, 
_) =>
         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
       case _ => false
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 263e361413c..b6297bc24ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1731,7 +1731,7 @@ class DatasetSuite extends QueryTest
         val agg = cp.groupBy($"id" % 2).agg(count($"id"))
 
         agg.queryExecution.executedPlan.collectFirst {
-          case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
+          case ShuffleExchangeExec(_, _: RDDScanExec, _, _) =>
           case BroadcastExchangeExec(_, _: RDDScanExec) =>
         }.foreach { _ =>
           fail(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 48ad10992c5..1d3efcb86bc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -956,6 +956,7 @@ case class PreRuleReplaceAddWithBrokenVersion() extends 
Rule[SparkPlan] {
 case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends 
ShuffleExchangeLike {
   override def numMappers: Int = delegate.numMappers
   override def numPartitions: Int = delegate.numPartitions
+  override def advisoryPartitionSize: Option[Long] = 
delegate.advisoryPartitionSize
   override def shuffleOrigin: ShuffleOrigin = {
     delegate.shuffleOrigin
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 09be936a0f2..be5e1b524e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -240,7 +240,7 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
       partitions: Array[Transform],
       catalog: InMemoryTableCatalog = catalog): Unit = {
     catalog.createTable(Identifier.of(Array("ns"), table),
-      schema, partitions, emptyProps, Distributions.unspecified(), 
Array.empty, None,
+      schema, partitions, emptyProps, Distributions.unspecified(), 
Array.empty, None, None,
       numRowsPerSplit = 1)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index f7905daa20a..341b53f032a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -117,8 +117,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkOrderedDistributionAndSortWithSameExprs(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkOrderedDistributionAndSortWithSameExprs(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -127,6 +129,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkOrderedDistributionAndSortWithSameExprs(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -153,6 +156,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeCommand = command,
@@ -213,8 +217,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkClusteredDistributionAndSortWithSameExprs(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkClusteredDistributionAndSortWithSameExprs(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -223,6 +229,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkClusteredDistributionAndSortWithSameExprs(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -258,6 +265,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeCommand = command,
@@ -322,8 +330,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkClusteredDistributionAndSortWithExtendedExprs(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkClusteredDistributionAndSortWithExtendedExprs(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -332,6 +342,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkClusteredDistributionAndSortWithExtendedExprs(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -367,6 +378,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeCommand = command,
@@ -558,8 +570,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkOrderedDistributionAndSortWithManualGlobalSort(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkOrderedDistributionAndSortWithManualGlobalSort(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -568,6 +582,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkOrderedDistributionAndSortWithManualGlobalSort(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -601,6 +616,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeTransform = df => df.orderBy("data", "id"),
@@ -641,8 +657,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkOrderedDistributionAndSortWithIncompatibleGlobalSort(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkOrderedDistributionAndSortWithIncompatibleGlobalSort(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -651,6 +669,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkOrderedDistributionAndSortWithIncompatibleGlobalSort(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -684,6 +703,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeTransform = df => df.orderBy(df("data").desc, df("id").asc),
@@ -722,8 +742,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkOrderedDistributionAndSortWithManualLocalSort(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkOrderedDistributionAndSortWithManualLocalSort(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -732,6 +754,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkOrderedDistributionAndSortWithManualLocalSort(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -765,6 +788,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeTransform = df => df.sortWithinPartitions("data", "id"),
@@ -805,8 +829,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkClusteredDistributionAndLocalSortWithManualGlobalSort(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkClusteredDistributionAndLocalSortWithManualGlobalSort(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -815,6 +841,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkClusteredDistributionAndLocalSortWithManualGlobalSort(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -849,6 +876,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeTransform = df => df.orderBy("data", "id"),
@@ -889,8 +917,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkClusteredDistributionAndLocalSortWithManualLocalSort(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkClusteredDistributionAndLocalSortWithManualLocalSort(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -899,6 +929,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkClusteredDistributionAndLocalSortWithManualLocalSort(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -933,6 +964,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeTransform = df => df.sortWithinPartitions("data", "id"),
@@ -948,7 +980,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     )
     val distribution = Distributions.ordered(ordering)
 
-    catalog.createTable(ident, schema, Array.empty, emptyProps, distribution, 
ordering, None)
+    catalog.createTable(ident, schema, Array.empty, emptyProps, distribution, 
ordering, None, None)
 
     withTempDir { checkpointDir =>
       val inputData = ContinuousMemoryStream[(Long, String)]
@@ -1028,8 +1060,10 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     Seq(true, false).foreach { distributionStrictlyRequired =>
       Seq(true, false).foreach { dataSkewed =>
         Seq(true, false).foreach { coalesce =>
-          checkClusteredDistributionAndLocalSortContainsV2Function(
-            cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+          partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+            checkClusteredDistributionAndLocalSortContainsV2Function(
+              cmd, None, partitionSize, distributionStrictlyRequired, 
dataSkewed, coalesce)
+          }
         }
       }
     }
@@ -1038,6 +1072,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
   private def checkClusteredDistributionAndLocalSortContainsV2Function(
       command: String,
       targetNumPartitions: Option[Int] = None,
+      targetPartitionSize: Option[Long] = None,
       distributionStrictlyRequired: Boolean = true,
       dataSkewed: Boolean = false,
       coalesce: Boolean = false): Unit = {
@@ -1094,6 +1129,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution,
       tableOrdering,
       targetNumPartitions,
+      targetPartitionSize,
       expectedWritePartitioning = writePartitioning,
       expectedWriteOrdering = writeOrdering,
       writeCommand = command,
@@ -1107,6 +1143,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution: Distribution,
       tableOrdering: Array[SortOrder],
       tableNumPartitions: Option[Int],
+      tablePartitionSize: Option[Long] = None,
       expectedWritePartitioning: physical.Partitioning,
       expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
       writeTransform: DataFrame => DataFrame = df => df,
@@ -1121,6 +1158,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
         tableDistribution,
         tableOrdering,
         tableNumPartitions,
+        tablePartitionSize,
         expectedWritePartitioning,
         expectedWriteOrdering,
         writeTransform,
@@ -1131,6 +1169,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
         tableDistribution,
         tableOrdering,
         tableNumPartitions,
+        tablePartitionSize,
         expectedWritePartitioning,
         expectedWriteOrdering,
         writeTransform,
@@ -1147,6 +1186,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution: Distribution,
       tableOrdering: Array[SortOrder],
       tableNumPartitions: Option[Int],
+      tablePartitionSize: Option[Long],
       expectedWritePartitioning: physical.Partitioning,
       expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
       writeTransform: DataFrame => DataFrame = df => df,
@@ -1158,7 +1198,8 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     // scalastyle:on argcount
 
     catalog.createTable(ident, schema, Array.empty, emptyProps, 
tableDistribution,
-      tableOrdering, tableNumPartitions, distributionStrictlyRequired)
+      tableOrdering, tableNumPartitions, tablePartitionSize,
+      distributionStrictlyRequired)
 
     val df = if (!dataSkewed) {
       spark.createDataFrame(Seq((1, "a"), (2, "b"), (3, "c"))).toDF("id", 
"data")
@@ -1181,16 +1222,31 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       }
     } else {
       if (coalesce) {
-        val executedPlan = executeCommand()
-        val read = collect(executedPlan) {
-          case r: AQEShuffleReadExec => r
+        // if the partition size is configured for the table, set the SQL conf 
to something small
+        // so that the overriding behavior is tested
+        val defaultAdvisoryPartitionSize = if (tablePartitionSize.isDefined) 
"15" else "32MB"
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+          SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+          SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> 
defaultAdvisoryPartitionSize,
+          SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
+
+          val executedPlan = executeCommand()
+          val read = collect(executedPlan) {
+            case r: AQEShuffleReadExec => r
+          }
+          assert(read.size == 1)
+          assert(read.head.partitionSpecs.size == 1)
+          checkPartitioningAndOrdering(
+            // num of partition in expectedWritePartitioning is 1
+            executedPlan, expectedWritePartitioning, expectedWriteOrdering, 1)
         }
-        assert(read.size == 1)
-        assert(read.head.partitionSpecs.size == 1)
-        checkPartitioningAndOrdering(
-          // num of partition in expectedWritePartitioning is 1
-          executedPlan, expectedWritePartitioning, expectedWriteOrdering, 1)
       } else {
+        // if the partition size is configured for the table, set the SQL conf 
to something big
+        // so that the overriding behavior is tested
+        val defaultAdvisoryPartitionSize = if (tablePartitionSize.isDefined) 
"64MB" else "100"
         if (dataSkewed && !distributionStrictlyRequired) {
           withSQLConf(
             SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -1198,7 +1254,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
             
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true",
             SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
             SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> 
defaultAdvisoryPartitionSize,
             SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
             val executedPlan = executeCommand()
             val read = collect(executedPlan) {
@@ -1231,6 +1287,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       tableDistribution: Distribution,
       tableOrdering: Array[SortOrder],
       tableNumPartitions: Option[Int],
+      tablePartitionSize: Option[Long],
       expectedWritePartitioning: physical.Partitioning,
       expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
       writeTransform: DataFrame => DataFrame = df => df,
@@ -1238,7 +1295,7 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
       expectAnalysisException: Boolean = false): Unit = {
 
     catalog.createTable(ident, schema, Array.empty, emptyProps, 
tableDistribution,
-      tableOrdering, tableNumPartitions)
+      tableOrdering, tableNumPartitions, tablePartitionSize)
 
     withTempDir { checkpointDir =>
       val inputData = MemoryStream[(Long, String)]
@@ -1377,4 +1434,14 @@ class WriteDistributionAndOrderingSuite extends 
DistributionAndOrderingSuiteBase
     HashPartitioning(writePartitioningExprs,
       targetNumPartitions.getOrElse(conf.numShufflePartitions))
   }
+
+  private def partitionSizes(dataSkew: Boolean, coalesce: Boolean): 
Seq[Option[Long]] = {
+    if (coalesce) {
+      Seq(Some(1000L), None)
+    } else if (dataSkew) {
+      Seq(Some(100L), None)
+    } else {
+      Seq(None)
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 4b3d3a4b805..de24b8c82b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -730,10 +730,10 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
     outputPlan match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
              SortExec(_, _,
-               
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
+               
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _, _), 
_),
              SortExec(_, _,
                
ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _),
-               _, _), _), _) =>
+               _, _, _), _), _) =>
         assert(leftKeys === smjExec.leftKeys)
         assert(rightKeys === smjExec.rightKeys)
         assert(leftKeys === leftPartitioningExpressions)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index df1ddb7d9cd..09da1e1e7b0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -55,7 +55,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     EnsureRequirements.apply(smjExec1) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
         SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _), 
_),
-        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _) 
=>
+        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _), 
_) =>
         assert(leftKeys === Seq(exprA, exprB))
         assert(rightKeys === Seq(exprB, exprA))
       case other => fail(other.toString)
@@ -66,7 +66,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: Nil, exprB :: exprA :: Nil, Inner, None, plan2, plan1)
     EnsureRequirements.apply(smjExec2) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
         SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _), 
_), _) =>
         assert(leftKeys === Seq(exprB, exprA))
         assert(rightKeys === Seq(exprA, exprB))
@@ -79,7 +79,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprD :: exprC :: Nil, exprB :: exprA :: Nil, Inner, None, plan1, plan1)
     EnsureRequirements.apply(smjExec3) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
         SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _), 
_), _) =>
         assert(leftKeys === Seq(exprC, exprD))
         assert(rightKeys === Seq(exprA, exprB))
@@ -121,8 +121,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     )
     EnsureRequirements.apply(smjExec2) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-      SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _) =>
+      SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
+      SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _), _) 
=>
         assert(leftKeys === Seq(exprC, exprB, exprD))
         assert(rightKeys === Seq(exprD, exprA, exprC))
       case other => fail(other.toString)
@@ -140,7 +140,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan2)
     EnsureRequirements.apply(smjExec1) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
         SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _), _) 
=>
         assert(leftKeys === Seq(exprB, exprA))
         assert(rightKeys === Seq(exprB, exprC))
@@ -154,7 +154,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan3)
     EnsureRequirements.apply(smjExec2) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
         SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _), 
_), _) =>
         assert(leftKeys === Seq(exprB, exprA))
         assert(rightKeys === Seq(exprB, exprC))
@@ -168,7 +168,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     EnsureRequirements.apply(smjExec3) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
         SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _), 
_),
-        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _) 
=>
+        SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _), 
_) =>
         assert(leftKeys === Seq(exprB, exprC))
         assert(rightKeys === Seq(exprB, exprA))
       case other => fail(other.toString)
@@ -314,7 +314,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       applyEnsureRequirementsWithSubsetKeys(smjExec) match {
         case SortMergeJoinExec(leftKeys, rightKeys, _, _,
         SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _) 
=>
+        SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _), 
_) =>
           assert(leftKeys === Seq(exprA, exprB))
           assert(rightKeys === Seq(exprC, exprD))
           assert(p.expressions == Seq(exprC))
@@ -330,7 +330,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       applyEnsureRequirementsWithSubsetKeys(smjExec) match {
         case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-        SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _),
         SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _), _) 
=>
           assert(leftKeys === Seq(exprA, exprB))
           assert(rightKeys === Seq(exprC, exprD))
@@ -348,7 +348,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       applyEnsureRequirementsWithSubsetKeys(smjExec) match {
         case SortMergeJoinExec(leftKeys, rightKeys, _, _,
         SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _) 
=>
+        SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _), 
_) =>
           assert(leftKeys === Seq(exprA, exprB))
           assert(rightKeys === Seq(exprC, exprD))
           assert(p.expressions == Seq(exprC))
@@ -367,7 +367,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     applyEnsureRequirementsWithSubsetKeys(smjExec) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
       SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _) =>
+      SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _), _) 
=>
         assert(leftKeys === Seq(exprA, exprB, exprB))
         assert(rightKeys === Seq(exprA, exprC, exprC))
         assert(p.expressions == Seq(exprA, exprC, exprA))
@@ -383,7 +383,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
     applyEnsureRequirementsWithSubsetKeys(smjExec) match {
       case SortMergeJoinExec(leftKeys, rightKeys, _, _,
       SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _) =>
+      SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _), _) 
=>
         assert(leftKeys === Seq(exprA, exprB, exprB))
         assert(rightKeys === Seq(exprA, exprC, exprD))
         assert(p.expressions == Seq(exprA, exprC, exprA))
@@ -439,8 +439,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.numPartitions == 5)
           assert(right.numPartitions == 5)
         case other => fail(other.toString)
@@ -457,7 +457,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
         SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.numPartitions == 10)
           assert(right.numPartitions == 10)
           assert(right.expressions == Seq(exprC, exprD))
@@ -476,8 +476,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.numPartitions == 5)
           assert(left.expressions == Seq(exprA, exprB))
           assert(right.numPartitions == 5)
@@ -487,7 +487,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       applyEnsureRequirementsWithSubsetKeys(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
         SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.numPartitions == 1)
           assert(right.numPartitions == 1)
           assert(right.expressions == Seq(exprC))
@@ -505,8 +505,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.numPartitions == conf.numShufflePartitions)
           assert(left.expressions == Seq(exprA, exprB))
           assert(right.numPartitions == conf.numShufflePartitions)
@@ -524,7 +524,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       applyEnsureRequirementsWithSubsetKeys(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
         SortExec(_, _, DummySparkPlan(_, _, left: PartitioningCollection, _, 
_), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.numPartitions == 10)
           assert(right.numPartitions == 10)
           assert(right.expressions == Seq(exprA))
@@ -540,7 +540,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         Inner, None, plan1, plan2)
       applyEnsureRequirementsWithSubsetKeys(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
         SortExec(_, _, DummySparkPlan(_, _, right: PartitioningCollection, _, 
_), _), _) =>
           assert(left.numPartitions == 20)
           assert(left.expressions == Seq(exprC))
@@ -582,7 +582,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
         SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.expressions === Seq(exprA, exprB))
           assert(right.expressions === Seq(exprC, exprD))
           assert(left.numPartitions == 6)
@@ -601,7 +601,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
         SortExec(_, _, DummySparkPlan(_, _, right: HashPartitioning, _, _), 
_), _) =>
           assert(left.expressions === Seq(exprA, exprB))
           assert(right.expressions === Seq(exprC, exprD))
@@ -619,7 +619,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
         SortExec(_, _, DummySparkPlan(_, _, right: HashPartitioning, _, _), 
_), _) =>
           assert(left.expressions === Seq(exprA, exprB))
           assert(right.expressions === Seq(exprC, exprD))
@@ -639,7 +639,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
         SortExec(_, _, DummySparkPlan(_, _, right: HashPartitioning, _, _), 
_), _) =>
           assert(left.expressions === Seq(exprA, exprB))
           assert(right.expressions === Seq(exprC, exprD))
@@ -661,8 +661,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, 
plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(left.expressions === Seq(exprA, exprB))
           assert(right.expressions === Seq(exprC, exprD))
           assert(left.numPartitions == conf.numShufflePartitions)
@@ -686,7 +686,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
           EnsureRequirements.apply(smjExec) match {
             case SortMergeJoinExec(leftKeys, rightKeys, _, _,
             SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), 
_),
-            SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), 
_), _) =>
+            SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, 
_), _), _) =>
               assert(leftKeys === Seq(exprA, exprB))
               assert(rightKeys === Seq(exprC, exprD))
               assert(left.numPartitions == 9)
@@ -709,8 +709,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       var smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, 
plan1, plan2)
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(leftKeys === Seq(exprA))
           assert(rightKeys === Seq(exprC))
           assert(left.numPartitions == 20)
@@ -728,7 +728,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       EnsureRequirements.apply(smjExec) match {
         case SortMergeJoinExec(leftKeys, rightKeys, _, _,
         SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
           assert(leftKeys === Seq(exprA))
           assert(rightKeys === Seq(exprC))
           assert(right.numPartitions == 10)
@@ -760,8 +760,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
         } else {
           EnsureRequirements.apply(smjExec) match {
             case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-            SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), 
_),
-            SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), 
_), _) =>
+            SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, 
_), _),
+            SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, 
_), _), _) =>
               assert(leftKeys === Seq(exprA))
               assert(rightKeys === Seq(exprC))
               assert(left.numPartitions == 5)
@@ -872,8 +872,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner, 
None, plan1, plan2)
     EnsureRequirements.apply(smjExec) match {
       case SortMergeJoinExec(_, _, _, _,
-        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+        SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), 
_),
+        SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
         assert(left.expressions === Seq(exprA, exprB, exprB))
         assert(right.expressions === Seq(exprA, exprC, exprC))
       case other => fail(other.toString)
@@ -954,8 +954,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: exprC :: Nil, exprA :: exprB :: exprC :: Nil, Inner, 
None, plan1, plan2)
     applyEnsureRequirementsWithSubsetKeys(smjExec) match {
       case SortMergeJoinExec(_, _, _, _,
-      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
         assert(left.expressions === Seq(exprA, exprB, exprC))
         assert(right.expressions === Seq(exprA, exprB, exprC))
       case other => fail(other.toString)
@@ -974,8 +974,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner, 
None, plan1, plan2)
     applyEnsureRequirementsWithSubsetKeys(smjExec) match {
       case SortMergeJoinExec(_, _, _, _,
-      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
         assert(left.expressions === Seq(exprA, exprB, exprB))
         assert(right.expressions === Seq(exprA, exprC, exprC))
       case other => fail(other.toString)
@@ -992,8 +992,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner, 
None, plan1, plan2)
     applyEnsureRequirementsWithSubsetKeys(smjExec) match {
       case SortMergeJoinExec(_, _, _, _,
-      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
         assert(left.expressions === Seq(exprA, exprB, exprB))
         assert(right.expressions === Seq(exprA, exprC, exprC))
       case other => fail(other.toString)
@@ -1013,8 +1013,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner, 
None, plan1, plan2)
     applyEnsureRequirementsWithSubsetKeys(smjExec) match {
       case SortMergeJoinExec(_, _, _, _,
-      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
-      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), 
_) =>
+      SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+      SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), 
_), _) =>
         assert(left.expressions === Seq(exprA, exprB, exprB))
         assert(right.expressions === Seq(exprA, exprC, exprC))
       case other => fail(other.toString)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 40868f896f5..c22e4459660 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -619,8 +619,8 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
 
         assert(query.lastExecution.executedPlan.collect {
           case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
-            ShuffleExchangeExec(opA: HashPartitioning, _, _),
-            ShuffleExchangeExec(opB: HashPartitioning, _, _))
+            ShuffleExchangeExec(opA: HashPartitioning, _, _, _),
+            ShuffleExchangeExec(opB: HashPartitioning, _, _, _))
               if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
                 && partitionExpressionsColumns(opB.expressions) === Seq("a", 
"b")
                 && opA.numPartitions == numPartitions && opB.numPartitions == 
numPartitions => j


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to