This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d12ce887a9 Minmaxrange null (#12252)
d12ce887a9 is described below

commit d12ce887a9adf5e4d6253ada533e9f0a5df71298
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Tue Mar 19 02:06:54 2024 -0700

    Minmaxrange null (#12252)
    
    * new test framework candidate
    
    * Improved test system
    
    * Improve framework to be able to specify segments as strings
    
    * fix headers
    
    * Improve assertions when there are nulls
    
    * Improve error text
    
    * Improvements in the framework
    
    * Add a base class single input aggregation operations can extend to support null handling
    
    * Fix issue in NullableSingleInputAggregationFunction.forEachNotNullInt
    
    * Improve error message in NullEnabledQueriesTest
    
    * Add new schema family
    
    * Rename test schemas and table config
    
    * Split AllNullQueriesTest into one test per query
    
    * Revert change in AllNullQueriesTest that belongs to mode-null-support branch
    
    * Add null support in minmaxrange
    
    * Adapted to new framework
    
    * Applied suggestions during PR
---
 .../function/AggregationFunctionFactory.java       |   2 +-
 .../function/MinMaxRangeAggregationFunction.java   | 103 +++++------
 .../function/MinMaxRangeMVAggregationFunction.java |   2 +-
 .../MinMaxRangeAggregationFunctionTest.java        | 195 +++++++++++++++++++++
 .../aggregator/MinMaxRangeValueAggregator.java     |   2 +-
 .../local/customobject/MinMaxRangePair.java        |   8 +
 6 files changed, 259 insertions(+), 53 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 8db0d730d7..3c449f1578 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -316,7 +316,7 @@ public class AggregationFunctionFactory {
             }
           }
           case MINMAXRANGE:
-            return new MinMaxRangeAggregationFunction(arguments);
+            return new MinMaxRangeAggregationFunction(arguments, 
nullHandlingEnabled);
           case DISTINCTCOUNT:
             return new DistinctCountAggregationFunction(arguments, 
nullHandlingEnabled);
           case DISTINCTCOUNTBITMAP:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 1c039b9d14..28299429c6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -33,14 +33,14 @@ import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
-public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFunction<MinMaxRangePair, Double> {
+public class MinMaxRangeAggregationFunction extends 
NullableSingleInputAggregationFunction<MinMaxRangePair, Double> {
 
-  public MinMaxRangeAggregationFunction(List<ExpressionContext> arguments) {
-    super(verifySingleArgument(arguments, "MIN_MAX_RANGE"));
+  public MinMaxRangeAggregationFunction(List<ExpressionContext> arguments, 
boolean nullHandlingEnabled) {
+    super(verifySingleArgument(arguments, "MIN_MAX_RANGE"), 
nullHandlingEnabled);
   }
 
-  protected MinMaxRangeAggregationFunction(ExpressionContext expression) {
-    super(expression);
+  protected MinMaxRangeAggregationFunction(ExpressionContext expression, 
boolean nullHandlingEnabled) {
+    super(expression, nullHandlingEnabled);
   }
 
   @Override
@@ -61,37 +61,29 @@ public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFu
   @Override
   public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    double min = Double.POSITIVE_INFINITY;
-    double max = Double.NEGATIVE_INFINITY;
 
     BlockValSet blockValSet = blockValSetMap.get(_expression);
+    MinMaxRangePair minMax = new MinMaxRangePair();
+
     if (blockValSet.getValueType() != DataType.BYTES) {
       double[] doubleValues = blockValSet.getDoubleValuesSV();
-      for (int i = 0; i < length; i++) {
-        double value = doubleValues[i];
-        if (value < min) {
-          min = value;
-        }
-        if (value > max) {
-          max = value;
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          double value = doubleValues[i];
+          minMax.apply(value);
         }
-      }
+      });
     } else {
       // Serialized MinMaxRangePair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      for (int i = 0; i < length; i++) {
-        MinMaxRangePair minMaxRangePair = 
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-        double minValue = minMaxRangePair.getMin();
-        double maxValue = minMaxRangePair.getMax();
-        if (minValue < min) {
-          min = minValue;
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          MinMaxRangePair minMaxRangePair = 
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+          minMax.apply(minMaxRangePair);
         }
-        if (maxValue > max) {
-          max = maxValue;
-        }
-      }
+      });
     }
-    setAggregationResult(aggregationResultHolder, min, max);
+    setAggregationResult(aggregationResultHolder, minMax.getMin(), 
minMax.getMax());
   }
 
   protected void setAggregationResult(AggregationResultHolder 
aggregationResultHolder, double min, double max) {
@@ -109,17 +101,21 @@ public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFu
     BlockValSet blockValSet = blockValSetMap.get(_expression);
     if (blockValSet.getValueType() != DataType.BYTES) {
       double[] doubleValues = blockValSet.getDoubleValuesSV();
-      for (int i = 0; i < length; i++) {
-        double value = doubleValues[i];
-        setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value);
-      }
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          double value = doubleValues[i];
+          setGroupByResult(groupKeyArray[i], groupByResultHolder, value, 
value);
+        }
+      });
     } else {
       // Serialized MinMaxRangePair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      for (int i = 0; i < length; i++) {
-        MinMaxRangePair minMaxRangePair = 
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-        setGroupByResult(groupKeyArray[i], groupByResultHolder, 
minMaxRangePair.getMin(), minMaxRangePair.getMax());
-      }
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          MinMaxRangePair minMaxRangePair = 
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+          setGroupByResult(groupKeyArray[i], groupByResultHolder, 
minMaxRangePair.getMin(), minMaxRangePair.getMax());
+        }
+      });
     }
   }
 
@@ -129,23 +125,27 @@ public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFu
     BlockValSet blockValSet = blockValSetMap.get(_expression);
     if (blockValSet.getValueType() != DataType.BYTES) {
       double[] doubleValues = blockValSet.getDoubleValuesSV();
-      for (int i = 0; i < length; i++) {
-        double value = doubleValues[i];
-        for (int groupKey : groupKeysArray[i]) {
-          setGroupByResult(groupKey, groupByResultHolder, value, value);
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          double value = doubleValues[i];
+          for (int groupKey : groupKeysArray[i]) {
+            setGroupByResult(groupKey, groupByResultHolder, value, value);
+          }
         }
-      }
+      });
     } else {
       // Serialized MinMaxRangePair
       byte[][] bytesValues = blockValSet.getBytesValuesSV();
-      for (int i = 0; i < length; i++) {
-        MinMaxRangePair minMaxRangePair = 
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-        double min = minMaxRangePair.getMin();
-        double max = minMaxRangePair.getMax();
-        for (int groupKey : groupKeysArray[i]) {
-          setGroupByResult(groupKey, groupByResultHolder, min, max);
+      forEachNotNull(length, blockValSet, (from, to) -> {
+        for (int i = from; i < to; i++) {
+          MinMaxRangePair minMaxRangePair = 
ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+          double min = minMaxRangePair.getMin();
+          double max = minMaxRangePair.getMax();
+          for (int groupKey : groupKeysArray[i]) {
+            setGroupByResult(groupKey, groupByResultHolder, min, max);
+          }
         }
-      }
+      });
     }
   }
 
@@ -161,8 +161,8 @@ public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFu
   @Override
   public MinMaxRangePair extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
     MinMaxRangePair minMaxRangePair = aggregationResultHolder.getResult();
-    if (minMaxRangePair == null) {
-      return new MinMaxRangePair(Double.POSITIVE_INFINITY, 
Double.NEGATIVE_INFINITY);
+    if (minMaxRangePair == null && !_nullHandlingEnabled) {
+      return new MinMaxRangePair();
     } else {
       return minMaxRangePair;
     }
@@ -171,8 +171,8 @@ public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFu
   @Override
   public MinMaxRangePair extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
     MinMaxRangePair minMaxRangePair = groupByResultHolder.getResult(groupKey);
-    if (minMaxRangePair == null) {
-      return new MinMaxRangePair(Double.POSITIVE_INFINITY, 
Double.NEGATIVE_INFINITY);
+    if (minMaxRangePair == null && !_nullHandlingEnabled) {
+      return new MinMaxRangePair();
     } else {
       return minMaxRangePair;
     }
@@ -196,6 +196,9 @@ public class MinMaxRangeAggregationFunction extends 
BaseSingleInputAggregationFu
 
   @Override
   public Double extractFinalResult(MinMaxRangePair intermediateResult) {
+    if (intermediateResult == null) {
+      return null;
+    }
     return intermediateResult.getMax() - intermediateResult.getMin();
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
index 466a6b044f..534bdb41f2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
@@ -30,7 +30,7 @@ import org.apache.pinot.segment.spi.AggregationFunctionType;
 public class MinMaxRangeMVAggregationFunction extends 
MinMaxRangeAggregationFunction {
 
   public MinMaxRangeMVAggregationFunction(List<ExpressionContext> arguments) {
-    super(verifySingleArgument(arguments, "MIN_MAX_RANGE_MV"));
+    super(verifySingleArgument(arguments, "MIN_MAX_RANGE_MV"), false);
   }
 
   @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java
new file mode 100644
index 0000000000..822399d66f
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunctionTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class MinMaxRangeAggregationFunctionTest extends 
AbstractAggregationFunctionTest {
+
+  @DataProvider(name = "scenarios")
+  Object[] scenarios() {
+    return new Object[] {
+        new Scenario(FieldSpec.DataType.INT),
+        new Scenario(FieldSpec.DataType.LONG),
+        new Scenario(FieldSpec.DataType.FLOAT),
+        new Scenario(FieldSpec.DataType.DOUBLE),
+    };
+  }
+
+  public class Scenario {
+    private final FieldSpec.DataType _dataType;
+
+    public Scenario(FieldSpec.DataType dataType) {
+      _dataType = dataType;
+    }
+
+    public FluentQueryTest.DeclaringTable getDeclaringTable(boolean 
nullHandlingEnabled) {
+      return givenSingleNullableFieldTable(_dataType, nullHandlingEnabled);
+    }
+
+    @Override
+    public String toString() {
+      return "Scenario{" + "dt=" + _dataType + '}';
+    }
+  }
+
+  String diffBetweenMinAnd9(FieldSpec.DataType dt) {
+    switch (dt) {
+      case INT: return "2.147483657E9";
+      case LONG: return "9.223372036854776E18";
+      case FLOAT: return "Infinity";
+      case DOUBLE: return "Infinity";
+      default: throw new IllegalArgumentException(dt.toString());
+    }
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "1",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null",
+            "9",
+            "null"
+        )
+        .whenQuery("select minmaxrange(myField) from testTable")
+        .thenResultIs("DOUBLE", diffBetweenMinAnd9(scenario._dataType));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "1",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null",
+            "9",
+            "null"
+        ).whenQuery("select minmaxrange(myField) from testTable")
+        .thenResultIs("DOUBLE", "8");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithoutNull(Scenario scenario) {
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "1",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null",
+            "9",
+            "null"
+        ).whenQuery("select 'cte', minmaxrange(myField) from testTable group 
by 'cte'")
+        .thenResultIs("STRING | DOUBLE", "cte | " + 
diffBetweenMinAnd9(scenario._dataType));
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvWithNull(Scenario scenario) {
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "1",
+            "null"
+        ).andOnSecondInstance("myField",
+            "null",
+            "9",
+            "null"
+        ).whenQuery("select 'cte', minmaxrange(myField) from testTable group 
by 'cte'")
+        .thenResultIs("STRING | DOUBLE", "cte | 8");
+  }
+
+  String aggrSvSelfWithoutNullResult(FieldSpec.DataType dt) {
+    switch (dt) {
+      case INT: return "0";
+      case LONG: return "0";
+      case FLOAT: return "NaN";
+      case DOUBLE: return "NaN";
+      default: throw new IllegalArgumentException(dt.toString());
+    }
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvSelfWithoutNull(Scenario scenario) {
+    PinotDataType pinotDataType = scenario._dataType == FieldSpec.DataType.INT
+        ? PinotDataType.INTEGER : 
PinotDataType.valueOf(scenario._dataType.name());
+
+    Object defaultNullValue;
+    switch (scenario._dataType) {
+      case INT:
+        defaultNullValue = Integer.MIN_VALUE;
+        break;
+      case LONG:
+        defaultNullValue = Long.MIN_VALUE;
+        break;
+      case FLOAT:
+        defaultNullValue = Float.NEGATIVE_INFINITY;
+        break;
+      case DOUBLE:
+        defaultNullValue = Double.NEGATIVE_INFINITY;
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected scenario data type " + 
scenario._dataType);
+    }
+
+    scenario.getDeclaringTable(false)
+        .onFirstInstance("myField",
+            "null",
+            "1",
+            "2"
+        ).andOnSecondInstance("myField",
+            "null",
+            "1",
+            "2"
+        ).whenQuery("select myField, minmaxrange(myField) from testTable group 
by myField order by myField")
+        .thenResultIs(pinotDataType + " | DOUBLE",
+            defaultNullValue + " | " + 
aggrSvSelfWithoutNullResult(scenario._dataType),
+            "1                   | 0",
+            "2                   | 0");
+  }
+
+  @Test(dataProvider = "scenarios")
+  void aggrSvSelfWithNull(Scenario scenario) {
+    PinotDataType pinotDataType = scenario._dataType == FieldSpec.DataType.INT
+        ? PinotDataType.INTEGER : 
PinotDataType.valueOf(scenario._dataType.name());
+
+    scenario.getDeclaringTable(true)
+        .onFirstInstance("myField",
+            "null",
+            "1",
+            "2"
+        ).andOnSecondInstance("myField",
+            "null",
+            "1",
+            "2"
+        ).whenQuery("select myField, minmaxrange(myField) from testTable group 
by myField order by myField")
+        .thenResultIs(pinotDataType + " | DOUBLE", "1 | 0", "2 | 0", "null | 
null");
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
index d7de9ef01f..484e22b4f9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/MinMaxRangeValueAggregator.java
@@ -53,7 +53,7 @@ public class MinMaxRangeValueAggregator implements 
ValueAggregator<Object, MinMa
       value.apply(deserializeAggregatedValue((byte[]) rawValue));
     } else {
       double doubleValue = ((Number) rawValue).doubleValue();
-      value.apply(doubleValue, doubleValue);
+      value.apply(doubleValue);
     }
     return value;
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
index 09e926378d..940e980821 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/MinMaxRangePair.java
@@ -26,11 +26,19 @@ public class MinMaxRangePair implements 
Comparable<MinMaxRangePair> {
   private double _min;
   private double _max;
 
+  public MinMaxRangePair() {
+    this(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+  }
+
   public MinMaxRangePair(double min, double max) {
     _min = min;
     _max = max;
   }
 
+  public void apply(double value) {
+    apply(value, value);
+  }
+
   public void apply(double min, double max) {
     if (min < _min) {
       _min = min;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to