This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd09a3e44f1 [SPARK-42039][SQL] SPJ: Remove Option in
KeyGroupedPartitioning#partitionValuesOpt
bd09a3e44f1 is described below
commit bd09a3e44f11153b7e32a153ce5d0b5d0da4ce0c
Author: Chao Sun <[email protected]>
AuthorDate: Tue Jan 17 11:51:47 2023 -0800
[SPARK-42039][SQL] SPJ: Remove Option in
KeyGroupedPartitioning#partitionValuesOpt
### What changes were proposed in this pull request?
Currently `KeyGroupedPartitioning#partitionValuesOpt` is of type:
`Option[Seq[InternalRow]]`. This refactors it into
`Seq[InternalRow]`.
### Why are the changes needed?
It's unnecessary to use `Option` for the field. Originally I was thinking
of using `None` for the case where all the input partitions are implicitly
matched, so that we could skip comparing them in `EnsureRequirements`. However, I
think that is not really a use case, and we can instead use `Seq.empty` for it
if it comes up.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #39540 from sunchao/SPARK-42039.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/plans/physical/partitioning.scala | 16 +++++++---------
.../sql/execution/datasources/v2/BatchScanExec.scala | 6 +++---
.../datasources/v2/DataSourceV2ScanExecBase.scala | 2 +-
.../sql/execution/exchange/EnsureRequirements.scala | 7 ++-----
.../sql/execution/exchange/EnsureRequirementsSuite.scala | 16 ++++++++--------
5 files changed, 21 insertions(+), 26 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index e6eaeda2d0c..73d39a19243 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -324,13 +324,13 @@ case class HashPartitioning(expressions: Seq[Expression],
numPartitions: Int)
*
* @param expressions partition expressions for the partitioning.
* @param numPartitions the number of partitions
- * @param partitionValuesOpt if set, the values for the cluster keys of the
distribution, must be
- * in ascending order.
+ * @param partitionValues the values for the cluster keys of the distribution,
must be
+ * in ascending order.
*/
case class KeyGroupedPartitioning(
expressions: Seq[Expression],
numPartitions: Int,
- partitionValuesOpt: Option[Seq[InternalRow]] = None) extends Partitioning {
+ partitionValues: Seq[InternalRow] = Seq.empty) extends Partitioning {
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
@@ -360,7 +360,7 @@ object KeyGroupedPartitioning {
def apply(
expressions: Seq[Expression],
partitionValues: Seq[InternalRow]): KeyGroupedPartitioning = {
- KeyGroupedPartitioning(expressions, partitionValues.size,
Some(partitionValues))
+ KeyGroupedPartitioning(expressions, partitionValues.size, partitionValues)
}
def supportsExpressions(expressions: Seq[Expression]): Boolean = {
@@ -692,14 +692,12 @@ case class KeyGroupedShuffleSpec(
// partition keys must share overlapping positions in their
respective clustering keys.
// 3.3 each pair of partition expressions at the same index must share
compatible
// transform functions.
- // 4. the partition values, if present on both sides, are following the
same order.
+ // 4. the partition values from both sides are following the same order.
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning,
otherDistribution) =>
distribution.clustering.length == otherDistribution.clustering.length &&
numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-
partitioning.partitionValuesOpt.zip(otherPartitioning.partitionValuesOpt).forall
{
- case (left, right) => left.zip(right).forall { case (l, r) =>
- ordering.compare(l, r) == 0
- }
+
partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
+ case (left, right) => ordering.compare(left, right) == 0
}
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 025b1a3c38f..d6b76ae1096 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -84,7 +84,7 @@ case class BatchScanExec(
val newRows = new InternalRowSet(p.expressions.map(_.dataType))
newRows ++=
newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
- val oldRows = p.partitionValuesOpt.get.toSet
+ val oldRows = p.partitionValues.toSet
// We require the new number of partition keys to be equal or less
than the old number
// of partition keys here. In the case of less than, empty
partitions will be added for
// those missing keys that are not present in the new input
partitions.
@@ -116,7 +116,7 @@ case class BatchScanExec(
super.outputPartitioning match {
case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
val values = commonPartitionValues.get
- k.copy(numPartitions = values.length, partitionValuesOpt =
Some(values))
+ k.copy(numPartitions = values.length, partitionValues = values)
case p => p
}
}
@@ -134,7 +134,7 @@ case class BatchScanExec(
case p: KeyGroupedPartitioning =>
val partitionMapping = finalPartitions.map(s =>
s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
- finalPartitions = p.partitionValuesOpt.get.map { partValue =>
+ finalPartitions = p.partitionValues.map { partValue =>
// Use empty partition for those partition values that are not
present
partitionMapping.getOrElse(partValue, Seq.empty)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index fa4ae171df5..556ae4afb63 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -97,7 +97,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
keyGroupedPartitioning match {
case Some(exprs) if KeyGroupedPartitioning.supportsExpressions(exprs)
=>
groupedPartitions.map { partitionValues =>
- KeyGroupedPartitioning(exprs, partitionValues.size,
Some(partitionValues.map(_._1)))
+ KeyGroupedPartitioning(exprs, partitionValues.size,
partitionValues.map(_._1))
}.getOrElse(super.outputPartitioning)
case _ =>
super.outputPartitioning
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 7706b26af70..f88436297e7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -198,11 +198,8 @@ case class EnsureRequirements(
// Check if the two children are partition keys compatible. If
so, find the
// common set of partition values, and adjust the plan
accordingly.
if (leftSpec.areKeysCompatible(rightSpec)) {
- assert(leftSpec.partitioning.partitionValuesOpt.isDefined)
- assert(rightSpec.partitioning.partitionValuesOpt.isDefined)
-
- val leftPartValues =
leftSpec.partitioning.partitionValuesOpt.get
- val rightPartValues =
rightSpec.partitioning.partitionValuesOpt.get
+ val leftPartValues = leftSpec.partitioning.partitionValues
+ val rightPartValues = rightSpec.partitioning.partitionValues
val mergedPartValues = Utils.mergeOrdered(
Seq(leftPartValues,
rightPartValues))(leftSpec.ordering).toSeq.distinct
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index 7cfa00b4168..bc1fd7a5fa5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -1024,11 +1024,11 @@ class EnsureRequirementsSuite extends
SharedSparkSession {
var plan1 = DummySparkPlan(
outputPartitioning = KeyGroupedPartitioning(bucket(4, exprB) ::
bucket(8, exprC) :: Nil,
- leftPartValues.length, Some(leftPartValues))
+ leftPartValues.length, leftPartValues)
)
var plan2 = DummySparkPlan(
outputPartitioning = KeyGroupedPartitioning(bucket(4, exprC) ::
bucket(8, exprB) :: Nil,
- rightPartValues.length, Some(rightPartValues))
+ rightPartValues.length, rightPartValues)
)
// simple case
@@ -1047,9 +1047,9 @@ class EnsureRequirementsSuite extends SharedSparkSession {
plan1 = DummySparkPlan(outputPartitioning =
PartitioningCollection(
Seq(KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) ::
Nil,
- leftPartValues.length, Some(leftPartValues)),
+ leftPartValues.length, leftPartValues),
KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil,
- leftPartValues.length, Some(leftPartValues)))
+ leftPartValues.length, leftPartValues))
)
)
@@ -1074,15 +1074,15 @@ class EnsureRequirementsSuite extends
SharedSparkSession {
PartitioningCollection(
Seq(
KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) ::
Nil,
- rightPartValues.length, Some(rightPartValues)),
+ rightPartValues.length, rightPartValues),
KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) ::
Nil,
- rightPartValues.length, Some(rightPartValues)))),
+ rightPartValues.length, rightPartValues))),
PartitioningCollection(
Seq(
KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB)
:: Nil,
- rightPartValues.length, Some(rightPartValues)),
+ rightPartValues.length, rightPartValues),
KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB)
:: Nil,
- rightPartValues.length, Some(rightPartValues))))
+ rightPartValues.length, rightPartValues)))
)
)
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]