This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d1f7dc  Add functions to return raw results for Percentile TDigest 
and Est (#7226)
2d1f7dc is described below

commit 2d1f7dc9060d45ac9f4ff0621da62790ea9b8e64
Author: lakshmanan-v <85200301+lakshmana...@users.noreply.github.com>
AuthorDate: Tue Aug 24 13:54:41 2021 -0700

    Add functions to return raw results for Percentile TDigest and Est (#7226)
    
    Adding aggregate functions to return serialized / raw values of 
PercentileEst (QuantileDigest) and PercentileTDigest (TDigest) data structures.
---
 .../function/AggregationFunctionTypeTest.java      |   8 ++
 .../function/AggregationFunctionFactory.java       |  32 +++++
 .../PercentileRawEstAggregationFunction.java       | 141 ++++++++++++++++++++
 .../PercentileRawEstMVAggregationFunction.java     |  43 +++++++
 .../PercentileRawTDigestAggregationFunction.java   | 142 +++++++++++++++++++++
 .../PercentileRawTDigestMVAggregationFunction.java |  43 +++++++
 .../function/AggregationFunctionFactoryTest.java   |  42 ++++++
 .../apache/pinot/queries/ExpectedQueryResult.java  |  56 ++++++++
 ...terSegmentAggregationMultiValueQueriesTest.java | 113 ++++++++++++++++
 ...erSegmentAggregationSingleValueQueriesTest.java | 118 +++++++++++++++++
 .../org/apache/pinot/queries/QueriesTestUtils.java | 105 ++++++++++++---
 .../local/aggregator/ValueAggregatorFactory.java   |   4 +
 .../customobject/SerializedQuantileDigest.java     |  50 ++++++++
 .../local/customobject/SerializedTDigest.java      |  49 +++++++
 .../pinot/segment/spi/AggregationFunctionType.java |  12 ++
 .../misc/AggregationFunctionColumnPairTest.java    |  18 +++
 16 files changed, 956 insertions(+), 20 deletions(-)

diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index 5b1d640..042381b 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -48,6 +48,10 @@ public class AggregationFunctionTypeTest {
         AggregationFunctionType.PERCENTILEEST);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeTdIgEsT99"),
         AggregationFunctionType.PERCENTILETDIGEST);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWeSt90mV"),
+        AggregationFunctionType.PERCENTILERAWESTMV);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt95mV"),
+        AggregationFunctionType.PERCENTILERAWTDIGESTMV);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("CoUnTMv"),
 AggregationFunctionType.COUNTMV);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmV"),
 AggregationFunctionType.MINMV);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MaXmV"),
 AggregationFunctionType.MAXMV);
@@ -67,6 +71,10 @@ public class AggregationFunctionTypeTest {
         AggregationFunctionType.PERCENTILEESTMV);
     
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeTdIgEsT95mV"),
         AggregationFunctionType.PERCENTILETDIGESTMV);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWeSt50"),
+        AggregationFunctionType.PERCENTILERAWEST);
+    
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("PeRcEnTiLeRaWtDiGeSt99"),
+        AggregationFunctionType.PERCENTILERAWTDIGEST);
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index ccd45fc..3285607 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -61,10 +61,18 @@ public class AggregationFunctionFactory {
             // PercentileEst
             String percentileString = remainingFunctionName.substring(3);
             return new PercentileEstAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("RAWEST\\d+")) {
+            // PercentileRawEst
+            String percentileString = remainingFunctionName.substring(6);
+            return new PercentileRawEstAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
           } else if (remainingFunctionName.matches("TDIGEST\\d+")) {
             // PercentileTDigest
             String percentileString = remainingFunctionName.substring(7);
             return new PercentileTDigestAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("RAWTDIGEST\\d+")) {
+            // PercentileRawTDigest
+            String percentileString = remainingFunctionName.substring(10);
+            return new PercentileRawTDigestAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
           } else if (remainingFunctionName.matches("\\d+MV")) {
             // PercentileMV
             String percentileString = remainingFunctionName.substring(0, 
remainingFunctionName.length() - 2);
@@ -73,10 +81,18 @@ public class AggregationFunctionFactory {
             // PercentileEstMV
             String percentileString = remainingFunctionName.substring(3, 
remainingFunctionName.length() - 2);
             return new PercentileEstMVAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("RAWEST\\d+MV")) {
+            // PercentileRawEstMV
+            String percentileString = remainingFunctionName.substring(6, 
remainingFunctionName.length() - 2);
+            return new PercentileRawEstMVAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
           } else if (remainingFunctionName.matches("TDIGEST\\d+MV")) {
             // PercentileTDigestMV
             String percentileString = remainingFunctionName.substring(7, 
remainingFunctionName.length() - 2);
             return new PercentileTDigestMVAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("RAWTDIGEST\\d+MV")) {
+            // PercentileRawTDigestMV
+            String percentileString = remainingFunctionName.substring(10, 
remainingFunctionName.length() - 2);
+            return new 
PercentileRawTDigestMVAggregationFunction(firstArgument, 
parsePercentileToInt(percentileString));
           }
         } else if (numArguments == 2) {
           // Double arguments percentile (e.g. percentile(foo, 99), 
percentileTDigest(bar, 95), etc.) where the
@@ -90,10 +106,18 @@ public class AggregationFunctionFactory {
             // PercentileEst
             return new PercentileEstAggregationFunction(firstArgument, 
percentile);
           }
+          if (remainingFunctionName.equals("RAWEST")) {
+            // PercentileRawEst
+            return new PercentileRawEstAggregationFunction(firstArgument, 
percentile);
+          }
           if (remainingFunctionName.equals("TDIGEST")) {
             // PercentileTDigest
             return new PercentileTDigestAggregationFunction(firstArgument, 
percentile);
           }
+          if (remainingFunctionName.equals("RAWTDIGEST")) {
+            // PercentileRawTDigest
+            return new PercentileRawTDigestAggregationFunction(firstArgument, 
percentile);
+          }
           if (remainingFunctionName.equals("MV")) {
             // PercentileMV
             return new PercentileMVAggregationFunction(firstArgument, 
percentile);
@@ -102,10 +126,18 @@ public class AggregationFunctionFactory {
             // PercentileEstMV
             return new PercentileEstMVAggregationFunction(firstArgument, 
percentile);
           }
+          if (remainingFunctionName.equals("RAWESTMV")) {
+            // PercentileRawEstMV
+            return new PercentileRawEstMVAggregationFunction(firstArgument, 
percentile);
+          }
           if (remainingFunctionName.equals("TDIGESTMV")) {
             // PercentileTDigestMV
             return new PercentileTDigestMVAggregationFunction(firstArgument, 
percentile);
           }
+          if (remainingFunctionName.equals("RAWTDIGESTMV")) {
+            // PercentileRawTDigestMV
+            return new 
PercentileRawTDigestMVAggregationFunction(firstArgument, percentile);
+          }
         }
         throw new IllegalArgumentException("Invalid percentile function: " + 
function);
       } else {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java
new file mode 100644
index 0000000..dc5d435
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstAggregationFunction.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.local.customobject.SerializedQuantileDigest;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawEstAggregationFunction} returns the serialized 
{@code QuantileDigest} data structure of the
+ * {@code PercentileEstAggregationFunction}.
+ */
+public class PercentileRawEstAggregationFunction
+    extends BaseSingleInputAggregationFunction<QuantileDigest, 
SerializedQuantileDigest> {
+  private final PercentileEstAggregationFunction 
_percentileEstAggregationFunction;
+
+  public PercentileRawEstAggregationFunction(ExpressionContext 
expressionContext, double percentile) {
+    this(expressionContext, new 
PercentileEstAggregationFunction(expressionContext, percentile));
+  }
+
+  public PercentileRawEstAggregationFunction(ExpressionContext 
expressionContext, int percentile) {
+    this(expressionContext, new 
PercentileEstAggregationFunction(expressionContext, percentile));
+  }
+
+  protected PercentileRawEstAggregationFunction(ExpressionContext expression,
+      PercentileEstAggregationFunction percentileEstAggregationFunction) {
+    super(expression);
+    _percentileEstAggregationFunction = percentileEstAggregationFunction;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILERAWEST;
+  }
+
+  @Override
+  public String getColumnName() {
+    final double percentile = _percentileEstAggregationFunction._percentile;
+    final int version = _percentileEstAggregationFunction._version;
+    final String type = getType().getName();
+
+    return version == 0 ? type + (int) percentile + "_" + _expression : type + 
percentile + "_" + _expression;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    final double percentile = _percentileEstAggregationFunction._percentile;
+    final int version = _percentileEstAggregationFunction._version;
+    final String type = getType().getName().toLowerCase();
+
+    return version == 0 ? type + (int) percentile + "(" + _expression + ")"
+        : type + "(" + _expression + ", " + percentile + ")";
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return _percentileEstAggregationFunction.createAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return 
_percentileEstAggregationFunction.createGroupByResultHolder(initialCapacity, 
maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _percentileEstAggregationFunction.aggregate(length, 
aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _percentileEstAggregationFunction.aggregateGroupBySV(length, 
groupKeyArray, groupByResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _percentileEstAggregationFunction
+        .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public QuantileDigest extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    return 
_percentileEstAggregationFunction.extractAggregationResult(aggregationResultHolder);
+  }
+
+  @Override
+  public QuantileDigest extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    return 
_percentileEstAggregationFunction.extractGroupByResult(groupByResultHolder, 
groupKey);
+  }
+
+  @Override
+  public QuantileDigest merge(QuantileDigest intermediateResult1, 
QuantileDigest intermediateResult2) {
+    return _percentileEstAggregationFunction.merge(intermediateResult1, 
intermediateResult2);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return _percentileEstAggregationFunction.isIntermediateResultComparable();
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return _percentileEstAggregationFunction.getIntermediateResultColumnType();
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  @Override
+  public SerializedQuantileDigest extractFinalResult(QuantileDigest 
intermediateResult) {
+    return new SerializedQuantileDigest(intermediateResult, 
_percentileEstAggregationFunction._percentile);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java
new file mode 100644
index 0000000..2f2bcfd
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawEstMVAggregationFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawEstMVAggregationFunction} returns the serialized 
{@code QuantileDigest} data structure of the
+ * {@code PercentileEstMVAggregationFunction}.
+ */
+public class PercentileRawEstMVAggregationFunction extends 
PercentileRawEstAggregationFunction {
+
+  public PercentileRawEstMVAggregationFunction(ExpressionContext 
expressionContext, int percentile) {
+    super(expressionContext, new 
PercentileEstMVAggregationFunction(expressionContext, percentile));
+  }
+
+  public PercentileRawEstMVAggregationFunction(ExpressionContext 
expressionContext, double percentile) {
+    super(expressionContext, new 
PercentileEstMVAggregationFunction(expressionContext, percentile));
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILERAWESTMV;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java
new file mode 100644
index 0000000..2724b21
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestAggregationFunction.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.tdunning.math.stats.TDigest;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.SerializedTDigest;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawTDigestAggregationFunction} returns the serialized 
{@code TDigest} data structure of the
+ * {@code PercentileEstAggregationFunction}.
+ */
+public class PercentileRawTDigestAggregationFunction
+    extends BaseSingleInputAggregationFunction<TDigest, SerializedTDigest> {
+  private final PercentileTDigestAggregationFunction 
_percentileTDigestAggregationFunction;
+
+  public PercentileRawTDigestAggregationFunction(ExpressionContext 
expressionContext, int percentile) {
+    this(expressionContext, new 
PercentileTDigestAggregationFunction(expressionContext, percentile));
+  }
+
+  public PercentileRawTDigestAggregationFunction(ExpressionContext 
expressionContext, double percentile) {
+    this(expressionContext, new 
PercentileTDigestAggregationFunction(expressionContext, percentile));
+  }
+
+  protected PercentileRawTDigestAggregationFunction(ExpressionContext 
expression,
+      PercentileTDigestAggregationFunction 
percentileTDigestAggregationFunction) {
+    super(expression);
+    _percentileTDigestAggregationFunction = 
percentileTDigestAggregationFunction;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILERAWTDIGEST;
+  }
+
+  @Override
+  public String getColumnName() {
+    final double percentile = 
_percentileTDigestAggregationFunction._percentile;
+    final int version = _percentileTDigestAggregationFunction._version;
+    final String type = getType().getName();
+
+    return version == 0 ? type + (int) percentile + "_" + _expression : type + 
percentile + "_" + _expression;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    final double percentile = 
_percentileTDigestAggregationFunction._percentile;
+    final int version = _percentileTDigestAggregationFunction._version;
+    final String type = getType().getName().toLowerCase();
+
+    return version == 0 ? type + (int) percentile + "(" + _expression + ")"
+        : type + "(" + _expression + ", " + percentile + ")";
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return 
_percentileTDigestAggregationFunction.createAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return 
_percentileTDigestAggregationFunction.createGroupByResultHolder(initialCapacity,
 maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _percentileTDigestAggregationFunction.aggregate(length, 
aggregationResultHolder, blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _percentileTDigestAggregationFunction
+        .aggregateGroupBySV(length, groupKeyArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    _percentileTDigestAggregationFunction
+        .aggregateGroupByMV(length, groupKeysArray, groupByResultHolder, 
blockValSetMap);
+  }
+
+  @Override
+  public TDigest extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    return 
_percentileTDigestAggregationFunction.extractAggregationResult(aggregationResultHolder);
+  }
+
+  @Override
+  public TDigest extractGroupByResult(GroupByResultHolder groupByResultHolder, 
int groupKey) {
+    return 
_percentileTDigestAggregationFunction.extractGroupByResult(groupByResultHolder, 
groupKey);
+  }
+
+  @Override
+  public TDigest merge(TDigest intermediateResult1, TDigest 
intermediateResult2) {
+    return _percentileTDigestAggregationFunction.merge(intermediateResult1, 
intermediateResult2);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return 
_percentileTDigestAggregationFunction.isIntermediateResultComparable();
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return 
_percentileTDigestAggregationFunction.getIntermediateResultColumnType();
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  @Override
+  public SerializedTDigest extractFinalResult(TDigest intermediateResult) {
+    return new SerializedTDigest(intermediateResult, 
_percentileTDigestAggregationFunction._percentile);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java
new file mode 100644
index 0000000..4cf5cbb
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawTDigestMVAggregationFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * The {@code PercentileRawTDigestMVAggregationFunction} returns the 
serialized {@code TDigest} data structure of the
+ * {@code PercentileTDigestMVAggregationFunction}.
+ */
+public class PercentileRawTDigestMVAggregationFunction extends 
PercentileRawTDigestAggregationFunction {
+
+  public PercentileRawTDigestMVAggregationFunction(ExpressionContext 
expressionContext, int percentile) {
+    super(expressionContext, new 
PercentileTDigestMVAggregationFunction(expressionContext, percentile));
+  }
+
+  public PercentileRawTDigestMVAggregationFunction(ExpressionContext 
expressionContext, double percentile) {
+    super(expressionContext, new 
PercentileTDigestMVAggregationFunction(expressionContext, percentile));
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILERAWTDIGESTMV;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index b855806..e5e0e22 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -136,6 +136,13 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), 
"percentileEst50_column");
     assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
 
+    function = getFunction("PeRcEnTiLeRaWEsT50");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
PercentileRawEstAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWEST);
+    assertEquals(aggregationFunction.getColumnName(), 
"percentileRawEst50_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
     function = getFunction("PeRcEnTiLeTdIgEsT99");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof 
PercentileTDigestAggregationFunction);
@@ -143,6 +150,13 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), 
"percentileTDigest99_column");
     assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
 
+    function = getFunction("PeRcEnTiLeRaWTdIgEsT99");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
PercentileRawTDigestAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWTDIGEST);
+    assertEquals(aggregationFunction.getColumnName(), 
"percentileRawTDigest99_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
function.toString());
+
     function = getFunction("PeRcEnTiLe", "(column, 5)");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof PercentileAggregationFunction);
@@ -164,6 +178,13 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), 
"percentileEst50.0_column");
     assertEquals(aggregationFunction.getResultColumnName(), 
"percentileest(column, 50.0)");
 
+    function = getFunction("PeRcEnTiLeRaWeSt", "(column, 50)");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
PercentileRawEstAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWEST);
+    assertEquals(aggregationFunction.getColumnName(), 
"percentileRawEst50.0_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
"percentilerawest(column, 50.0)");
+
     function = getFunction("PeRcEnTiLeEsT", "(column, 55.555)");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof 
PercentileEstAggregationFunction);
@@ -171,6 +192,13 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), 
"percentileEst55.555_column");
     assertEquals(aggregationFunction.getResultColumnName(), 
"percentileest(column, 55.555)");
 
+    function = getFunction("PeRcEnTiLeRaWeSt", "(column, 55.555)");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
PercentileRawEstAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWEST);
+    assertEquals(aggregationFunction.getColumnName(), 
"percentileRawEst55.555_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
"percentilerawest(column, 55.555)");
+
     function = getFunction("PeRcEnTiLeTdIgEsT", "(column, 99)");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof 
PercentileTDigestAggregationFunction);
@@ -185,6 +213,20 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), 
"percentileTDigest99.9999_column");
     assertEquals(aggregationFunction.getResultColumnName(), 
"percentiletdigest(column, 99.9999)");
 
+    function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99)");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
PercentileRawTDigestAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWTDIGEST);
+    assertEquals(aggregationFunction.getColumnName(), 
"percentileRawTDigest99.0_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
"percentilerawtdigest(column, 99.0)");
+
+    function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99.9999)");
+    aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof 
PercentileRawTDigestAggregationFunction);
+    assertEquals(aggregationFunction.getType(), 
AggregationFunctionType.PERCENTILERAWTDIGEST);
+    assertEquals(aggregationFunction.getColumnName(), 
"percentileRawTDigest99.9999_column");
+    assertEquals(aggregationFunction.getResultColumnName(), 
"percentilerawtdigest(column, 99.9999)");
+
     function = getFunction("CoUnTmV");
     aggregationFunction = 
AggregationFunctionFactory.getAggregationFunction(function, 
DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof CountMVAggregationFunction);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java
new file mode 100644
index 0000000..520f2ee
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExpectedQueryResult.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+public class ExpectedQueryResult<T> {
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+  private long _numTotalDocs;
+  private T[] _results;
+
+  public ExpectedQueryResult(long numDocsScanned, long 
numEntriesScannedInFilter, long numEntriesScannedPostFilter,
+      long numTotalDocs, T[] results) {
+    _numDocsScanned = numDocsScanned;
+    _numEntriesScannedInFilter = numEntriesScannedInFilter;
+    _numEntriesScannedPostFilter = numEntriesScannedPostFilter;
+    _numTotalDocs = numTotalDocs;
+    _results = results;
+  }
+
+  public long getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  public long getNumEntriesScannedInFilter() {
+    return _numEntriesScannedInFilter;
+  }
+
+  public long getNumEntriesScannedPostFilter() {
+    return _numEntriesScannedPostFilter;
+  }
+
+  public long getNumTotalDocs() {
+    return _numTotalDocs;
+  }
+
+  public T[] getResults() {
+    return _results;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
index 4856eab..6c2b730 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueQueriesTest.java
@@ -42,6 +42,9 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
   private static final String MV_GROUP_BY = " group by column7";
   private static final String ORDER_BY_ALIAS = " order by cnt_column6 DESC";
 
+  // Allow 5% quantile error due to the randomness of TDigest merge
+  private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * 
Integer.MAX_VALUE;
+
   @Test
   public void testCountMV() {
     String query = "SELECT COUNTMV(column6) FROM testTable";
@@ -525,6 +528,116 @@ public class InterSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
   }
 
   @Test
+  public void testPercentileRawEst50MV() {
+    testPercentileRawEstAggregationFunction(50);
+  }
+
+  @Test
+  public void testPercentileRawEst90MV() {
+    testPercentileRawEstAggregationFunction(90);
+  }
+
+  @Test
+  public void testPercentileRawEst95MV() {
+    testPercentileRawEstAggregationFunction(95);
+  }
+
+  @Test
+  public void testPercentileRawEst99MV() {
+    testPercentileRawEstAggregationFunction(99);
+  }
+
+  private void testPercentileRawEstAggregationFunction(int percentile) {
+    Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+        
ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) 
value))
+            .getQuantile(percentile / 100.0));
+
+    String rawQuery =
+        String.format("SELECT PERCENTILERAWEST%dMV(column6) FROM testTable", 
percentile);
+
+    String query =
+        String.format("SELECT PERCENTILEEST%dMV(column6) FROM testTable", 
percentile);
+
+    queryAndTestAggregationResult(rawQuery, getExpectedQueryResults(query), 
quantileExtractor);
+
+    queryAndTestAggregationResult(rawQuery + getFilter(), 
getExpectedQueryResults(query + getFilter()),
+        quantileExtractor);
+
+    queryAndTestAggregationResult(rawQuery + SV_GROUP_BY, 
getExpectedQueryResults(query + SV_GROUP_BY),
+        quantileExtractor);
+
+    queryAndTestAggregationResult(rawQuery + MV_GROUP_BY, 
getExpectedQueryResults(query + MV_GROUP_BY),
+        quantileExtractor);
+  }
+
+  private void testPercentileRawTDigestAggregationFunction(int percentile) {
+    Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+        
ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+            .quantile(percentile / 100.0));
+
+    String rawQuery =
+        String.format("SELECT PERCENTILERAWTDIGEST%dMV(column6) FROM 
testTable", percentile);
+
+    String query =
+        String.format("SELECT PERCENTILETDIGEST%dMV(column6) FROM testTable", 
percentile);
+
+    queryAndTestAggregationResultWithDelta(rawQuery, 
getExpectedQueryResults(query), quantileExtractor);
+
+    queryAndTestAggregationResultWithDelta(rawQuery + getFilter(), 
getExpectedQueryResults(query + getFilter()),
+        quantileExtractor);
+
+    queryAndTestAggregationResultWithDelta(rawQuery + SV_GROUP_BY, 
getExpectedQueryResults(query + SV_GROUP_BY),
+        quantileExtractor);
+
+    queryAndTestAggregationResultWithDelta(rawQuery + MV_GROUP_BY, 
getExpectedQueryResults(query + MV_GROUP_BY),
+        quantileExtractor);
+  }
+
+  private ExpectedQueryResult<String> getExpectedQueryResults(String query) {
+    return 
QueriesTestUtils.buildExpectedResponse(getBrokerResponseForPqlQuery(query));
+  }
+
+  private void queryAndTestAggregationResultWithDelta(String query, 
ExpectedQueryResult<String> expectedQueryResults,
+      Function<Serializable, String> responseMapper) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+
+    QueriesTestUtils
+        .testInterSegmentApproximateAggregationResult(brokerResponse, 
expectedQueryResults.getNumDocsScanned(),
+            expectedQueryResults.getNumEntriesScannedInFilter(), 
expectedQueryResults.getNumEntriesScannedPostFilter(),
+            expectedQueryResults.getNumTotalDocs(), responseMapper, 
expectedQueryResults.getResults(),
+            PERCENTILE_TDIGEST_DELTA);
+  }
+
+  private void queryAndTestAggregationResult(String query, 
ExpectedQueryResult<String> expectedQueryResults,
+      Function<Serializable, String> responseMapper) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 
expectedQueryResults.getNumDocsScanned(),
+            expectedQueryResults.getNumEntriesScannedInFilter(), 
expectedQueryResults.getNumEntriesScannedPostFilter(),
+            expectedQueryResults.getNumTotalDocs(), responseMapper, 
expectedQueryResults.getResults());
+  }
+
+  @Test
+  public void testPercentileRawTDigest50MV() {
+    testPercentileRawTDigestAggregationFunction(50);
+  }
+
+  @Test
+  public void testPercentileRawTDigest90MV() {
+    testPercentileRawTDigestAggregationFunction(90);
+  }
+
+  @Test
+  public void testPercentileRawTDigest95MV() {
+    testPercentileRawTDigestAggregationFunction(95);
+  }
+
+  @Test
+  public void testPercentileRawTDigest99MV() {
+    testPercentileRawTDigestAggregationFunction(99);
+  }
+
+  @Test
   public void testNumGroupsLimit() {
     String query = "SELECT COUNT(*) FROM testTable GROUP BY column6";
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index 54654ed..b88d38a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -37,6 +37,9 @@ import static org.testng.Assert.assertTrue;
 public class InterSegmentAggregationSingleValueQueriesTest extends 
BaseSingleValueQueriesTest {
   private static final String GROUP_BY = " group by column9";
 
+  // Allow 5% quantile error due to the randomness of TDigest merge
+  private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * 
Integer.MAX_VALUE;
+
   @Test
   public void testCount() {
     String query = "SELECT COUNT(*) FROM testTable";
@@ -413,6 +416,121 @@ public class 
InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
   }
 
   @Test
+  public void testPercentileRawEst50() {
+    testPercentileRawEstAggregationFunction(50);
+  }
+
+  @Test
+  public void testPercentileRawEst90() {
+    testPercentileRawEstAggregationFunction(90);
+  }
+
+  @Test
+  public void testPercentileRawEst95() {
+    testPercentileRawEstAggregationFunction(95);
+  }
+
+  @Test
+  public void testPercentileRawEst99() {
+    testPercentileRawEstAggregationFunction(99);
+  }
+
+  private void queryAndTestAggregationResult(String query, 
ExpectedQueryResult<String> expectedQueryResults,
+      Function<Serializable, String> responseMapper) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 
expectedQueryResults.getNumDocsScanned(),
+            expectedQueryResults.getNumEntriesScannedInFilter(), 
expectedQueryResults.getNumEntriesScannedPostFilter(),
+            expectedQueryResults.getNumTotalDocs(), responseMapper, 
expectedQueryResults.getResults());
+  }
+
+  private void testPercentileRawEstAggregationFunction(int percentile) {
+    Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+        
ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) 
value))
+            .getQuantile(percentile / 100.0));
+
+    String rawQuery =
+        String.format("SELECT PERCENTILERAWEST%d(column1), 
PERCENTILERAWEST%d(column3) FROM testTable", percentile,
+            percentile);
+
+    String query =
+        String
+            .format("SELECT PERCENTILEEST%d(column1), PERCENTILEEST%d(column3) 
FROM testTable", percentile, percentile);
+
+    queryAndTestAggregationResult(rawQuery, getExpectedQueryResults(query), 
quantileExtractor);
+
+    queryAndTestAggregationResult(rawQuery + getFilter(), 
getExpectedQueryResults(query + getFilter()),
+        quantileExtractor);
+
+    queryAndTestAggregationResult(rawQuery + GROUP_BY, 
getExpectedQueryResults(query + GROUP_BY), quantileExtractor);
+
+    queryAndTestAggregationResult(rawQuery + getFilter() + GROUP_BY,
+        getExpectedQueryResults(query + getFilter() + GROUP_BY),
+        quantileExtractor);
+  }
+
+  @Test
+  public void testPercentileRawTDigest50() {
+    testPercentileRawTDigestAggregationFunction(50);
+  }
+
+  @Test
+  public void testPercentileRawTDigest90() {
+    testPercentileRawTDigestAggregationFunction(90);
+  }
+
+  @Test
+  public void testPercentileRawTDigest95() {
+    testPercentileRawTDigestAggregationFunction(95);
+  }
+
+  @Test
+  public void testPercentileRawTDigest99() {
+    testPercentileRawTDigestAggregationFunction(99);
+  }
+
+  private void testPercentileRawTDigestAggregationFunction(int percentile) {
+    Function<Serializable, String> quantileExtractor = value -> String.valueOf(
+        
ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(BytesUtils.toBytes((String) value))
+            .quantile(percentile / 100.0));
+
+    String rawQuery =
+        String.format("SELECT PERCENTILERAWTDIGEST%d(column1), 
PERCENTILERAWTDIGEST%d(column3) FROM testTable",
+            percentile, percentile);
+
+    String query =
+        String.format("SELECT PERCENTILETDIGEST%d(column1), 
PERCENTILETDIGEST%d(column3) FROM testTable", percentile,
+            percentile);
+
+    queryAndTestAggregationResultWithDelta(rawQuery, 
getExpectedQueryResults(query), quantileExtractor);
+
+    queryAndTestAggregationResultWithDelta(rawQuery + getFilter(), 
getExpectedQueryResults(query + getFilter()),
+        quantileExtractor);
+
+    queryAndTestAggregationResultWithDelta(rawQuery + GROUP_BY, 
getExpectedQueryResults(query + GROUP_BY),
+        quantileExtractor);
+
+    queryAndTestAggregationResultWithDelta(rawQuery + getFilter() + GROUP_BY,
+        getExpectedQueryResults(query + getFilter() + GROUP_BY),
+        quantileExtractor);
+  }
+
+  private ExpectedQueryResult<String> getExpectedQueryResults(String query) {
+    return 
QueriesTestUtils.buildExpectedResponse(getBrokerResponseForPqlQuery(query));
+  }
+
+  private void queryAndTestAggregationResultWithDelta(String query, 
ExpectedQueryResult<String> expectedQueryResults,
+      Function<Serializable, String> responseMapper) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+
+    QueriesTestUtils
+        .testInterSegmentApproximateAggregationResult(brokerResponse, 
expectedQueryResults.getNumDocsScanned(),
+            expectedQueryResults.getNumEntriesScannedInFilter(), 
expectedQueryResults.getNumEntriesScannedPostFilter(),
+            expectedQueryResults.getNumTotalDocs(), responseMapper, 
expectedQueryResults.getResults(),
+            PERCENTILE_TDIGEST_DELTA);
+  }
+
+  @Test
   public void testNumGroupsLimit() {
     String query = "SELECT COUNT(*) FROM testTable GROUP BY column1";
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
index 41a1129..2d1a5d1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.queries;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -95,11 +96,9 @@ public class QueriesTestUtils {
   public static void testInterSegmentAggregationResult(BrokerResponseNative 
brokerResponse, long expectedNumDocsScanned,
       long expectedNumEntriesScannedInFilter, long 
expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
       Function<Serializable, String> responseMapper, String[] 
expectedAggregationResults) {
-    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 
expectedNumDocsScanned);
-    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 
expectedNumEntriesScannedInFilter);
-    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 
expectedNumEntriesScannedPostFilter);
-    Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
-    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    List<AggregationResult> aggregationResults =
+        validateAggregationStats(brokerResponse, expectedNumDocsScanned, 
expectedNumEntriesScannedInFilter,
+            expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
     int length = expectedAggregationResults.length;
     Assert.assertEquals(aggregationResults.size(), length);
     for (int i = 0; i < length; i++) {
@@ -117,9 +116,82 @@ public class QueriesTestUtils {
     }
   }
 
-  public static void 
testInterSegmentAggregationGroupByResult(BrokerResponseNative brokerResponse,
+  /**
+   * Builds expected response object from the given broker response.
+   * @param brokerResponse
+   * @return
+   */
+  public static ExpectedQueryResult<String> 
buildExpectedResponse(BrokerResponseNative brokerResponse) {
+    Function<Serializable, String> responseMapper = Serializable::toString;
+
+    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    List<String> results = new ArrayList<>();
+    for (int i = 0; i < aggregationResults.size(); i++) {
+      AggregationResult aggregationResult = aggregationResults.get(i);
+      Serializable value = aggregationResult.getValue();
+      if (value != null) {
+        results.add(responseMapper.apply(value));
+        // Aggregation.
+      } else {
+        // Group-by.
+        
results.add(responseMapper.apply(aggregationResult.getGroupByResult().get(0).getValue()));
+      }
+    }
+    return new ExpectedQueryResult<>(brokerResponse.getNumDocsScanned(), 
brokerResponse.getNumEntriesScannedInFilter(),
+        brokerResponse.getNumEntriesScannedPostFilter(), 
brokerResponse.getTotalDocs(), results.toArray(new String[0]));
+  }
+
+  /**
+   * Verifies the given results of an approximate aggregation function.
+   * @param brokerResponse Broker response
+   * @param expectedNumDocsScanned Number of documents scanned.
+   * @param expectedNumEntriesScannedInFilter Number of entries scanned in 
filter
+   * @param expectedNumEntriesScannedPostFilter Number of entries scanned post 
filter.
+   * @param expectedNumTotalDocs Total documents.
+   * @param responseMapper Mapper to process response.
+   * @param expectedAggregationResults Expected aggregation results.
+   * @param resultComparisionDelta Validate results are within +/- delta range 
(0 - 100)%.
+   */
+  public static void 
testInterSegmentApproximateAggregationResult(BrokerResponseNative 
brokerResponse,
       long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, 
long expectedNumEntriesScannedPostFilter,
-      long expectedNumTotalDocs, List<String[]> expectedGroupKeys, 
List<String[]> expectedAggregationResults) {
+      long expectedNumTotalDocs, Function<Serializable, String> 
responseMapper, String[] expectedAggregationResults,
+      double resultComparisionDelta) {
+    List<AggregationResult> aggregationResults =
+        validateAggregationStats(brokerResponse, expectedNumDocsScanned, 
expectedNumEntriesScannedInFilter,
+            expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
+    int length = expectedAggregationResults.length;
+    Assert.assertEquals(aggregationResults.size(), length);
+
+    for (int i = 0; i < length; i++) {
+      AggregationResult aggregationResult = aggregationResults.get(i);
+      double expectedResult = 
Double.parseDouble(expectedAggregationResults[i]);
+      Serializable value = aggregationResult.getValue();
+      double actualResult = 0L;
+      if (value != null) {
+        // Aggregation.
+        actualResult = Double.parseDouble(responseMapper.apply(value));
+      } else {
+        // Group-by.
+        actualResult = 
Double.parseDouble(responseMapper.apply(aggregationResult.getGroupByResult().get(0).getValue()));
+      }
+      Assert.assertEquals(actualResult, expectedResult, 
resultComparisionDelta);
+    }
+  }
+
+  private static List<AggregationResult> 
validateAggregationStats(BrokerResponseNative brokerResponse,
+      long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, 
long expectedNumEntriesScannedPostFilter,
+      long expectedNumTotalDocs) {
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 
expectedNumDocsScanned);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 
expectedNumEntriesScannedInFilter);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 
expectedNumEntriesScannedPostFilter);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
+    return brokerResponse.getAggregationResults();
+  }
+
+  public static void 
testInterSegmentAggregationGroupByResult(BrokerResponseNative brokerResponse,
+      long expectedNumDocsScanned,
+      long expectedNumEntriesScannedInFilter, long 
expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
+      List<String[]> expectedGroupKeys, List<String[]> 
expectedAggregationResults) {
     testInterSegmentAggregationGroupByResult(brokerResponse, 
expectedNumDocsScanned, expectedNumEntriesScannedInFilter,
         expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, 
Serializable::toString, expectedGroupKeys,
         expectedAggregationResults);
@@ -129,13 +201,9 @@ public class QueriesTestUtils {
       long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, 
long expectedNumEntriesScannedPostFilter,
       long expectedNumTotalDocs, Function<Serializable, String> 
responseMapper, List<String[]> expectedGroupKeys,
       List<String[]> expectedAggregationResults) {
-    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 
expectedNumDocsScanned);
-    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 
expectedNumEntriesScannedInFilter);
-    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 
expectedNumEntriesScannedPostFilter);
-    Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
-    // size of this array will be equal to number of aggregation functions 
since
-    // we return each aggregation function separately
-    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    List<AggregationResult> aggregationResults =
+        validateAggregationStats(brokerResponse, expectedNumDocsScanned, 
expectedNumEntriesScannedInFilter,
+            expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
     int numAggregationColumns = aggregationResults.size();
     Assert.assertEquals(numAggregationColumns, 
expectedAggregationResults.get(0).length);
     int numKeyColumns = expectedGroupKeys.get(0).length;
@@ -181,12 +249,9 @@ public class QueriesTestUtils {
   static void testInterSegmentGroupByOrderByResultPQL(BrokerResponseNative 
brokerResponse, long expectedNumDocsScanned,
       long expectedNumEntriesScannedInFilter, long 
expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
       List<String[]> expectedGroups, List<List<Serializable>> expectedValues, 
boolean preserveType) {
-    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 
expectedNumDocsScanned);
-    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 
expectedNumEntriesScannedInFilter);
-    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 
expectedNumEntriesScannedPostFilter);
-    Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
-
-    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    List<AggregationResult> aggregationResults =
+        validateAggregationStats(brokerResponse, expectedNumDocsScanned, 
expectedNumEntriesScannedInFilter,
+            expectedNumEntriesScannedPostFilter, expectedNumTotalDocs);
     if (aggregationResults == null) {
       Assert.assertEquals(expectedGroups.size(), 0);
       Assert.assertEquals(expectedValues.size(), 0);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index 86f85f4..092a285 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -58,8 +58,10 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTRAWHLL:
         return new DistinctCountHLLValueAggregator();
       case PERCENTILEEST:
+      case PERCENTILERAWEST:
         return new PercentileEstValueAggregator();
       case PERCENTILETDIGEST:
+      case PERCENTILERAWTDIGEST:
         return new PercentileTDigestValueAggregator();
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + 
aggregationType);
@@ -94,8 +96,10 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTRAWHLL:
         return DistinctCountHLLValueAggregator.AGGREGATED_VALUE_TYPE;
       case PERCENTILEEST:
+      case PERCENTILERAWEST:
         return PercentileEstValueAggregator.AGGREGATED_VALUE_TYPE;
       case PERCENTILETDIGEST:
+      case PERCENTILERAWTDIGEST:
         return PercentileTDigestValueAggregator.AGGREGATED_VALUE_TYPE;
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + 
aggregationType);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java
new file mode 100644
index 0000000..edde4be
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedQuantileDigest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+
+/**
+ * Serialized and comparable version of QuantileDigest. Compares 
QuantileDigest for a specific percentile value.
+ */
+public class SerializedQuantileDigest implements 
Comparable<SerializedQuantileDigest> {
+  private final double _percentile;
+  private final QuantileDigest _quantileDigest;
+
+  public SerializedQuantileDigest(QuantileDigest quantileDigest, double 
percentile) {
+    _quantileDigest = quantileDigest;
+    _percentile = percentile / 100.0;
+  }
+
+  @Override
+  public int compareTo(SerializedQuantileDigest other) {
+    checkArgument(other._percentile == _percentile, "Percentile number doesn't 
match!");
+    return Long.compare(_quantileDigest.getQuantile(_percentile),
+        other._quantileDigest.getQuantile(_percentile));
+  }
+
+  @Override
+  public String toString() {
+    return 
BytesUtils.toHexString(CustomSerDeUtils.QUANTILE_DIGEST_SER_DE.serialize(_quantileDigest));
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java
new file mode 100644
index 0000000..24b2281
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedTDigest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import com.tdunning.math.stats.TDigest;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Serialized and comparable version of TDigest. Compares TDigest for a 
specific percentile value.
+ */
+public class SerializedTDigest implements Comparable<SerializedTDigest> {
+  private final double _percentile;
+  private final TDigest _tDigest;
+
+  public SerializedTDigest(TDigest tDigest, double percentile) {
+    _tDigest = tDigest;
+    _percentile = percentile / 100.0;
+  }
+
+  @Override
+  public int compareTo(SerializedTDigest other) {
+    checkArgument(other._percentile == _percentile, "Percentile number doesn't 
match!");
+    return Double.compare(_tDigest.quantile(_percentile), 
other._tDigest.quantile(_percentile));
+  }
+
+  @Override
+  public String toString() {
+    return 
BytesUtils.toHexString(CustomSerDeUtils.TDIGEST_SER_DE.serialize(_tDigest));
+  }
+}
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 9d10e8a..197239b 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -44,7 +44,9 @@ public enum AggregationFunctionType {
   DISTINCTCOUNTRAWTHETASKETCH("distinctCountRawThetaSketch"),
   PERCENTILE("percentile"),
   PERCENTILEEST("percentileEst"),
+  PERCENTILERAWEST("percentileRawEst"),
   PERCENTILETDIGEST("percentileTDigest"),
+  PERCENTILERAWTDIGEST("percentileRawTDigest"),
   IDSET("idSet"),
 
   // Geo aggregation functions
@@ -63,7 +65,9 @@ public enum AggregationFunctionType {
   DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"),
   PERCENTILEMV("percentileMV"),
   PERCENTILEESTMV("percentileEstMV"),
+  PERCENTILERAWESTMV("percentileRawEstMV"),
   PERCENTILETDIGESTMV("percentileTDigestMV"),
+  PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"),
   DISTINCT("distinct");
 
   private final String _name;
@@ -88,14 +92,22 @@ public enum AggregationFunctionType {
         return PERCENTILE;
       } else if (remainingFunctionName.equals("EST") || 
remainingFunctionName.matches("EST\\d+")) {
         return PERCENTILEEST;
+      } else if (remainingFunctionName.equals("RAWEST") || 
remainingFunctionName.matches("RAWEST\\d+")) {
+        return PERCENTILERAWEST;
       } else if (remainingFunctionName.equals("TDIGEST") || 
remainingFunctionName.matches("TDIGEST\\d+")) {
         return PERCENTILETDIGEST;
+      } else if (remainingFunctionName.equals("RAWTDIGEST") || 
remainingFunctionName.matches("RAWTDIGEST\\d+")) {
+        return PERCENTILERAWTDIGEST;
       } else if (remainingFunctionName.equals("MV") || 
remainingFunctionName.matches("\\d+MV")) {
         return PERCENTILEMV;
       } else if (remainingFunctionName.equals("ESTMV") || 
remainingFunctionName.matches("EST\\d+MV")) {
         return PERCENTILEESTMV;
+      } else if (remainingFunctionName.equals("RAWESTMV") || 
remainingFunctionName.matches("RAWEST\\d+MV")) {
+        return PERCENTILERAWESTMV;
       } else if (remainingFunctionName.equals("TDIGESTMV") || 
remainingFunctionName.matches("TDIGEST\\d+MV")) {
         return PERCENTILETDIGESTMV;
+      } else if (remainingFunctionName.equals("RAWTDIGESTMV") || 
remainingFunctionName.matches("RAWTDIGEST\\d+MV")) {
+        return PERCENTILERAWTDIGESTMV;
       } else {
         throw new IllegalArgumentException("Invalid aggregation function name: 
" + functionName);
       }
diff --git 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
index 659fc4c..b09499a 100644
--- 
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
+++ 
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/misc/AggregationFunctionColumnPairTest.java
@@ -92,6 +92,15 @@ public class AggregationFunctionColumnPairTest {
     Assert.assertEquals(fromColumnName, pair);
     Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
 
+    pair = new 
AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILERAWEST, COLUMN);
+    Assert.assertEquals(pair.getFunctionType(), 
AggregationFunctionType.PERCENTILERAWEST);
+    Assert.assertEquals(pair.getColumn(), COLUMN);
+    columnName = pair.toColumnName();
+    Assert.assertEquals(columnName, "percentileRawEst__column");
+    fromColumnName = 
AggregationFunctionColumnPair.fromColumnName("PERCENTILE_RAW_EST__column");
+    Assert.assertEquals(fromColumnName, pair);
+    Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
+
     pair = new 
AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILETDIGEST, 
COLUMN);
     Assert.assertEquals(pair.getFunctionType(), 
AggregationFunctionType.PERCENTILETDIGEST);
     Assert.assertEquals(pair.getColumn(), COLUMN);
@@ -100,5 +109,14 @@ public class AggregationFunctionColumnPairTest {
     fromColumnName = 
AggregationFunctionColumnPair.fromColumnName("percentiletdigest__column");
     Assert.assertEquals(fromColumnName, pair);
     Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
+
+    pair = new 
AggregationFunctionColumnPair(AggregationFunctionType.PERCENTILERAWTDIGEST, 
COLUMN);
+    Assert.assertEquals(pair.getFunctionType(), 
AggregationFunctionType.PERCENTILERAWTDIGEST);
+    Assert.assertEquals(pair.getColumn(), COLUMN);
+    columnName = pair.toColumnName();
+    Assert.assertEquals(columnName, "percentileRawTDigest__column");
+    fromColumnName = 
AggregationFunctionColumnPair.fromColumnName("percentilerawtdigest__column");
+    Assert.assertEquals(fromColumnName, pair);
+    Assert.assertEquals(fromColumnName.hashCode(), pair.hashCode());
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to