Repository: spark
Updated Branches:
  refs/heads/master 52dd4b2b2 -> ee569a0c7


[SPARK-5680][SQL] Sum function on all null values, should return zero

SELECT sum('a'), avg('a'), variance('a'), std('a') FROM src;
Should give output as
0.0     NULL    NULL    NULL
This fixes hive udaf_number_format.q

Author: Venkata Ramana G <[email protected]>

Author: Venkata Ramana Gollamudi <[email protected]>

Closes #4466 from gvramana/sum_fix and squashes the following commits:

42e14d1 [Venkata Ramana Gollamudi] Added comments
39415c0 [Venkata Ramana Gollamudi] Handled the partitioned Sum expression 
scenario
df66515 [Venkata Ramana Gollamudi] code style fix
4be2606 [Venkata Ramana Gollamudi] Add udaf_number_format to whitelist and 
golden answer
330fd64 [Venkata Ramana Gollamudi] fix sum function for all null data


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee569a0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee569a0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee569a0c

Branch: refs/heads/master
Commit: ee569a0c7171d149eee52877def902378eaf695e
Parents: 52dd4b2
Author: Venkata Ramana Gollamudi <[email protected]>
Authored: Sat Mar 21 13:24:24 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Sat Mar 21 13:24:24 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/aggregates.scala   | 68 +++++++++++++++++++-
 .../hive/execution/HiveCompatibilitySuite.scala |  1 +
 ...er_format-0-eff4ef3c207d14d5121368f294697964 |  0
 ...er_format-1-4a03c4328565c60ca99689239f07fb16 |  1 +
 4 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee569a0c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 735b748..5297d1e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -346,13 +346,13 @@ case class Sum(child: Expression) extends 
PartialAggregate with trees.UnaryNode[
       case DecimalType.Fixed(_, _) =>
         val partialSum = Alias(Sum(Cast(child, DecimalType.Unlimited)), 
"PartialSum")()
         SplitEvaluation(
-          Cast(Sum(partialSum.toAttribute), dataType),
+          Cast(CombineSum(partialSum.toAttribute), dataType),
           partialSum :: Nil)
 
       case _ =>
         val partialSum = Alias(Sum(child), "PartialSum")()
         SplitEvaluation(
-          Sum(partialSum.toAttribute),
+          CombineSum(partialSum.toAttribute),
           partialSum :: Nil)
     }
   }
@@ -360,6 +360,30 @@ case class Sum(child: Expression) extends PartialAggregate 
with trees.UnaryNode[
   override def newInstance() = new SumFunction(child, this)
 }
 
+/**
+ * Sum should satisfy 3 cases:
+ * 1) sum of all null values = zero
+ * 2) sum for table column with no data = null
+ * 3) sum of column with null and not null values = sum of not null values
+ * Require separate CombineSum Expression and function as it has to 
distinguish "No data" case
+ * versus "data equals null" case, while aggregating results and at each 
partial expression.i.e.,
+ * Combining    PartitionLevel   InputData
+ *                           <-- null
+ * Zero     <-- Zero         <-- null
+ *                              
+ *          <-- null         <-- no data
+ * null     <-- null         <-- no data 
+ */
+case class CombineSum(child: Expression) extends AggregateExpression {
+  def this() = this(null)
+  
+  override def children = child :: Nil
+  override def nullable = true
+  override def dataType = child.dataType
+  override def toString = s"CombineSum($child)"
+  override def newInstance() = new CombineSumFunction(child, this)
+}
+
 case class SumDistinct(child: Expression)
   extends PartialAggregate with trees.UnaryNode[Expression] {
 
@@ -565,7 +589,8 @@ case class SumFunction(expr: Expression, base: 
AggregateExpression) extends Aggr
 
   private val sum = MutableLiteral(null, calcType)
 
-  private val addFunction = Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), 
Cast(expr, calcType)), sum))
+  private val addFunction = 
+    Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, 
zero))
 
   override def update(input: Row): Unit = {
     sum.update(addFunction, input)
@@ -580,6 +605,43 @@ case class SumFunction(expr: Expression, base: 
AggregateExpression) extends Aggr
   }
 }
 
+case class CombineSumFunction(expr: Expression, base: AggregateExpression)
+  extends AggregateFunction {
+  
+  def this() = this(null, null) // Required for serialization.
+
+  private val calcType =
+    expr.dataType match {
+      case DecimalType.Fixed(_, _) =>
+        DecimalType.Unlimited
+      case _ =>
+        expr.dataType
+    }
+
+  private val zero = Cast(Literal(0), calcType)
+
+  private val sum = MutableLiteral(null, calcType)
+
+  private val addFunction = 
+    Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, 
zero))
+  
+  override def update(input: Row): Unit = {
+    val result = expr.eval(input)
+    // partial sum result can be null only when no input rows present 
+    if(result != null) {
+      sum.update(addFunction, input)
+    }
+  }
+
+  override def eval(input: Row): Any = {
+    expr.dataType match {
+      case DecimalType.Fixed(_, _) =>
+        Cast(sum, dataType).eval(null)
+      case _ => sum.eval(null)
+    }
+  }
+}
+
 case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
   extends AggregateFunction {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee569a0c/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 5180a7f..2ae9d01 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -800,6 +800,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
     "udaf_covar_pop",
     "udaf_covar_samp",
     "udaf_histogram_numeric",
+    "udaf_number_format",
     "udf2",
     "udf5",
     "udf6",

http://git-wip-us.apache.org/repos/asf/spark/blob/ee569a0c/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964
 
b/sql/hive/src/test/resources/golden/udaf_number_format-0-eff4ef3c207d14d5121368f294697964
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/ee569a0c/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
 
b/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
new file mode 100644
index 0000000..c6f275a
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/udaf_number_format-1-4a03c4328565c60ca99689239f07fb16
@@ -0,0 +1 @@
+0.0    NULL    NULL    NULL


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to