Repository: spark Updated Branches: refs/heads/master fe1c895e1 -> 101556d0f
[SPARK-19060][SQL] remove the supportsPartial flag in AggregateFunction ## What changes were proposed in this pull request? Now that all aggregate functions support partial aggregation, we can remove the `supportsPartial` flag in `AggregateFunction` ## How was this patch tested? existing tests. Author: Wenchen Fan <[email protected]> Closes #16461 from cloud-fan/partial. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/101556d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/101556d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/101556d0 Branch: refs/heads/master Commit: 101556d0fa704deca0f4a2e5070906d4af2c861b Parents: fe1c895 Author: Wenchen Fan <[email protected]> Authored: Wed Jan 4 12:46:30 2017 +0100 Committer: Herman van Hovell <[email protected]> Committed: Wed Jan 4 12:46:30 2017 +0100 ---------------------------------------------------------------------- .../expressions/aggregate/interfaces.scala | 6 ------ .../expressions/windowExpressions.scala | 1 - .../optimizer/RewriteDistinctAggregates.scala | 7 ++----- .../spark/sql/execution/SparkStrategies.scala | 13 +------------ .../sql/execution/aggregate/AggUtils.scala | 20 -------------------- .../org/apache/spark/sql/hive/hiveUDFs.scala | 2 -- .../sql/hive/execution/TestingTypedCount.scala | 2 -- 7 files changed, 3 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index ccd4ae6..80c25d0 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -174,12 +174,6 @@ abstract class AggregateFunction extends Expression { def inputAggBufferAttributes: Seq[AttributeReference] /** - * Indicates if this function supports partial aggregation. - * Currently Hive UDAF is the only one that doesn't support partial aggregation. - */ - def supportsPartial: Boolean = true - - /** * Result of the aggregate function when the input is empty. This is currently only used for the * proper rewriting of distinct aggregate functions. */ http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index c0d6a6b..13115f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -436,7 +436,6 @@ abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowF override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow) override def dataType: DataType = IntegerType override def nullable: Boolean = true - override def supportsPartial: Boolean = false override lazy val mergeExpressions = throw new UnsupportedOperationException("Window Functions do not support merging.") } http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index cd8912f..3b27cd2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -131,11 +131,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { } } - // Check if the aggregates contains functions that do not support partial aggregation. - val existsNonPartial = aggExpressions.exists(!_.aggregateFunction.supportsPartial) - - // Aggregation strategy can handle queries with a single distinct group and partial aggregates. - if (distinctAggGroups.size > 1 || (distinctAggGroups.size == 1 && existsNonPartial)) { + // Aggregation strategy can handle queries with a single distinct group. + if (distinctAggGroups.size > 1) { // Create the attributes for the grouping id and the group by clause. 
val gid = AttributeReference("gid", IntegerType, nullable = false)(isGenerated = true) val groupByMap = a.groupingExpressions.collect { http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 81cd5ef..28808f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -262,18 +262,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } val aggregateOperator = - if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) { - if (functionsWithDistinct.nonEmpty) { - sys.error("Distinct columns cannot exist in Aggregate operator containing " + - "aggregate functions which don't support partial aggregation.") - } else { - aggregate.AggUtils.planAggregateWithoutPartial( - groupingExpressions, - aggregateExpressions, - resultExpressions, - planLater(child)) - } - } else if (functionsWithDistinct.isEmpty) { + if (functionsWithDistinct.isEmpty) { aggregate.AggUtils.planAggregateWithoutDistinct( groupingExpressions, aggregateExpressions, http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index 8b8ccf4..aa789af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -27,26 +27,6 @@ import org.apache.spark.sql.internal.SQLConf * Utility functions used by the query planner to convert our plan to new aggregation code path. */ object AggUtils { - - def planAggregateWithoutPartial( - groupingExpressions: Seq[NamedExpression], - aggregateExpressions: Seq[AggregateExpression], - resultExpressions: Seq[NamedExpression], - child: SparkPlan): Seq[SparkPlan] = { - - val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) - val completeAggregateAttributes = completeAggregateExpressions.map(_.resultAttribute) - SortAggregateExec( - requiredChildDistributionExpressions = Some(groupingExpressions), - groupingExpressions = groupingExpressions, - aggregateExpressions = completeAggregateExpressions, - aggregateAttributes = completeAggregateAttributes, - initialInputBufferOffset = 0, - resultExpressions = resultExpressions, - child = child - ) :: Nil - } - private def createAggregate( requiredChildDistributionExpressions: Option[Seq[Expression]] = None, groupingExpressions: Seq[NamedExpression] = Nil, http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index fcefd69..4590197 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -380,8 +380,6 @@ private[hive] case class HiveUDAFFunction( override def nullable: Boolean = true - override def supportsPartial: Boolean = true - override lazy val dataType: DataType = inspectorToDataType(returnInspector) override def prettyName: String = name 
http://git-wip-us.apache.org/repos/asf/spark/blob/101556d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala index aaf1db6..31b2430 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala @@ -42,8 +42,6 @@ case class TestingTypedCount( override def nullable: Boolean = false - override val supportsPartial: Boolean = true - override def createAggregationBuffer(): State = TestingTypedCount.State(0L) override def update(buffer: State, input: InternalRow): State = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
