This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f69041  [Part 2] Add geo support  - add a geo aggregate function 
st_union (#5744)
1f69041 is described below

commit 1f6904152bc80569ab6e4e90251ade344d72c14f
Author: Yupeng Fu <yupe...@users.noreply.github.com>
AuthorDate: Wed Jul 29 17:04:26 2020 -0700

    [Part 2] Add geo support  - add a geo aggregate function st_union (#5744)
    
    Added a new aggregate function that unions a set of geometry objects and 
returns a multi-geometry object.
---
 .../common/function/AggregationFunctionType.java   |   3 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  29 ++-
 .../pinot/core/geospatial/GeometryUtils.java       |   2 +
 .../function/AggregationFunctionFactory.java       |   2 +
 .../function/AggregationFunctionVisitorBase.java   |   4 +
 .../function/StUnionAggregationFunction.java       | 140 +++++++++++
 .../apache/pinot/queries/StUnionQueriesTest.java   | 258 +++++++++++++++++++++
 7 files changed, 436 insertions(+), 2 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index ff3fb50..5125a8d 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -36,6 +36,9 @@ public enum AggregationFunctionType {
   PERCENTILEEST("percentileEst"),
   PERCENTILETDIGEST("percentileTDigest"),
 
+  // geo aggregation functions
+  ST_UNION("ST_Union"),
+
   // Aggregation functions for multi-valued columns
   COUNTMV("countMV"),
   MINMV("minMV"),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index f471e37..637d5e1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -36,10 +36,12 @@ import java.util.Map;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 import 
org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import 
org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import 
org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
+import org.locationtech.jts.geom.Geometry;
 
 
 /**
@@ -63,7 +65,8 @@ public class ObjectSerDeUtils {
     IntSet(9),
     TDigest(10),
     DistinctTable(11),
-    DataSketch(12);
+    DataSketch(12),
+    Geometry(13);
 
     private int _value;
 
@@ -102,6 +105,8 @@ public class ObjectSerDeUtils {
         return ObjectType.DistinctTable;
       } else if (value instanceof Sketch) {
         return ObjectType.DataSketch;
+      } else if (value instanceof Geometry) {
+        return ObjectType.Geometry;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + 
value.getClass().getSimpleName());
       }
@@ -482,6 +487,25 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new 
ObjectSerDe<Geometry>() {
+    @Override
+    public byte[] serialize(Geometry value) {
+      return GeometrySerializer.serialize(value);
+    }
+
+    @Override
+    public Geometry deserialize(byte[] bytes) {
+      return GeometrySerializer.deserialize(bytes);
+    }
+
+    @Override
+    public Geometry deserialize(ByteBuffer byteBuffer) {
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return GeometrySerializer.deserialize(bytes);
+    }
+  };
+
   // NOTE: DO NOT change the order, it has to be the same order as the 
ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -497,7 +521,8 @@ public class ObjectSerDeUtils {
       INT_SET_SER_DE,
       TDIGEST_SER_DE,
       DISTINCT_TABLE_SER_DE,
-      DATA_SKETCH_SER_DE
+      DATA_SKETCH_SER_DE,
+      GEOMETRY_SER_DE
   };
   //@formatter:on
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
index 0d0cc97..309d4dd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.geospatial;
 import com.google.common.base.Joiner;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
 import org.locationtech.jts.geom.PrecisionModel;
 
 
@@ -39,6 +40,7 @@ public class GeometryUtils {
   public static final double EARTH_RADIUS_KM = 6371.01;
   public static final double EARTH_RADIUS_M = EARTH_RADIUS_KM * 1000.0;
   public static final Joiner OR_JOINER = Joiner.on(" or ");
+  public static final Geometry EMPTY_POINT = GEOMETRY_FACTORY.createPoint();
 
   private GeometryUtils() {
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 021c188..bfe8550 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -152,6 +152,8 @@ public class AggregationFunctionFactory {
           case DISTINCT:
             return new DistinctAggregationFunction(arguments, 
queryContext.getOrderByExpressions(),
                 queryContext.getLimit());
+          case ST_UNION:
+            return new StUnionAggregationFunction(firstArgument);
           default:
             throw new IllegalArgumentException();
         }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 2b5b615..8e0a8a6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -98,5 +98,9 @@ public class AggregationFunctionVisitorBase {
 
   public void visit(DistinctCountThetaSketchAggregationFunction function) {
   }
+
+  public void visit(StUnionAggregationFunction function) {
+
+  }
 }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
new file mode 100644
index 0000000..8ae1d97
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.geospatial.GeometryUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.locationtech.jts.geom.Geometry;
+
+
+public class StUnionAggregationFunction extends 
BaseSingleInputAggregationFunction<Geometry, ByteArray> {
+
+  /**
+   * Constructor for the class.
+   *
+   * @param expression Expression to aggregate on.
+   */
+  public StUnionAggregationFunction(ExpressionContext expression) {
+    super(expression);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.ST_UNION;
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, 
int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder 
aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    Geometry geometry = aggregationResultHolder.getResult();
+    for (int i = 0; i < length; i++) {
+      Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
+      geometry = geometry == null ? value : geometry.union(value);
+    }
+    aggregationResultHolder.setValue(geometry);
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      int groupKey = groupKeyArray[i];
+      Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
+      Geometry geometry = groupByResultHolder.getResult(groupKey);
+      groupByResultHolder.setValueForKey(groupKey, geometry == null ? value : 
geometry.union(value));
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, 
GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      Geometry value = GeometrySerializer.deserialize(bytesArray[i]);
+      for (int groupKey : groupKeysArray[i]) {
+        Geometry geometry = groupByResultHolder.getResult(groupKey);
+        groupByResultHolder.setValueForKey(groupKey, geometry == null ? value 
: geometry.union(value));
+      }
+    }
+  }
+
+  @Override
+  public Geometry extractAggregationResult(AggregationResultHolder 
aggregationResultHolder) {
+    Geometry geometry = aggregationResultHolder.getResult();
+    return geometry == null ? GeometryUtils.EMPTY_POINT : geometry;
+  }
+
+  @Override
+  public Geometry extractGroupByResult(GroupByResultHolder 
groupByResultHolder, int groupKey) {
+    Geometry geometry = groupByResultHolder.getResult(groupKey);
+    return geometry == null ? GeometryUtils.EMPTY_POINT : geometry;
+  }
+
+  @Override
+  public Geometry merge(Geometry intermediateResult1, Geometry 
intermediateResult2) {
+    return intermediateResult1.union(intermediateResult2);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.BYTES;
+  }
+
+  @Override
+  public ByteArray extractFinalResult(Geometry geometry) {
+    return new ByteArray(GeometrySerializer.serialize(geometry));
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
new file mode 100644
index 0000000..90b60fb
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.geospatial.GeometryUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.Point;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertNotNull;
+
+
+/**
+ * Queries test for ST_UNION queries.
+ */
+public class StUnionQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"StUnionQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 200;
+  private static final int MAX_VALUE = 100000;
+
+  private static final String POINT_COLUMN = "pointColumn";
+  private static final String INT_COLUMN = "intColumn";
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().addSingleValueDimension(POINT_COLUMN, 
FieldSpec.DataType.BYTES)
+          .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build();
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private Map<Integer, Geometry> _values;
+  private Geometry _intermediateResult;
+  private byte[] _expectedResults;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+    int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    _values = new HashMap<>(hashMapCapacity);
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+
+      int x = RANDOM.nextInt(MAX_VALUE);
+      int y = RANDOM.nextInt(MAX_VALUE);
+      Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new 
Coordinate(x, y));
+      byte[] pointBytes = GeometrySerializer.serialize(point);
+      _intermediateResult = _intermediateResult == null ? point : 
point.union(_intermediateResult);
+      record.putValue(POINT_COLUMN, pointBytes);
+
+      int value = RANDOM.nextInt(MAX_VALUE);
+      record.putValue(INT_COLUMN, value);
+      int key = Integer.hashCode(value);
+      _values.put(key, _values.containsKey(key) ? 
_values.get(key).union(point) : point);
+      records.add(record);
+    }
+    _expectedResults = GeometrySerializer.serialize(_intermediateResult);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @Test
+  public void testAggregationOnly() {
+    String query = "SELECT ST_UNION(pointColumn) FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) 
operator).nextBlock();
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 NUM_RECORDS, 0, NUM_RECORDS,
+        NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+
+    assertNotNull(aggregationResult);
+
+    assertEquals(aggregationResult.get(0), _intermediateResult);
+
+    // Inter segments
+    String[] expectedResults = new String[1];
+    expectedResults[0] = new ByteArray(_expectedResults).toHexString();
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
4 * NUM_RECORDS, 4 * NUM_RECORDS,
+            expectedResults);
+    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 
4 * NUM_RECORDS, 4 * NUM_RECORDS,
+            expectedResults);
+  }
+
+  @Test
+  public void testAggregationOnlyOnEmptyResultSet() {
+    String query = "SELECT ST_UNION(pointColumn) FROM testTable where 
intColumn=-1";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) 
operator).nextBlock();
+    
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
 0, 0, 0, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+
+    assertNotNull(aggregationResult);
+
+    assertEquals(aggregationResult.get(0), GeometryUtils.EMPTY_POINT);
+
+    // Inter segments
+    String[] expectedResults = new String[1];
+    expectedResults[0] = new 
ByteArray(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT)).toHexString();
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 
0, 4 * NUM_RECORDS, expectedResults);
+    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
+    QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 
0, 4 * NUM_RECORDS, expectedResults);
+  }
+
+  @Test
+  public void testAggregationGroupBy() {
+    String query = "SELECT ST_UNION(pointColumn) FROM testTable GROUP BY 
intColumn";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationGroupByOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) 
operator).nextBlock();
+    QueriesTestUtils
+        
.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 
NUM_RECORDS, 0, 2 * NUM_RECORDS,
+            NUM_RECORDS);
+    AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    int numGroups = 0;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = 
aggregationGroupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      numGroups++;
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      assertTrue(_values.containsKey(Integer.parseInt(groupKey._stringKey)));
+    }
+    assertEquals(numGroups, _values.size());
+
+    // Inter segments
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 2 
* NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    // size of this array will be equal to number of aggregation functions 
since
+    // we return each aggregation function separately
+    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    int numAggregationColumns = aggregationResults.size();
+    Assert.assertEquals(numAggregationColumns, 1);
+    for (AggregationResult aggregationResult : aggregationResults) {
+      Assert.assertNull(aggregationResult.getValue());
+      List<GroupByResult> groupByResults = 
aggregationResult.getGroupByResult();
+      numGroups = groupByResults.size();
+      for (int i = 0; i < numGroups; i++) {
+        GroupByResult groupByResult = groupByResults.get(i);
+        List<String> group = groupByResult.getGroup();
+        assertEquals(group.size(), 1);
+        int key = Integer.parseInt(group.get(0));
+        assertTrue(_values.containsKey(key));
+        assertEquals(groupByResult.getValue(),
+            new 
ByteArray(GeometrySerializer.serialize(_values.get(key))).toHexString());
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+    _indexSegment.destroy();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to