This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6303fc9940 support SKEW_POP and KURTOSIS_POP aggregates (#10021)
6303fc9940 is described below

commit 6303fc9940b77358d259607e8328a13c5577773a
Author: Almog Gavra <almog.ga...@gmail.com>
AuthorDate: Thu Dec 22 18:05:06 2022 -0800

    support SKEW_POP and KURTOSIS_POP aggregates (#10021)
    
    * support SKEW_POP and KURTOSIS_POP aggregates
    
    * address feedback
    
    * address feedback 2
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  27 +++-
 .../function/AggregationFunctionFactory.java       |   4 +
 .../function/FourthMomentAggregationFunction.java  | 166 +++++++++++++++++++
 .../function/AggregationFunctionFactoryTest.java   |  14 ++
 .../pinot/queries/StatisticalQueriesTest.java      | 179 +++++++++++++++++++++
 .../local/customobject/PinotFourthMoment.java      | 135 ++++++++++++++++
 .../local/customobject/PinotFourthMomentTest.java  | 102 ++++++++++++
 .../pinot/segment/spi/AggregationFunctionType.java |   2 +
 8 files changed, 627 insertions(+), 2 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 6fbacc5fcd..fbfee474e9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -70,6 +70,7 @@ import 
org.apache.pinot.segment.local.customobject.FloatLongPair;
 import org.apache.pinot.segment.local.customobject.IntLongPair;
 import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
 import org.apache.pinot.segment.local.customobject.VarianceTuple;
@@ -125,7 +126,8 @@ public class ObjectSerDeUtils {
     DoubleLongPair(30),
     StringLongPair(31),
     CovarianceTuple(32),
-    VarianceTuple(33);
+    VarianceTuple(33),
+    PinotFourthMoment(34);
 
     private final int _value;
 
@@ -209,6 +211,8 @@ public class ObjectSerDeUtils {
         return ObjectType.CovarianceTuple;
       } else if (value instanceof VarianceTuple) {
         return ObjectType.VarianceTuple;
+      } else if (value instanceof PinotFourthMoment) {
+        return ObjectType.PinotFourthMoment;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -483,6 +487,24 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<PinotFourthMoment> 
PINOT_FOURTH_MOMENT_OBJECT_SER_DE
+      = new ObjectSerDe<PinotFourthMoment>() {
+    @Override
+    public byte[] serialize(PinotFourthMoment value) {
+      return value.serialize();
+    }
+
+    @Override
+    public PinotFourthMoment deserialize(byte[] bytes) {
+      return PinotFourthMoment.fromBytes(bytes);
+    }
+
+    @Override
+    public PinotFourthMoment deserialize(ByteBuffer byteBuffer) {
+      return PinotFourthMoment.fromBytes(byteBuffer);
+    }
+  };
+
   public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new 
ObjectSerDe<HyperLogLog>() {
 
     @Override
@@ -1213,7 +1235,8 @@ public class ObjectSerDeUtils {
       DOUBLE_LONG_PAIR_SER_DE,
       STRING_LONG_PAIR_SER_DE,
       COVARIANCE_TUPLE_OBJECT_SER_DE,
-      VARIANCE_TUPLE_OBJECT_SER_DE
+      VARIANCE_TUPLE_OBJECT_SER_DE,
+      PINOT_FOURTH_MOMENT_OBJECT_SER_DE
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 2758d193a2..4ba3413221 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -285,6 +285,10 @@ public class AggregationFunctionFactory {
             return new VarianceAggregationFunction(firstArgument, false, true);
           case STDDEVSAMP:
             return new VarianceAggregationFunction(firstArgument, true, true);
+          case SKEWNESS:
+            return new FourthMomentAggregationFunction(firstArgument, 
FourthMomentAggregationFunction.Type.SKEWNESS);
+          case KURTOSIS:
+            return new FourthMomentAggregationFunction(firstArgument, 
FourthMomentAggregationFunction.Type.KURTOSIS);
           default:
             throw new IllegalArgumentException();
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
new file mode 100644
index 0000000000..9cb06e4eeb
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class FourthMomentAggregationFunction extends 
BaseSingleInputAggregationFunction<PinotFourthMoment, Double> {
+
+  private final Type _type;
+
+  enum Type {
+    KURTOSIS, SKEWNESS
+  }
+
+  public FourthMomentAggregationFunction(ExpressionContext expression, Type 
type) {
+    super(expression);
+    _type = type;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    switch (_type) {
+      case KURTOSIS:
+        return AggregationFunctionType.KURTOSIS;
+      case SKEWNESS:
+        return AggregationFunctionType.SKEWNESS;
+      default:
+        throw new IllegalArgumentException("Unexpected type " + _type);
+    }
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double[] values = 
StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
+
+    PinotFourthMoment m4 = aggregationResultHolder.getResult();
+    if (m4 == null) {
+      m4 = new PinotFourthMoment();
+      aggregationResultHolder.setValue(m4);
+    }
+
+    for (int i = 0; i < length; i++) {
+      m4.increment(values[i]);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double[] values = 
StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
+    for (int i = 0; i < length; i++) {
+      PinotFourthMoment m4 = groupByResultHolder.getResult(groupKeyArray[i]);
+      if (m4 == null) {
+        m4 = new PinotFourthMoment();
+        groupByResultHolder.setValueForKey(groupKeyArray[i], m4);
+      }
+      m4.increment(values[i]);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double[] values = 
StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
+    for (int i = 0; i < length; i++) {
+      for (int groupKey : groupKeysArray[i]) {
+        PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey);
+        if (m4 == null) {
+          m4 = new PinotFourthMoment();
+          groupByResultHolder.setValueForKey(groupKey, m4);
+        }
+        m4.increment(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public PinotFourthMoment extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    PinotFourthMoment m4 = aggregationResultHolder.getResult();
+    if (m4 == null) {
+      return new PinotFourthMoment();
+    } else {
+      return m4;
+    }
+  }
+
+  @Override
+  public PinotFourthMoment extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey);
+    if (m4 == null) {
+      return new PinotFourthMoment();
+    } else {
+      return m4;
+    }
+  }
+
+  @Override
+  public PinotFourthMoment merge(PinotFourthMoment intermediateResult1, 
PinotFourthMoment intermediateResult2) {
+    intermediateResult1.combine(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.DOUBLE;
+  }
+
+  @Override
+  public Double extractFinalResult(PinotFourthMoment m4) {
+    if (m4 == null) {
+      return null;
+    }
+
+    switch (_type) {
+      case KURTOSIS:
+        return m4.kurtosis();
+      case SKEWNESS:
+        return m4.skew();
+      default:
+        throw new IllegalStateException("Unexpected value: " + _type);
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index 144949a0d5..a29694ef79 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -458,6 +458,20 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.BOOLOR);
     assertEquals(aggregationFunction.getColumnName(), "boolOr_column");
     assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)");
+
+    function = getFunction("skewness");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.SKEWNESS);
+    assertEquals(aggregationFunction.getColumnName(), "skewness_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
"skewness(column)");
+
+    function = getFunction("kurtosis");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.KURTOSIS);
+    assertEquals(aggregationFunction.getColumnName(), "kurtosis_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
"kurtosis(column)");
   }
 
   private FunctionContext getFunction(String functionName) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
index 05defe18f0..9365006b35 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.math3.stat.correlation.Covariance;
+import org.apache.commons.math3.stat.descriptive.moment.Kurtosis;
+import org.apache.commons.math3.stat.descriptive.moment.Skewness;
 import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
 import org.apache.commons.math3.stat.descriptive.moment.Variance;
 import org.apache.commons.math3.util.Precision;
@@ -39,6 +41,7 @@ import 
org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.operator.query.GroupByOperator;
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.segment.local.customobject.CovarianceTuple;
+import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.segment.local.customobject.VarianceTuple;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -705,6 +708,172 @@ public class StatisticalQueriesTest extends 
BaseQueriesTest {
     }
   }
 
+  @Test
+  public void testSkewAggregationOnly() {
+    // Compute the expected values with commons-math, one accumulator per queried column
+    Skewness[] expectedSkew = new Skewness[4];
+    for (int i = 0; i < expectedSkew.length; i++) {
+      expectedSkew[i] = new Skewness();
+    }
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      expectedSkew[0].increment(_intColX[i]);
+      expectedSkew[1].increment(_longCol[i]);
+      expectedSkew[2].increment(_floatCol[i]);
+      expectedSkew[3].increment(_doubleColX[i]);
+    }
+
+    // Compute the query
+    String query =
+        "SELECT SKEWNESS(intColumnX), SKEWNESS(longColumn), SKEWNESS(floatColumn), SKEWNESS(doubleColumnX) "
+            + "FROM testTable";
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 4, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    // Validate the inner-segment (intermediate) moments
+    for (int i = 0; i < expectedSkew.length; i++) {
+      checkWithPrecisionForSkew((PinotFourthMoment) aggregationResult.get(i), NUM_RECORDS,
+          expectedSkew[i].getResult());
+    }
+
+    // Validate the final values in the broker response
+    Object[] results = getBrokerResponse(query).getResultTable().getRows().get(0);
+    for (int i = 0; i < expectedSkew.length; i++) {
+      double expected = expectedSkew[i].getResult();
+      assertTrue(Precision.equalsWithRelativeTolerance((double) results[i], expected, RELATIVE_EPSILON),
+          "SKEWNESS mismatch for column " + i + ": expected " + expected + " but got " + results[i]);
+    }
+
+    // Validate the response for a query with a filter.
+    // NOTE: the expected value assumes getFilter() selects exactly the first NUM_RECORDS / 2 records.
+    query = "SELECT SKEWNESS(intColumnX) from testTable" + getFilter();
+    results = getBrokerResponse(query).getResultTable().getRows().get(0);
+    Skewness filterExpectedSkew = new Skewness();
+    for (int i = 0; i < NUM_RECORDS / 2; i++) {
+      filterExpectedSkew.increment(_intColX[i]);
+    }
+    double filterExpected = filterExpectedSkew.getResult();
+    assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], filterExpected, RELATIVE_EPSILON),
+        "Filtered SKEWNESS mismatch: expected " + filterExpected + " but got " + results[0]);
+  }
+
+  @Test
+  public void testSkewAggregationGroupBy() {
+    // Expected per-group skewness; this computation assumes records fall into groups in contiguous runs of
+    // NUM_RECORDS / NUM_GROUPS.
+    int groupSize = NUM_RECORDS / NUM_GROUPS;
+    Skewness[] expectedPerGroup = new Skewness[NUM_GROUPS];
+    for (int group = 0; group < NUM_GROUPS; group++) {
+      expectedPerGroup[group] = new Skewness();
+    }
+    for (int row = 0; row < NUM_RECORDS; row++) {
+      expectedPerGroup[row / groupSize].increment(_intColX[row]);
+    }
+
+    // Run the query through the group-by operator
+    String query = "SELECT SKEWNESS(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
+    GroupByOperator groupByOperator = getOperator(query);
+    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 2, NUM_RECORDS);
+
+    // Compare every group's intermediate moment with the expectation
+    AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+    for (int group = 0; group < NUM_GROUPS; group++) {
+      PinotFourthMoment actualMoment = (PinotFourthMoment) groupByResult.getResultForGroupId(0, group);
+      checkWithPrecisionForSkew(actualMoment, groupSize, expectedPerGroup[group].getResult());
+    }
+  }
+
+  @Test
+  public void testKurtosisAggregationOnly() {
+    // Compute the expected values with commons-math, one accumulator per queried column
+    Kurtosis[] expectedKurt = new Kurtosis[4];
+    for (int i = 0; i < expectedKurt.length; i++) {
+      expectedKurt[i] = new Kurtosis();
+    }
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      expectedKurt[0].increment(_intColX[i]);
+      expectedKurt[1].increment(_longCol[i]);
+      expectedKurt[2].increment(_floatCol[i]);
+      expectedKurt[3].increment(_doubleColX[i]);
+    }
+
+    // Compute the query
+    String query =
+        "SELECT KURTOSIS(intColumnX), KURTOSIS(longColumn), KURTOSIS(floatColumn), "
+            + "KURTOSIS(doubleColumnX) FROM testTable";
+    AggregationOperator aggregationOperator = getOperator(query);
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 4, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getResults();
+
+    // Validate the inner-segment (intermediate) moments
+    for (int i = 0; i < expectedKurt.length; i++) {
+      checkWithPrecisionForKurt((PinotFourthMoment) aggregationResult.get(i), NUM_RECORDS,
+          expectedKurt[i].getResult());
+    }
+
+    // Validate the final values in the broker response
+    Object[] results = getBrokerResponse(query).getResultTable().getRows().get(0);
+    for (int i = 0; i < expectedKurt.length; i++) {
+      double expected = expectedKurt[i].getResult();
+      assertTrue(Precision.equalsWithRelativeTolerance((double) results[i], expected, RELATIVE_EPSILON),
+          "KURTOSIS mismatch for column " + i + ": expected " + expected + " but got " + results[i]);
+    }
+
+    // Validate the response for a query with a filter.
+    // NOTE: the expected value assumes getFilter() selects exactly the first NUM_RECORDS / 2 records.
+    query = "SELECT KURTOSIS(intColumnX) from testTable" + getFilter();
+    results = getBrokerResponse(query).getResultTable().getRows().get(0);
+    Kurtosis filterExpectedKurt = new Kurtosis();
+    for (int i = 0; i < NUM_RECORDS / 2; i++) {
+      filterExpectedKurt.increment(_intColX[i]);
+    }
+    double filterExpected = filterExpectedKurt.getResult();
+    assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], filterExpected, RELATIVE_EPSILON),
+        "Filtered KURTOSIS mismatch: expected " + filterExpected + " but got " + results[0]);
+  }
+
+  @Test
+  public void testKurtosisAggregationGroupBy() {
+    // Expected per-group kurtosis; this computation assumes records fall into groups in contiguous runs of
+    // NUM_RECORDS / NUM_GROUPS.
+    int groupSize = NUM_RECORDS / NUM_GROUPS;
+    Kurtosis[] expectedPerGroup = new Kurtosis[NUM_GROUPS];
+    for (int group = 0; group < NUM_GROUPS; group++) {
+      expectedPerGroup[group] = new Kurtosis();
+    }
+    for (int row = 0; row < NUM_RECORDS; row++) {
+      expectedPerGroup[row / groupSize].increment(_intColX[row]);
+    }
+
+    // Run the query through the group-by operator
+    String query = "SELECT KURTOSIS(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
+    GroupByOperator groupByOperator = getOperator(query);
+    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_RECORDS * 2, NUM_RECORDS);
+
+    // Compare every group's intermediate moment with the expectation
+    AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+    for (int group = 0; group < NUM_GROUPS; group++) {
+      PinotFourthMoment actualMoment = (PinotFourthMoment) groupByResult.getResultForGroupId(0, group);
+      checkWithPrecisionForKurt(actualMoment, groupSize, expectedPerGroup[group].getResult());
+    }
+  }
+
   private void checkWithPrecisionForCovariance(CovarianceTuple tuple, double 
sumX, double sumY, double sumXY,
       int count) {
     assertEquals(tuple.getCount(), count);
@@ -766,6 +935,16 @@ public class StatisticalQueriesTest extends 
BaseQueriesTest {
     }
   }
 
+  /**
+   * Asserts that {@code m4} aggregated exactly {@code expectedCount} values and that its skewness matches
+   * {@code expectedSkew} within {@code RELATIVE_EPSILON}.
+   */
+  private void checkWithPrecisionForSkew(PinotFourthMoment m4, int expectedCount, double expectedSkew) {
+    assertEquals(m4.getN(), expectedCount);
+    double actualSkew = m4.skew();
+    assertTrue(Precision.equalsWithRelativeTolerance(actualSkew, expectedSkew, RELATIVE_EPSILON),
+        "Expected skewness " + expectedSkew + " but got " + actualSkew);
+  }
+
+  /**
+   * Asserts that {@code m4} aggregated exactly {@code expectedCount} values and that its kurtosis matches
+   * {@code expectedKurtosis} within {@code RELATIVE_EPSILON}.
+   */
+  private void checkWithPrecisionForKurt(PinotFourthMoment m4, int expectedCount, double expectedKurtosis) {
+    assertEquals(m4.getN(), expectedCount);
+    double actualKurtosis = m4.kurtosis();
+    assertTrue(Precision.equalsWithRelativeTolerance(actualKurtosis, expectedKurtosis, RELATIVE_EPSILON),
+        "Expected kurtosis " + expectedKurtosis + " but got " + actualKurtosis);
+  }
+
   private double computeVariancePop(VarianceTuple varianceTuple) {
     return varianceTuple.getM2() / varianceTuple.getCount();
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java
new file mode 100644
index 0000000000..f5ebb46a76
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import org.apache.commons.math.stat.descriptive.moment.FourthMoment;
+import org.apache.commons.math.stat.descriptive.moment.Kurtosis;
+import org.apache.commons.math.stat.descriptive.moment.Skewness;
+
+
+/**
+ * A {@link Comparable} implementation of the <a 
href=https://en.wikipedia.org/wiki/Moment_(mathematics)>
+ * Fourth Statistical Moment</a> that uses the apache commons algorithm for 
computing it in
+ * one pass. It additionally supports serialization and deserialization 
methods, which is helpful
+ * for combining moments across servers.
+ *
+ * <p>The commons implementation does not support parallel-computation, 
support for which is added
+ * in the {@link #combine(PinotFourthMoment)} method inspired by Presto's 
implementation.
+ *
+ * <pre>
+ * Also See: <a 
href="https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook
+ * /presto/operator/aggregation/AggregationUtils.java#L188">Presto's 
Implementation</a>
+ * </pre>
+ */
+public class PinotFourthMoment extends FourthMoment implements 
Comparable<PinotFourthMoment> {
+
+  private static final Comparator<PinotFourthMoment> COMPARATOR = 
Comparator.<PinotFourthMoment>naturalOrder()
+      .thenComparingLong(x -> x.n)
+      .thenComparingDouble(x -> x.m1)
+      .thenComparingDouble(x -> x.m2)
+      .thenComparingDouble(x -> x.m3)
+      .thenComparingDouble(x -> x.m4);
+
+  public void combine(PinotFourthMoment other) {
+    combine(other.n, other.m1, other.m2, other.m3, other.m4);
+  }
+
+  public void combine(long bN, double bM1, double bM2, double bM3, double bM4) 
{
+    if (bN == 0) {
+      return;
+    } else if (n == 0) {
+      n = bN;
+      m1 = bM1;
+      m2 = bM2;
+      m3 = bM3;
+      m4 = bM4;
+      return;
+    }
+
+    long aN = n;
+    double aM1 = m1;
+    double aM2 = m2;
+    double aM3 = m3;
+    double aM4 = m4;
+
+    long n = aN + bN;
+    double m1 = (aN * aM1 + bN * bM1) / n;
+
+    double delta = bM1 - aM1;
+    double delta2 = delta * delta;
+    double m2 = aM2 + bM2 + delta2 * aN * bN / n;
+
+    double delta3 = delta2 * delta;
+    double m3 = aM3 + bM3
+        + delta3 * aN * bN * (aN - bN) / (n * n)
+        + 3d * delta * (aN * bM2 - bN * aM2) / n;
+
+    double delta4 = delta3 * delta;
+    double n3 = ((double) n) * n * n; // avoid overflow
+    double m4 = aM4 + bM4
+        + delta4 * aN * bN * (aN * aN - aN * bN + bN * bN) / (n3)
+        + 6.0 * delta2 * (aN * aN * bM2 + bN * bN * aM2) / (n * n)
+        + 4d * delta * (aN * bM3 - bN * aM3) / n;
+
+    this.n = n;
+    this.m1 = m1;
+    this.m2 = m2;
+    this.m3 = m3;
+    this.m4 = m4;
+  }
+
+  public double skew() {
+    return new Skewness(this).getResult();
+  }
+
+  public double kurtosis() {
+    return new Kurtosis(this).getResult();
+  }
+
+  public byte[] serialize() {
+    ByteBuffer buff = ByteBuffer.allocate(Long.BYTES + Double.BYTES * 4);
+    buff.putLong(n)
+        .putDouble(m1)
+        .putDouble(m2)
+        .putDouble(m3)
+        .putDouble(m4);
+    return buff.array();
+  }
+
+  public static PinotFourthMoment fromBytes(byte[] bytes) {
+    return fromBytes(ByteBuffer.wrap(bytes));
+  }
+
+  public static PinotFourthMoment fromBytes(ByteBuffer buff) {
+    PinotFourthMoment moment = new PinotFourthMoment();
+    moment.n = buff.getLong();
+    moment.m1 = buff.getDouble();
+    moment.m2 = buff.getDouble();
+    moment.m3 = buff.getDouble();
+    moment.m4 = buff.getDouble();
+    return moment;
+  }
+
+  @Override
+  public int compareTo(PinotFourthMoment o) {
+    return COMPARATOR.compare(this, o);
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java
new file mode 100644
index 0000000000..727bea7d87
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Random;
+import java.util.stream.IntStream;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class PinotFourthMomentTest {
+
+  @Test
+  public void shouldCombineMoments() {
+    // Given:
+    Random r = new Random();
+    double[] xs = IntStream.generate(r::nextInt)
+        .limit(100)
+        .mapToDouble(i -> (double) i)
+        .toArray();
+
+    PinotFourthMoment a = new PinotFourthMoment();
+    PinotFourthMoment b = new PinotFourthMoment();
+    PinotFourthMoment c = new PinotFourthMoment();
+
+    // When:
+    for (int i = 0; i < xs.length; i++) {
+      a.increment(xs[i]);
+      (i < xs.length / 2 ? b : c).increment(xs[i]);
+    }
+    b.combine(c);
+
+    // Then:
+    assertEquals(b.skew(), a.skew(), .01);
+    assertEquals(b.kurtosis(), a.kurtosis(), .01);
+  }
+
+  @Test
+  public void shouldCombineLeftEmptyMoments() {
+    // Given:
+    Random r = new Random();
+    double[] xs = IntStream.generate(r::nextInt)
+        .limit(100)
+        .mapToDouble(i -> (double) i)
+        .toArray();
+
+    PinotFourthMoment a = new PinotFourthMoment();
+    PinotFourthMoment b = new PinotFourthMoment();
+
+    // When:
+    for (double x : xs) {
+      a.increment(x);
+    }
+
+    b.combine(a);
+
+    // Then:
+    assertEquals(b.kurtosis(), a.kurtosis(), .01);
+  }
+
+  @Test
+  public void shouldCombineRightEmptyMoments() {
+    // Given:
+    Random r = new Random();
+    double[] xs = IntStream.generate(r::nextInt)
+        .limit(100)
+        .mapToDouble(i -> (double) i)
+        .toArray();
+
+    PinotFourthMoment a = new PinotFourthMoment();
+    PinotFourthMoment b = new PinotFourthMoment();
+
+    // When:
+    for (double x : xs) {
+      a.increment(x);
+    }
+
+    double kurtosisBeforeCombine = a.kurtosis();
+    a.combine(b);
+
+    // Then:
+    assertEquals(a.kurtosis(), kurtosisBeforeCombine, .01);
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 584c99a7b2..beaac5718d 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -63,6 +63,8 @@ public enum AggregationFunctionType {
   VARSAMP("varSamp"),
   STDDEVPOP("stdDevPop"),
   STDDEVSAMP("stdDevSamp"),
+  SKEWNESS("skewness"),
+  KURTOSIS("kurtosis"),
 
   // Geo aggregation functions
   STUNION("STUnion"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to