This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 59e0066dc78 [SPARK-42779][SQL] Allow V2 writes to indicate advisory
shuffle partition size
59e0066dc78 is described below
commit 59e0066dc78c6300550bde90770e3ba929ea0741
Author: aokolnychyi <[email protected]>
AuthorDate: Fri Mar 17 13:10:22 2023 -0700
[SPARK-42779][SQL] Allow V2 writes to indicate advisory shuffle partition
size
### What changes were proposed in this pull request?
This PR adds an API for data sources to indicate the advisory partition
size for V2 writes.
### Why are the changes needed?
Data sources have an API to request a particular distribution and ordering
of data for V2 writes. If AQE is enabled, the default session advisory
partition size (64MB) will be used as the target. Unfortunately, this default value
is still suboptimal and can lead to small files because the written data can be
compressed nicely using columnar file formats. Spark should allow data sources
to indicate the advisory shuffle partition size, just like it lets data sources
request a particular number of partitions.
### Does this PR introduce _any_ user-facing change?
Yes. However, the changes are backward compatible.
### How was this patch tested?
This PR extends the existing tests for V2 write distribution and ordering.
Closes #40421 from aokolnychyi/spark-42779.
Lead-authored-by: aokolnychyi <[email protected]>
Co-authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/resources/error/error-classes.json | 27 ++++-
.../write/RequiresDistributionAndOrdering.java | 23 +++-
.../spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../spark/sql/catalyst/optimizer/Optimizer.scala | 9 +-
.../plans/logical/basicLogicalOperators.scala | 14 ++-
.../spark/sql/errors/QueryCompilationErrors.scala | 14 ++-
.../sql/connector/catalog/InMemoryBaseTable.scala | 5 +
.../sql/connector/catalog/InMemoryTable.scala | 4 +-
.../connector/catalog/InMemoryTableCatalog.scala | 6 +-
.../spark/sql/execution/SparkStrategies.scala | 8 +-
.../spark/sql/execution/adaptive/AQEUtils.scala | 2 +-
.../adaptive/CoalesceShufflePartitions.scala | 30 +++--
.../OptimizeSkewInRebalancePartitions.scala | 3 +-
.../sql/execution/adaptive/QueryStageExec.scala | 2 +
.../v2/DistributionAndOrderingUtils.scala | 13 ++-
.../execution/exchange/EnsureRequirements.scala | 5 +-
.../execution/exchange/ShuffleExchangeExec.scala | 8 +-
.../org/apache/spark/sql/CTEInlineSuite.scala | 2 +-
.../spark/sql/DataFrameWindowFunctionsSuite.scala | 2 +-
.../scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
.../spark/sql/SparkSessionExtensionSuite.scala | 1 +
.../connector/KeyGroupedPartitioningSuite.scala | 2 +-
.../WriteDistributionAndOrderingSuite.scala | 127 ++++++++++++++++-----
.../apache/spark/sql/execution/PlannerSuite.scala | 4 +-
.../exchange/EnsureRequirementsSuite.scala | 90 +++++++--------
.../spark/sql/streaming/StreamingJoinSuite.scala | 4 +-
26 files changed, 290 insertions(+), 119 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 34026083bb9..04337478223 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1063,6 +1063,28 @@
],
"sqlState" : "42903"
},
+ "INVALID_WRITE_DISTRIBUTION" : {
+ "message" : [
+ "The requested write distribution is invalid."
+ ],
+ "subClass" : {
+ "PARTITION_NUM_AND_SIZE" : {
+ "message" : [
+ "The partition number and advisory partition size can't be specified
at the same time."
+ ]
+ },
+ "PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION" : {
+ "message" : [
+ "The number of partitions can't be specified with unspecified
distribution."
+ ]
+ },
+ "PARTITION_SIZE_WITH_UNSPECIFIED_DISTRIBUTION" : {
+ "message" : [
+ "The advisory partition size can't be specified with unspecified
distribution."
+ ]
+ }
+ }
+ },
"LOCATION_ALREADY_EXISTS" : {
"message" : [
"Cannot name the managed table as <identifier>, as its associated
location <location> already exists. Please pick a different table name, or
remove the existing location first."
@@ -2931,11 +2953,6 @@
"Unsupported data type <dataType>."
]
},
- "_LEGACY_ERROR_TEMP_1178" : {
- "message" : [
- "The number of partitions can't be specified with unspecified
distribution. Invalid writer requirements detected."
- ]
- },
"_LEGACY_ERROR_TEMP_1181" : {
"message" : [
"Stream-stream join without equality predicate is not supported."
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
index a3d08338c7a..2adfe75f7d8 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
@@ -66,12 +66,33 @@ public interface RequiresDistributionAndOrdering extends
Write {
* <p>
* Note that Spark doesn't support the number of partitions on {@link
UnspecifiedDistribution},
* the query will fail if the number of partitions are provided but the
distribution is
- * unspecified.
+ * unspecified. Data sources may either request a particular number of
partitions or
+ * a preferred partition size via {@link #advisoryPartitionSizeInBytes}, not
both.
*
* @return the required number of partitions, any value less than 1 mean no
requirement.
*/
default int requiredNumPartitions() { return 0; }
+ /**
+ * Returns the advisory (not guaranteed) shuffle partition size in bytes for
this write.
+ * <p>
+ * Implementations may override this to indicate the preferable partition
size in shuffles
+ * performed to satisfy the requested distribution. Note that Spark doesn't
support setting
+ * the advisory partition size for {@link UnspecifiedDistribution}, the
query will fail if
+ * the advisory partition size is set but the distribution is unspecified.
Data sources may
+ * either request a particular number of partitions via {@link
#requiredNumPartitions()} or
+ * a preferred partition size, not both.
+ * <p>
+ * Data sources should be careful with large advisory sizes as it will
impact the writing
+ * parallelism and may degrade the overall job performance.
+ * <p>
+ * Note this value only acts like a guidance and Spark does not guarantee
the actual and advisory
+ * shuffle partition sizes will match. Ignored if the adaptive execution is
disabled.
+ *
+ * @return the advisory partition size, any value less than 1 means no
preference.
+ */
+ default long advisoryPartitionSizeInBytes() { return 0; }
+
/**
* Returns the ordering required by this write.
* <p>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ddd26c2efe1..d069d639a4a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1743,7 +1743,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
// table `t` even if there is a Project node between the table scan node
and Sort node.
// We also need to propagate the missing attributes from the descendant
node to the current
// node, and project them way at the end via an extra Project.
- case r @ RepartitionByExpression(partitionExprs, child, _)
+ case r @ RepartitionByExpression(partitionExprs, child, _, _)
if !r.resolved || r.missingInput.nonEmpty =>
val resolvedNoOuter =
partitionExprs.map(resolveExpressionByPlanChildren(_, r))
val (newPartitionExprs, newChild) =
resolveExprsAndAddMissingAttrs(resolvedNoOuter, child)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a0d49c29470..c628fa0f46b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1202,15 +1202,16 @@ object CollapseRepartition extends Rule[LogicalPlan] {
}
// Case 2: When a RepartitionByExpression has a child of global Sort,
Repartition or
// RepartitionByExpression we can remove the child.
- case r @ RepartitionByExpression(_, child @ (Sort(_, true, _) | _:
RepartitionOperation), _) =>
+ case r @ RepartitionByExpression(
+ _, child @ (Sort(_, true, _) | _: RepartitionOperation), _, _) =>
r.withNewChildren(child.children)
// Case 3: When a RebalancePartitions has a child of local or global Sort,
Repartition or
// RepartitionByExpression we can remove the child.
- case r @ RebalancePartitions(_, child @ (_: Sort | _:
RepartitionOperation), _) =>
+ case r @ RebalancePartitions(_, child @ (_: Sort | _:
RepartitionOperation), _, _) =>
r.withNewChildren(child.children)
// Case 4: When a RebalancePartitions has a child of RebalancePartitions
we can remove the
// child.
- case r @ RebalancePartitions(_, child: RebalancePartitions, _) =>
+ case r @ RebalancePartitions(_, child: RebalancePartitions, _, _) =>
r.withNewChildren(child.children)
}
}
@@ -1222,7 +1223,7 @@ object CollapseRepartition extends Rule[LogicalPlan] {
object OptimizeRepartition extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformWithPruning(
_.containsPattern(REPARTITION_OPERATION), ruleId) {
- case r @ RepartitionByExpression(partitionExpressions, _, numPartitions)
+ case r @ RepartitionByExpression(partitionExpressions, _, numPartitions, _)
if partitionExpressions.nonEmpty &&
partitionExpressions.forall(_.foldable) &&
numPartitions.isEmpty =>
r.copy(optNumPartitions = Some(1))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 26be405601a..cdf48dd265f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1790,6 +1790,8 @@ trait HasPartitionExpressions extends SQLConfHelper {
def optNumPartitions: Option[Int]
+ def optAdvisoryPartitionSize: Option[Long]
+
protected def partitioning: Partitioning = if (partitionExpressions.isEmpty)
{
RoundRobinPartitioning(numPartitions)
} else {
@@ -1820,7 +1822,11 @@ trait HasPartitionExpressions extends SQLConfHelper {
case class RepartitionByExpression(
partitionExpressions: Seq[Expression],
child: LogicalPlan,
- optNumPartitions: Option[Int]) extends RepartitionOperation with
HasPartitionExpressions {
+ optNumPartitions: Option[Int],
+ optAdvisoryPartitionSize: Option[Long] = None)
+ extends RepartitionOperation with HasPartitionExpressions {
+
+ require(optNumPartitions.isEmpty || optAdvisoryPartitionSize.isEmpty)
override val partitioning: Partitioning = {
if (numPartitions == 1) {
@@ -1857,7 +1863,11 @@ object RepartitionByExpression {
case class RebalancePartitions(
partitionExpressions: Seq[Expression],
child: LogicalPlan,
- optNumPartitions: Option[Int] = None) extends UnaryNode with
HasPartitionExpressions {
+ optNumPartitions: Option[Int] = None,
+ optAdvisoryPartitionSize: Option[Long] = None) extends UnaryNode with
HasPartitionExpressions {
+
+ require(optNumPartitions.isEmpty || optAdvisoryPartitionSize.isEmpty)
+
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override val nodePatterns: Seq[TreePattern] = Seq(REBALANCE_PARTITIONS)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 497a7be1420..f58f9a92928 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1803,7 +1803,19 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase {
def numberOfPartitionsNotAllowedWithUnspecifiedDistributionError():
Throwable = {
new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_1178",
+ errorClass =
"INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION",
+ messageParameters = Map.empty)
+ }
+
+ def partitionSizeNotAllowedWithUnspecifiedDistributionError(): Throwable = {
+ new AnalysisException(
+ errorClass =
"INVALID_WRITE_DISTRIBUTION.PARTITION_SIZE_WITH_UNSPECIFIED_DISTRIBUTION",
+ messageParameters = Map.empty)
+ }
+
+ def numberAndSizeOfPartitionsNotAllowedTogether(): Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_AND_SIZE",
messageParameters = Map.empty)
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index cd7d80a8296..4c62ca35b1e 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -55,6 +55,7 @@ abstract class InMemoryBaseTable(
val distribution: Distribution = Distributions.unspecified(),
val ordering: Array[SortOrder] = Array.empty,
val numPartitions: Option[Int] = None,
+ val advisoryPartitionSize: Option[Long] = None,
val isDistributionStrictlyRequired: Boolean = true,
val numRowsPerSplit: Int = Int.MaxValue)
extends Table with SupportsRead with SupportsWrite with
SupportsMetadataColumns {
@@ -450,6 +451,10 @@ abstract class InMemoryBaseTable(
numPartitions.getOrElse(0)
}
+ override def advisoryPartitionSizeInBytes(): Long = {
+ advisoryPartitionSize.getOrElse(0)
+ }
+
override def toBatch: BatchWrite = writer
override def toStreaming: StreamingWrite = streamingWriter match {
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 318248dae05..d71bf1aeecd 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -39,10 +39,12 @@ class InMemoryTable(
distribution: Distribution = Distributions.unspecified(),
ordering: Array[SortOrder] = Array.empty,
numPartitions: Option[Int] = None,
+ advisoryPartitionSize: Option[Long] = None,
isDistributionStrictlyRequired: Boolean = true,
override val numRowsPerSplit: Int = Int.MaxValue)
extends InMemoryBaseTable(name, schema, partitioning, properties,
distribution,
- ordering, numPartitions, isDistributionStrictlyRequired, numRowsPerSplit)
with SupportsDelete {
+ ordering, numPartitions, advisoryPartitionSize,
isDistributionStrictlyRequired,
+ numRowsPerSplit) with SupportsDelete {
override def canDeleteWhere(filters: Array[Filter]): Boolean = {
InMemoryTable.supportsFilters(filters)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 8a744c1c198..a6da7308a25 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -91,7 +91,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
createTable(ident, schema, partitions, properties,
Distributions.unspecified(),
- Array.empty, None)
+ Array.empty, None, None)
}
override def createTable(
@@ -111,6 +111,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
distribution: Distribution,
ordering: Array[SortOrder],
requiredNumPartitions: Option[Int],
+ advisoryPartitionSize: Option[Long],
distributionStrictlyRequired: Boolean = true,
numRowsPerSplit: Int = Int.MaxValue): Table = {
if (tables.containsKey(ident)) {
@@ -121,7 +122,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
val tableName = s"$name.${ident.quoted}"
val table = new InMemoryTable(tableName, schema, partitions, properties,
distribution,
- ordering, requiredNumPartitions, distributionStrictlyRequired,
numRowsPerSplit)
+ ordering, requiredNumPartitions, advisoryPartitionSize,
distributionStrictlyRequired,
+ numRowsPerSplit)
tables.put(ident, table)
namespaces.putIfAbsent(ident.namespace.toList, Map())
table
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 47b3fa2c684..ddf1213cfed 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -893,14 +893,18 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
} else {
REPARTITION_BY_NUM
}
- exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child),
shuffleOrigin) :: Nil
+ exchange.ShuffleExchangeExec(
+ r.partitioning, planLater(r.child),
+ shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
case r: logical.RebalancePartitions =>
val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
REBALANCE_PARTITIONS_BY_NONE
} else {
REBALANCE_PARTITIONS_BY_COL
}
- exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child),
shuffleOrigin) :: Nil
+ exchange.ShuffleExchangeExec(
+ r.partitioning, planLater(r.child),
+ shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
case ExternalRDD(outputObjAttr, rdd) =>
ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning,
r.outputOrdering) :: Nil
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
index 1a0836ed752..578e0acd805 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
@@ -30,7 +30,7 @@ object AQEUtils {
// Project/Filter/LocalSort/CollectMetrics.
// Note: we only care about `HashPartitioning` as `EnsureRequirements` can
only optimize out
// user-specified repartition with `HashPartitioning`.
- case ShuffleExchangeExec(h: HashPartitioning, _, shuffleOrigin)
+ case ShuffleExchangeExec(h: HashPartitioning, _, shuffleOrigin, _)
if shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin ==
REPARTITION_BY_NUM =>
val numPartitions = if (shuffleOrigin == REPARTITION_BY_NUM) {
Some(h.numPartitions)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index dfc7e23c82d..6cca562b6ab 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -64,16 +64,6 @@ case class CoalesceShufflePartitions(session: SparkSession)
extends AQEShuffleRe
1
}
}
- val advisoryTargetSize =
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
- val minPartitionSize = if (Utils.isTesting) {
- // In the tests, we usually set the target size to a very small value
that is even smaller
- // than the default value of the min partition size. Here we also adjust
the min partition
- // size to be not larger than 20% of the target size, so that the tests
don't need to set
- // both configs all the time to check the coalescing behavior.
-
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize
/ 5)
- } else {
- conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
- }
// Sub-plans under the Union operator can be coalesced independently, so
we can divide them
// into independent "coalesce groups", and all shuffle stages within each
group have to be
@@ -100,6 +90,17 @@ case class CoalesceShufflePartitions(session: SparkSession)
extends AQEShuffleRe
val specsMap = mutable.HashMap.empty[Int, Seq[ShufflePartitionSpec]]
// Coalesce partitions for each coalesce group independently.
coalesceGroups.zip(minNumPartitionsByGroup).foreach { case (shuffleStages,
minNumPartitions) =>
+ val advisoryTargetSize = advisoryPartitionSize(shuffleStages)
+ val minPartitionSize = if (Utils.isTesting) {
+ // In the tests, we usually set the target size to a very small value
that is even smaller
+ // than the default value of the min partition size. Here we also
adjust the min partition
+ // size to be not larger than 20% of the target size, so that the
tests don't need to set
+ // both configs all the time to check the coalescing behavior.
+
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize
/ 5)
+ } else {
+ conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
+ }
+
val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
shuffleStages.map(_.shuffleStage.mapStats),
shuffleStages.map(_.partitionSpecs),
@@ -121,6 +122,15 @@ case class CoalesceShufflePartitions(session:
SparkSession) extends AQEShuffleRe
}
}
+ private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]):
Long = {
+ val advisorySizes =
shuffleStages.flatMap(_.shuffleStage.advisoryPartitionSize).toSet
+ if (advisorySizes.size == 1) {
+ advisorySizes.head
+ } else {
+ conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ }
+ }
+
/**
* Gather all coalesce-able groups such that the shuffle stages in each
child of a Union operator
* are in their independent groups if:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
index b34ab3e380b..abd096b9c7c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
@@ -69,7 +69,8 @@ object OptimizeSkewInRebalancePartitions extends
AQEShuffleReadRule {
}
private def tryOptimizeSkewedPartitions(shuffle: ShuffleQueryStageExec):
SparkPlan = {
- val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ val defaultAdvisorySize =
conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ val advisorySize =
shuffle.advisoryPartitionSize.getOrElse(defaultAdvisorySize)
val mapStats = shuffle.mapStats
if (mapStats.isEmpty ||
mapStats.get.bytesByPartitionId.forall(_ <= advisorySize)) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 8a2abadd19e..97a4bd617e9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -180,6 +180,8 @@ case class ShuffleQueryStageExec(
throw new IllegalStateException(s"wrong plan for shuffle stage:\n
${plan.treeString}")
}
+ @transient val advisoryPartitionSize: Option[Long] =
shuffle.advisoryPartitionSize
+
@transient private lazy val shuffleFuture = shuffle.submitShuffleJob
override protected def doMaterialize(): Future[Any] = shuffleFuture
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index b0b0d7bbc2d..9b1155ef698 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -36,6 +36,7 @@ object DistributionAndOrderingUtils {
funCatalogOpt: Option[FunctionCatalog]): LogicalPlan = write match {
case write: RequiresDistributionAndOrdering =>
val numPartitions = write.requiredNumPartitions()
+ val partitionSize = write.advisoryPartitionSizeInBytes()
val distribution = write.requiredDistribution match {
case d: OrderedDistribution =>
@@ -49,17 +50,25 @@ object DistributionAndOrderingUtils {
val queryWithDistribution = if (distribution.nonEmpty) {
val optNumPartitions = if (numPartitions > 0) Some(numPartitions) else
None
+ val optPartitionSize = if (partitionSize > 0) Some(partitionSize) else
None
+
+ if (optNumPartitions.isDefined && optPartitionSize.isDefined) {
+ throw
QueryCompilationErrors.numberAndSizeOfPartitionsNotAllowedTogether()
+ }
+
// the conversion to catalyst expressions above produces SortOrder
expressions
// for OrderedDistribution and generic expressions for
ClusteredDistribution
// this allows RebalancePartitions/RepartitionByExpression to pick
either
// range or hash partitioning
if (write.distributionStrictlyRequired()) {
- RepartitionByExpression(distribution, query, optNumPartitions)
+ RepartitionByExpression(distribution, query, optNumPartitions,
optPartitionSize)
} else {
- RebalancePartitions(distribution, query, optNumPartitions)
+ RebalancePartitions(distribution, query, optNumPartitions,
optPartitionSize)
}
} else if (numPartitions > 0) {
throw
QueryCompilationErrors.numberOfPartitionsNotAllowedWithUnspecifiedDistributionError()
+ } else if (partitionSize > 0) {
+ throw
QueryCompilationErrors.partitionSizeNotAllowedWithUnspecifiedDistributionError()
} else {
query
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index bc90a869fd9..457a9e0a868 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -188,7 +188,8 @@ case class EnsureRequirements(
}
child match {
- case ShuffleExchangeExec(_, c, so) =>
ShuffleExchangeExec(newPartitioning, c, so)
+ case ShuffleExchangeExec(_, c, so, ps) =>
+ ShuffleExchangeExec(newPartitioning, c, so, ps)
case _ => ShuffleExchangeExec(newPartitioning, child)
}
}
@@ -578,7 +579,7 @@ case class EnsureRequirements(
def apply(plan: SparkPlan): SparkPlan = {
val newPlan = plan.transformUp {
- case operator @ ShuffleExchangeExec(upper: HashPartitioning, child,
shuffleOrigin)
+ case operator @ ShuffleExchangeExec(upper: HashPartitioning, child,
shuffleOrigin, _)
if optimizeOutRepartition &&
(shuffleOrigin == REPARTITION_BY_COL || shuffleOrigin ==
REPARTITION_BY_NUM) =>
def hasSemanticEqualPartitioning(partitioning: Partitioning): Boolean
= {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 806a048b244..8d967458ad7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -55,6 +55,11 @@ trait ShuffleExchangeLike extends Exchange {
*/
def numPartitions: Int
+ /**
+ * Returns the advisory partition size.
+ */
+ def advisoryPartitionSize: Option[Long]
+
/**
* The origin of this shuffle operator.
*/
@@ -115,7 +120,8 @@ case object REBALANCE_PARTITIONS_BY_COL extends
ShuffleOrigin
case class ShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan,
- shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
+ shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS,
+ advisoryPartitionSize: Option[Long] = None)
extends ShuffleExchangeLike {
private lazy val writeMetrics =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index c7c09bf7c79..eb141eaf3a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -44,7 +44,7 @@ abstract class CTEInlineSuiteBase
checkAnswer(df, Nil)
val r = df.queryExecution.optimizedPlan.find {
- case RepartitionByExpression(p, _, None) => p.isEmpty
+ case RepartitionByExpression(p, _, None, _) => p.isEmpty
case _ => false
}
assert(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index eef5a32e3ef..1ee9fd6a6a8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1183,7 +1183,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
def isShuffleExecByRequirement(
plan: ShuffleExchangeExec,
desiredClusterColumns: Seq[String]): Boolean = plan match {
- case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) =>
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS,
_) =>
partitionExpressionsColumns(op.expressions) === desiredClusterColumns
case _ => false
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 263e361413c..b6297bc24ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1731,7 +1731,7 @@ class DatasetSuite extends QueryTest
val agg = cp.groupBy($"id" % 2).agg(count($"id"))
agg.queryExecution.executedPlan.collectFirst {
- case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
+ case ShuffleExchangeExec(_, _: RDDScanExec, _, _) =>
case BroadcastExchangeExec(_, _: RDDScanExec) =>
}.foreach { _ =>
fail(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 48ad10992c5..1d3efcb86bc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -956,6 +956,7 @@ case class PreRuleReplaceAddWithBrokenVersion() extends
Rule[SparkPlan] {
case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends
ShuffleExchangeLike {
override def numMappers: Int = delegate.numMappers
override def numPartitions: Int = delegate.numPartitions
+ override def advisoryPartitionSize: Option[Long] =
delegate.advisoryPartitionSize
override def shuffleOrigin: ShuffleOrigin = {
delegate.shuffleOrigin
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 09be936a0f2..be5e1b524e5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -240,7 +240,7 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
partitions: Array[Transform],
catalog: InMemoryTableCatalog = catalog): Unit = {
catalog.createTable(Identifier.of(Array("ns"), table),
- schema, partitions, emptyProps, Distributions.unspecified(),
Array.empty, None,
+ schema, partitions, emptyProps, Distributions.unspecified(),
Array.empty, None, None,
numRowsPerSplit = 1)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index f7905daa20a..341b53f032a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -117,8 +117,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkOrderedDistributionAndSortWithSameExprs(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkOrderedDistributionAndSortWithSameExprs(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -127,6 +129,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkOrderedDistributionAndSortWithSameExprs(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -153,6 +156,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeCommand = command,
@@ -213,8 +217,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkClusteredDistributionAndSortWithSameExprs(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkClusteredDistributionAndSortWithSameExprs(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -223,6 +229,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkClusteredDistributionAndSortWithSameExprs(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -258,6 +265,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeCommand = command,
@@ -322,8 +330,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkClusteredDistributionAndSortWithExtendedExprs(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkClusteredDistributionAndSortWithExtendedExprs(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -332,6 +342,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkClusteredDistributionAndSortWithExtendedExprs(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -367,6 +378,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeCommand = command,
@@ -558,8 +570,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkOrderedDistributionAndSortWithManualGlobalSort(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkOrderedDistributionAndSortWithManualGlobalSort(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -568,6 +582,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkOrderedDistributionAndSortWithManualGlobalSort(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -601,6 +616,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeTransform = df => df.orderBy("data", "id"),
@@ -641,8 +657,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkOrderedDistributionAndSortWithIncompatibleGlobalSort(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkOrderedDistributionAndSortWithIncompatibleGlobalSort(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -651,6 +669,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkOrderedDistributionAndSortWithIncompatibleGlobalSort(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -684,6 +703,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeTransform = df => df.orderBy(df("data").desc, df("id").asc),
@@ -722,8 +742,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkOrderedDistributionAndSortWithManualLocalSort(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkOrderedDistributionAndSortWithManualLocalSort(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -732,6 +754,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkOrderedDistributionAndSortWithManualLocalSort(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -765,6 +788,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeTransform = df => df.sortWithinPartitions("data", "id"),
@@ -805,8 +829,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkClusteredDistributionAndLocalSortWithManualGlobalSort(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkClusteredDistributionAndLocalSortWithManualGlobalSort(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -815,6 +841,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkClusteredDistributionAndLocalSortWithManualGlobalSort(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -849,6 +876,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeTransform = df => df.orderBy("data", "id"),
@@ -889,8 +917,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkClusteredDistributionAndLocalSortWithManualLocalSort(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkClusteredDistributionAndLocalSortWithManualLocalSort(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -899,6 +929,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkClusteredDistributionAndLocalSortWithManualLocalSort(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -933,6 +964,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeTransform = df => df.sortWithinPartitions("data", "id"),
@@ -948,7 +980,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
)
val distribution = Distributions.ordered(ordering)
- catalog.createTable(ident, schema, Array.empty, emptyProps, distribution,
ordering, None)
+ catalog.createTable(ident, schema, Array.empty, emptyProps, distribution,
ordering, None, None)
withTempDir { checkpointDir =>
val inputData = ContinuousMemoryStream[(Long, String)]
@@ -1028,8 +1060,10 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
Seq(true, false).foreach { distributionStrictlyRequired =>
Seq(true, false).foreach { dataSkewed =>
Seq(true, false).foreach { coalesce =>
- checkClusteredDistributionAndLocalSortContainsV2Function(
- cmd, None, distributionStrictlyRequired, dataSkewed, coalesce)
+ partitionSizes(dataSkewed, coalesce).foreach { partitionSize =>
+ checkClusteredDistributionAndLocalSortContainsV2Function(
+ cmd, None, partitionSize, distributionStrictlyRequired,
dataSkewed, coalesce)
+ }
}
}
}
@@ -1038,6 +1072,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
private def checkClusteredDistributionAndLocalSortContainsV2Function(
command: String,
targetNumPartitions: Option[Int] = None,
+ targetPartitionSize: Option[Long] = None,
distributionStrictlyRequired: Boolean = true,
dataSkewed: Boolean = false,
coalesce: Boolean = false): Unit = {
@@ -1094,6 +1129,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
targetNumPartitions,
+ targetPartitionSize,
expectedWritePartitioning = writePartitioning,
expectedWriteOrdering = writeOrdering,
writeCommand = command,
@@ -1107,6 +1143,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution: Distribution,
tableOrdering: Array[SortOrder],
tableNumPartitions: Option[Int],
+ tablePartitionSize: Option[Long] = None,
expectedWritePartitioning: physical.Partitioning,
expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
writeTransform: DataFrame => DataFrame = df => df,
@@ -1121,6 +1158,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
tableNumPartitions,
+ tablePartitionSize,
expectedWritePartitioning,
expectedWriteOrdering,
writeTransform,
@@ -1131,6 +1169,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution,
tableOrdering,
tableNumPartitions,
+ tablePartitionSize,
expectedWritePartitioning,
expectedWriteOrdering,
writeTransform,
@@ -1147,6 +1186,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution: Distribution,
tableOrdering: Array[SortOrder],
tableNumPartitions: Option[Int],
+ tablePartitionSize: Option[Long],
expectedWritePartitioning: physical.Partitioning,
expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
writeTransform: DataFrame => DataFrame = df => df,
@@ -1158,7 +1198,8 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
// scalastyle:on argcount
catalog.createTable(ident, schema, Array.empty, emptyProps,
tableDistribution,
- tableOrdering, tableNumPartitions, distributionStrictlyRequired)
+ tableOrdering, tableNumPartitions, tablePartitionSize,
+ distributionStrictlyRequired)
val df = if (!dataSkewed) {
spark.createDataFrame(Seq((1, "a"), (2, "b"), (3, "c"))).toDF("id",
"data")
@@ -1181,16 +1222,31 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
}
} else {
if (coalesce) {
- val executedPlan = executeCommand()
- val read = collect(executedPlan) {
- case r: AQEShuffleReadExec => r
+ // if the partition size is configured for the table, set the SQL conf
to something small
+ // so that the overriding behavior is tested
+ val defaultAdvisoryPartitionSize = if (tablePartitionSize.isDefined)
"15" else "32MB"
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key ->
defaultAdvisoryPartitionSize,
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
+
+ val executedPlan = executeCommand()
+ val read = collect(executedPlan) {
+ case r: AQEShuffleReadExec => r
+ }
+ assert(read.size == 1)
+ assert(read.head.partitionSpecs.size == 1)
+ checkPartitioningAndOrdering(
+ // num of partition in expectedWritePartitioning is 1
+ executedPlan, expectedWritePartitioning, expectedWriteOrdering, 1)
}
- assert(read.size == 1)
- assert(read.head.partitionSpecs.size == 1)
- checkPartitioningAndOrdering(
- // num of partition in expectedWritePartitioning is 1
- executedPlan, expectedWritePartitioning, expectedWriteOrdering, 1)
} else {
+ // if the partition size is configured for the table, set the SQL conf
to something big
+ // so that the overriding behavior is tested
+ val defaultAdvisoryPartitionSize = if (tablePartitionSize.isDefined)
"64MB" else "100"
if (dataSkewed && !distributionStrictlyRequired) {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -1198,7 +1254,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
- SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key ->
defaultAdvisoryPartitionSize,
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
val executedPlan = executeCommand()
val read = collect(executedPlan) {
@@ -1231,6 +1287,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
tableDistribution: Distribution,
tableOrdering: Array[SortOrder],
tableNumPartitions: Option[Int],
+ tablePartitionSize: Option[Long],
expectedWritePartitioning: physical.Partitioning,
expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
writeTransform: DataFrame => DataFrame = df => df,
@@ -1238,7 +1295,7 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
expectAnalysisException: Boolean = false): Unit = {
catalog.createTable(ident, schema, Array.empty, emptyProps,
tableDistribution,
- tableOrdering, tableNumPartitions)
+ tableOrdering, tableNumPartitions, tablePartitionSize)
withTempDir { checkpointDir =>
val inputData = MemoryStream[(Long, String)]
@@ -1377,4 +1434,14 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
HashPartitioning(writePartitioningExprs,
targetNumPartitions.getOrElse(conf.numShufflePartitions))
}
+
+ private def partitionSizes(dataSkew: Boolean, coalesce: Boolean):
Seq[Option[Long]] = {
+ if (coalesce) {
+ Seq(Some(1000L), None)
+ } else if (dataSkew) {
+ Seq(Some(100L), None)
+ } else {
+ Seq(None)
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 4b3d3a4b805..de24b8c82b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -730,10 +730,10 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
outputPlan match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _,
-
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _), _),
+
ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _, _, _),
_),
SortExec(_, _,
ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _),
- _, _), _), _) =>
+ _, _, _), _), _) =>
assert(leftKeys === smjExec.leftKeys)
assert(rightKeys === smjExec.rightKeys)
assert(leftKeys === leftPartitioningExpressions)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index df1ddb7d9cd..09da1e1e7b0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -55,7 +55,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
EnsureRequirements.apply(smjExec1) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _),
_),
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _)
=>
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
_) =>
assert(leftKeys === Seq(exprA, exprB))
assert(rightKeys === Seq(exprB, exprA))
case other => fail(other.toString)
@@ -66,7 +66,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprB :: exprA :: Nil, Inner, None, plan2, plan1)
EnsureRequirements.apply(smjExec2) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _),
_), _) =>
assert(leftKeys === Seq(exprB, exprA))
assert(rightKeys === Seq(exprA, exprB))
@@ -79,7 +79,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprD :: exprC :: Nil, exprB :: exprA :: Nil, Inner, None, plan1, plan1)
EnsureRequirements.apply(smjExec3) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _),
_), _) =>
assert(leftKeys === Seq(exprC, exprD))
assert(rightKeys === Seq(exprA, exprB))
@@ -121,8 +121,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
)
EnsureRequirements.apply(smjExec2) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _), _)
=>
assert(leftKeys === Seq(exprC, exprB, exprD))
assert(rightKeys === Seq(exprD, exprA, exprC))
case other => fail(other.toString)
@@ -140,7 +140,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan2)
EnsureRequirements.apply(smjExec1) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _), _)
=>
assert(leftKeys === Seq(exprB, exprA))
assert(rightKeys === Seq(exprB, exprC))
@@ -154,7 +154,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprB :: Nil, Inner, None, plan1, plan3)
EnsureRequirements.apply(smjExec2) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _),
_), _) =>
assert(leftKeys === Seq(exprB, exprA))
assert(rightKeys === Seq(exprB, exprC))
@@ -168,7 +168,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
EnsureRequirements.apply(smjExec3) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _),
_),
- SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _)
=>
+ SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _),
_) =>
assert(leftKeys === Seq(exprB, exprC))
assert(rightKeys === Seq(exprB, exprA))
case other => fail(other.toString)
@@ -314,7 +314,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _)
=>
+ SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _),
_) =>
assert(leftKeys === Seq(exprA, exprB))
assert(rightKeys === Seq(exprC, exprD))
assert(p.expressions == Seq(exprC))
@@ -330,7 +330,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _),
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _), _)
=>
assert(leftKeys === Seq(exprA, exprB))
assert(rightKeys === Seq(exprC, exprD))
@@ -348,7 +348,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _)
=>
+ SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _),
_) =>
assert(leftKeys === Seq(exprA, exprB))
assert(rightKeys === Seq(exprC, exprD))
assert(p.expressions == Seq(exprC))
@@ -367,7 +367,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _), _)
=>
assert(leftKeys === Seq(exprA, exprB, exprB))
assert(rightKeys === Seq(exprA, exprC, exprC))
assert(p.expressions == Seq(exprA, exprC, exprA))
@@ -383,7 +383,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(p: HashPartitioning, _, _, _), _), _)
=>
assert(leftKeys === Seq(exprA, exprB, exprB))
assert(rightKeys === Seq(exprA, exprC, exprD))
assert(p.expressions == Seq(exprA, exprC, exprA))
@@ -439,8 +439,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.numPartitions == 5)
assert(right.numPartitions == 5)
case other => fail(other.toString)
@@ -457,7 +457,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.numPartitions == 10)
assert(right.numPartitions == 10)
assert(right.expressions == Seq(exprC, exprD))
@@ -476,8 +476,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.numPartitions == 5)
assert(left.expressions == Seq(exprA, exprB))
assert(right.numPartitions == 5)
@@ -487,7 +487,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.numPartitions == 1)
assert(right.numPartitions == 1)
assert(right.expressions == Seq(exprC))
@@ -505,8 +505,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.numPartitions == conf.numShufflePartitions)
assert(left.expressions == Seq(exprA, exprB))
assert(right.numPartitions == conf.numShufflePartitions)
@@ -524,7 +524,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _, DummySparkPlan(_, _, left: PartitioningCollection, _,
_), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.numPartitions == 10)
assert(right.numPartitions == 10)
assert(right.expressions == Seq(exprA))
@@ -540,7 +540,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
Inner, None, plan1, plan2)
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
SortExec(_, _, DummySparkPlan(_, _, right: PartitioningCollection, _,
_), _), _) =>
assert(left.numPartitions == 20)
assert(left.expressions == Seq(exprC))
@@ -582,7 +582,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _),
_), _) =>
assert(left.expressions === Seq(exprA, exprB))
assert(right.expressions === Seq(exprC, exprD))
assert(left.numPartitions == 6)
@@ -601,7 +601,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
SortExec(_, _, DummySparkPlan(_, _, right: HashPartitioning, _, _),
_), _) =>
assert(left.expressions === Seq(exprA, exprB))
assert(right.expressions === Seq(exprC, exprD))
@@ -619,7 +619,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
SortExec(_, _, DummySparkPlan(_, _, right: HashPartitioning, _, _),
_), _) =>
assert(left.expressions === Seq(exprA, exprB))
assert(right.expressions === Seq(exprC, exprD))
@@ -639,7 +639,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _),
_),
SortExec(_, _, DummySparkPlan(_, _, right: HashPartitioning, _, _),
_), _) =>
assert(left.expressions === Seq(exprA, exprB))
assert(right.expressions === Seq(exprC, exprD))
@@ -661,8 +661,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1,
plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(left.expressions === Seq(exprA, exprB))
assert(right.expressions === Seq(exprC, exprD))
assert(left.numPartitions == conf.numShufflePartitions)
@@ -686,7 +686,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _),
_),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(leftKeys === Seq(exprA, exprB))
assert(rightKeys === Seq(exprC, exprD))
assert(left.numPartitions == 9)
@@ -709,8 +709,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
var smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None,
plan1, plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
assert(left.numPartitions == 20)
@@ -728,7 +728,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
SortExec(_, _, DummySparkPlan(_, _, _: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
assert(right.numPartitions == 10)
@@ -760,8 +760,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
} else {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
assert(left.numPartitions == 5)
@@ -872,8 +872,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner,
None, plan1, plan2)
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(left.expressions === Seq(exprA, exprB, exprB))
assert(right.expressions === Seq(exprA, exprC, exprC))
case other => fail(other.toString)
@@ -954,8 +954,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: exprC :: Nil, exprA :: exprB :: exprC :: Nil, Inner,
None, plan1, plan2)
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(left.expressions === Seq(exprA, exprB, exprC))
assert(right.expressions === Seq(exprA, exprB, exprC))
case other => fail(other.toString)
@@ -974,8 +974,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner,
None, plan1, plan2)
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(left.expressions === Seq(exprA, exprB, exprB))
assert(right.expressions === Seq(exprA, exprC, exprC))
case other => fail(other.toString)
@@ -992,8 +992,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner,
None, plan1, plan2)
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(left.expressions === Seq(exprA, exprB, exprB))
assert(right.expressions === Seq(exprA, exprC, exprC))
case other => fail(other.toString)
@@ -1013,8 +1013,8 @@ class EnsureRequirementsSuite extends SharedSparkSession {
exprA :: exprB :: exprB :: Nil, exprA :: exprC :: exprC :: Nil, Inner,
None, plan1, plan2)
applyEnsureRequirementsWithSubsetKeys(smjExec) match {
case SortMergeJoinExec(_, _, _, _,
- SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
- SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _), _) =>
+ SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _, _), _),
+ SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _, _), _), _) =>
assert(left.expressions === Seq(exprA, exprB, exprB))
assert(right.expressions === Seq(exprA, exprC, exprC))
case other => fail(other.toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 40868f896f5..c22e4459660 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -619,8 +619,8 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
assert(query.lastExecution.executedPlan.collect {
case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
- ShuffleExchangeExec(opA: HashPartitioning, _, _),
- ShuffleExchangeExec(opB: HashPartitioning, _, _))
+ ShuffleExchangeExec(opA: HashPartitioning, _, _, _),
+ ShuffleExchangeExec(opB: HashPartitioning, _, _, _))
if partitionExpressionsColumns(opA.expressions) === Seq("a", "b")
&& partitionExpressionsColumns(opB.expressions) === Seq("a", "b")
&& opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]