Repository: spark Updated Branches: refs/heads/master da936fbb7 -> 6dbfd7ecf
[SPARK-10982] [SQL] Rename ExpressionAggregate -> DeclarativeAggregate. DeclarativeAggregate matches more closely with ImperativeAggregate we already have. Author: Reynold Xin <[email protected]> Closes #9013 from rxin/SPARK-10982. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dbfd7ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dbfd7ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dbfd7ec Branch: refs/heads/master Commit: 6dbfd7ecf41297213f4ce8024d00c40808c5ac8f Parents: da936fb Author: Reynold Xin <[email protected]> Authored: Wed Oct 7 15:38:46 2015 -0700 Committer: Josh Rosen <[email protected]> Committed: Wed Oct 7 15:38:46 2015 -0700 ---------------------------------------------------------------------- .../catalyst/expressions/aggregate/functions.scala | 16 ++++++++-------- .../catalyst/expressions/aggregate/interfaces.scala | 4 ++-- .../execution/aggregate/AggregationIterator.scala | 16 ++++++++-------- .../aggregate/TungstenAggregationIterator.scala | 12 ++++++------ .../spark/sql/execution/aggregate/utils.scala | 8 ++++---- 5 files changed, 28 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6dbfd7ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala index 4ad2607..8aad0b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ -case class Average(child: Expression) extends ExpressionAggregate { +case class Average(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil @@ -88,7 +88,7 @@ case class Average(child: Expression) extends ExpressionAggregate { } } -case class Count(child: Expression) extends ExpressionAggregate { +case class Count(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil override def nullable: Boolean = false @@ -118,7 +118,7 @@ case class Count(child: Expression) extends ExpressionAggregate { override val evaluateExpression = Cast(currentCount, LongType) } -case class First(child: Expression) extends ExpressionAggregate { +case class First(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil @@ -152,7 +152,7 @@ case class First(child: Expression) extends ExpressionAggregate { override val evaluateExpression = first } -case class Last(child: Expression) extends ExpressionAggregate { +case class Last(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil @@ -186,7 +186,7 @@ case class Last(child: Expression) extends ExpressionAggregate { override val evaluateExpression = last } -case class Max(child: Expression) extends ExpressionAggregate { +case class Max(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil @@ -220,7 +220,7 @@ case class Max(child: Expression) extends ExpressionAggregate { override val evaluateExpression = max } -case class Min(child: Expression) extends ExpressionAggregate { +case class Min(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil @@ -277,7 +277,7 @@ case class StddevSamp(child: Expression) extends StddevAgg(child) { // Compute standard deviation based on online algorithm specified here: // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance -abstract class StddevAgg(child: Expression) extends ExpressionAggregate { +abstract class StddevAgg(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil @@ -397,7 +397,7 @@ abstract class StddevAgg(child: Expression) extends ExpressionAggregate { } } -case class Sum(child: Expression) extends ExpressionAggregate { +case class Sum(child: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = child :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/6dbfd7ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 74e15ec..9ba3a9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -97,7 +97,7 @@ private[sql] case class AggregateExpression2( * * - [[ImperativeAggregate]] is for aggregation functions that are specified in terms of * initialize(), update(), and merge() functions that operate on Row-based aggregation buffers. - * - [[ExpressionAggregate]] is for aggregation functions that are specified using + * - [[DeclarativeAggregate]] is for aggregation functions that are specified using * Catalyst expressions. * * In both interfaces, aggregates must define the schema ([[aggBufferSchema]]) and attributes @@ -244,7 +244,7 @@ abstract class ImperativeAggregate extends AggregateFunction2 { * can then use these attributes when defining `updateExpressions`, `mergeExpressions`, and * `evaluateExpressions`. */ -abstract class ExpressionAggregate +abstract class DeclarativeAggregate extends AggregateFunction2 with Serializable with Unevaluable { http://git-wip-us.apache.org/repos/asf/spark/blob/6dbfd7ec/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index 0490302..5f7341e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -127,7 +127,7 @@ abstract class AggregationIterator( var i = 0 while (i < allAggregateFunctions.length) { allAggregateFunctions(i) match { - case agg: ExpressionAggregate => + case agg: DeclarativeAggregate => case _ => positions += i } i += 1 @@ -146,7 +146,7 @@ abstract class AggregationIterator( // The projection used to initialize buffer values for all expression-based aggregates. private[this] val expressionAggInitialProjection = { val initExpressions = allAggregateFunctions.flatMap { - case ae: ExpressionAggregate => ae.initialValues + case ae: DeclarativeAggregate => ae.initialValues // For the positions corresponding to imperative aggregate functions, we'll use special // no-op expressions which are ignored during projection code-generation. case i: ImperativeAggregate => Seq.fill(i.aggBufferAttributes.length)(NoOp) @@ -172,7 +172,7 @@ abstract class AggregationIterator( // Partial-only case (Some(Partial), None) => val updateExpressions = nonCompleteAggregateFunctions.flatMap { - case ae: ExpressionAggregate => ae.updateExpressions + case ae: DeclarativeAggregate => ae.updateExpressions case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val expressionAggUpdateProjection = @@ -204,7 +204,7 @@ abstract class AggregationIterator( // groupingKeyAttributes ++ // allAggregateFunctions.flatMap(_.cloneBufferAttributes) val mergeExpressions = nonCompleteAggregateFunctions.flatMap { - case ae: ExpressionAggregate => ae.mergeExpressions + case ae: DeclarativeAggregate => ae.mergeExpressions case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } // This projection is used to merge buffer values for all expression-based aggregates. @@ -248,7 +248,7 @@ abstract class AggregationIterator( nonCompleteAggregateFunctions.flatMap(_.inputAggBufferAttributes) val mergeExpressions = nonCompleteAggregateFunctions.flatMap { - case ae: ExpressionAggregate => ae.mergeExpressions + case ae: DeclarativeAggregate => ae.mergeExpressions case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } ++ completeOffsetExpressions val finalExpressionAggMergeProjection = @@ -256,7 +256,7 @@ abstract class AggregationIterator( val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap { - case ae: ExpressionAggregate => ae.updateExpressions + case ae: DeclarativeAggregate => ae.updateExpressions case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val completeExpressionAggUpdateProjection = @@ -291,7 +291,7 @@ abstract class AggregationIterator( val updateExpressions = completeAggregateFunctions.flatMap { - case ae: ExpressionAggregate => ae.updateExpressions + case ae: DeclarativeAggregate => ae.updateExpressions case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val completeExpressionAggUpdateProjection = @@ -353,7 +353,7 @@ abstract class AggregationIterator( val bufferSchemata = allAggregateFunctions.flatMap(_.aggBufferAttributes) val evalExpressions = allAggregateFunctions.map { - case ae: ExpressionAggregate => ae.evaluateExpression + case ae: DeclarativeAggregate => ae.evaluateExpression case agg: AggregateFunction2 => NoOp } val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)() http://git-wip-us.apache.org/repos/asf/spark/blob/6dbfd7ec/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 6a84c0a..a6f4c1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -131,15 +131,15 @@ class TungstenAggregationIterator( // All aggregate functions. TungstenAggregationIterator only handles expression-based aggregate. // If there is any functions that is an ImperativeAggregateFunction, we throw an // IllegalStateException. - private[this] val allAggregateFunctions: Array[ExpressionAggregate] = { + private[this] val allAggregateFunctions: Array[DeclarativeAggregate] = { if (!allAggregateExpressions.forall( - _.aggregateFunction.isInstanceOf[ExpressionAggregate])) { + _.aggregateFunction.isInstanceOf[DeclarativeAggregate])) { throw new IllegalStateException( "Only ExpressionAggregateFunctions should be passed in TungstenAggregationIterator.") } allAggregateExpressions - .map(_.aggregateFunction.asInstanceOf[ExpressionAggregate]) + .map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) .toArray } @@ -203,9 +203,9 @@ class TungstenAggregationIterator( // Final-Complete case (Some(Final), Some(Complete)) => - val nonCompleteAggregateFunctions: Array[ExpressionAggregate] = + val nonCompleteAggregateFunctions: Array[DeclarativeAggregate] = allAggregateFunctions.take(nonCompleteAggregateExpressions.length) - val completeAggregateFunctions: Array[ExpressionAggregate] = + val completeAggregateFunctions: Array[DeclarativeAggregate] = allAggregateFunctions.takeRight(completeAggregateExpressions.length) val completeOffsetExpressions = @@ -235,7 +235,7 @@ class TungstenAggregationIterator( // Complete-only case (None, Some(Complete)) => - val completeAggregateFunctions: Array[ExpressionAggregate] = + val completeAggregateFunctions: Array[DeclarativeAggregate] = allAggregateFunctions.takeRight(completeAggregateExpressions.length) val updateExpressions = http://git-wip-us.apache.org/repos/asf/spark/blob/6dbfd7ec/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index e1d7e1b..e1c2d94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -97,7 +97,7 @@ object Utils { // Check if we can use TungstenAggregate. val usesTungstenAggregate = child.sqlContext.conf.unsafeEnabled && - aggregateExpressions.forall(_.aggregateFunction.isInstanceOf[ExpressionAggregate]) && + aggregateExpressions.forall(_.aggregateFunction.isInstanceOf[DeclarativeAggregate]) && supportsTungstenAggregate( groupingExpressions, aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)) @@ -157,7 +157,7 @@ object Utils { // aggregateFunctionMap contains unique aggregate functions. val aggregateFunction = aggregateFunctionMap(agg.aggregateFunction, agg.isDistinct)._1 - aggregateFunction.asInstanceOf[ExpressionAggregate].evaluateExpression + aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression case expression => // We do not rely on the equality check at here since attributes may // different cosmetically. Instead, we use semanticEquals. @@ -215,7 +215,7 @@ object Utils { val usesTungstenAggregate = child.sqlContext.conf.unsafeEnabled && aggregateExpressions.forall( - _.aggregateFunction.isInstanceOf[ExpressionAggregate]) && + _.aggregateFunction.isInstanceOf[DeclarativeAggregate]) && supportsTungstenAggregate( groupingExpressions, aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)) @@ -359,7 +359,7 @@ object Utils { // aggregate functions that have not been rewritten. aggregateFunctionMap(function, isDistinct)._1 } - aggregateFunction.asInstanceOf[ExpressionAggregate].evaluateExpression + aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression case expression => // We do not rely on the equality check at here since attributes may // different cosmetically. Instead, we use semanticEquals. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
