This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1f69041 [Part 2] Add geo support - add a geo aggregate function st_union (#5744) 1f69041 is described below commit 1f6904152bc80569ab6e4e90251ade344d72c14f Author: Yupeng Fu <yupe...@users.noreply.github.com> AuthorDate: Wed Jul 29 17:04:26 2020 -0700 [Part 2] Add geo support - add a geo aggregate function st_union (#5744) Added a new aggregate function that unions a set of geometry objects and returns a multi-geometry object. --- .../common/function/AggregationFunctionType.java | 3 + .../apache/pinot/core/common/ObjectSerDeUtils.java | 29 ++- .../pinot/core/geospatial/GeometryUtils.java | 2 + .../function/AggregationFunctionFactory.java | 2 + .../function/AggregationFunctionVisitorBase.java | 4 + .../function/StUnionAggregationFunction.java | 140 +++++++++++ .../apache/pinot/queries/StUnionQueriesTest.java | 258 +++++++++++++++++++++ 7 files changed, 436 insertions(+), 2 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java index ff3fb50..5125a8d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java @@ -36,6 +36,9 @@ public enum AggregationFunctionType { PERCENTILEEST("percentileEst"), PERCENTILETDIGEST("percentileTDigest"), + // geo aggregation functions + ST_UNION("ST_Union"), + // Aggregation functions for multi-valued columns COUNTMV("countMV"), MINMV("minMV"), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index f471e37..637d5e1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -36,10 +36,12 @@ import java.util.Map; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.Sketch; import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair; import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable; import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair; import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest; +import org.locationtech.jts.geom.Geometry; /** @@ -63,7 +65,8 @@ public class ObjectSerDeUtils { IntSet(9), TDigest(10), DistinctTable(11), - DataSketch(12); + DataSketch(12), + Geometry(13); private int _value; @@ -102,6 +105,8 @@ public class ObjectSerDeUtils { return ObjectType.DistinctTable; } else if (value instanceof Sketch) { return ObjectType.DataSketch; + } else if (value instanceof Geometry) { + return ObjectType.Geometry; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -482,6 +487,25 @@ public class ObjectSerDeUtils { } }; + public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new ObjectSerDe<Geometry>() { + @Override + public byte[] serialize(Geometry value) { + return GeometrySerializer.serialize(value); + } + + @Override + public Geometry deserialize(byte[] bytes) { + return GeometrySerializer.deserialize(bytes); + } + + @Override + public Geometry deserialize(ByteBuffer byteBuffer) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return GeometrySerializer.deserialize(bytes); + } + }; + // NOTE: DO NOT change the order, it has to be the same order as the ObjectType //@formatter:off private static final ObjectSerDe[] SER_DES = { @@ -497,7 +521,8 @@ public class ObjectSerDeUtils { INT_SET_SER_DE, TDIGEST_SER_DE, DISTINCT_TABLE_SER_DE, - DATA_SKETCH_SER_DE + DATA_SKETCH_SER_DE, + GEOMETRY_SER_DE }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java index 0d0cc97..309d4dd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.geospatial; import com.google.common.base.Joiner; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; import org.locationtech.jts.geom.PrecisionModel; @@ -39,6 +40,7 @@ public class GeometryUtils { public static final double EARTH_RADIUS_KM = 6371.01; public static final double EARTH_RADIUS_M = EARTH_RADIUS_KM * 1000.0; public static final Joiner OR_JOINER = Joiner.on(" or "); + public static final Geometry EMPTY_POINT = GEOMETRY_FACTORY.createPoint(); private GeometryUtils() { } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 021c188..bfe8550 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -152,6 +152,8 @@ public class AggregationFunctionFactory { case DISTINCT: return new DistinctAggregationFunction(arguments, queryContext.getOrderByExpressions(), queryContext.getLimit()); + case ST_UNION: + return new StUnionAggregationFunction(firstArgument); default: throw new IllegalArgumentException(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java index 2b5b615..8e0a8a6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java @@ -98,5 +98,9 @@ public class AggregationFunctionVisitorBase { public void visit(DistinctCountThetaSketchAggregationFunction function) { } + + public void visit(StUnionAggregationFunction function) { + + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java new file mode 100644 index 0000000..8ae1d97 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.Map; +import org.apache.pinot.common.function.AggregationFunctionType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.geospatial.GeometryUtils; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.spi.utils.ByteArray; +import org.locationtech.jts.geom.Geometry; + + +public class StUnionAggregationFunction extends BaseSingleInputAggregationFunction<Geometry, ByteArray> { + + /** + * Constructor for the class. + * + * @param expression Expression to aggregate on. + */ + public StUnionAggregationFunction(ExpressionContext expression) { + super(expression); + } + + @Override + public AggregationFunctionType getType() { + return AggregationFunctionType.ST_UNION; + } + + @Override + public void accept(AggregationFunctionVisitorBase visitor) { + visitor.visit(this); + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV(); + Geometry geometry = aggregationResultHolder.getResult(); + for (int i = 0; i < length; i++) { + Geometry value = GeometrySerializer.deserialize(bytesArray[i]); + geometry = geometry == null ? value : geometry.union(value); + } + aggregationResultHolder.setValue(geometry); + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV(); + for (int i = 0; i < length; i++) { + int groupKey = groupKeyArray[i]; + Geometry value = GeometrySerializer.deserialize(bytesArray[i]); + Geometry geometry = groupByResultHolder.getResult(groupKey); + groupByResultHolder.setValueForKey(groupKey, geometry == null ? value : geometry.union(value)); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + byte[][] bytesArray = blockValSetMap.get(_expression).getBytesValuesSV(); + for (int i = 0; i < length; i++) { + Geometry value = GeometrySerializer.deserialize(bytesArray[i]); + for (int groupKey : groupKeysArray[i]) { + Geometry geometry = groupByResultHolder.getResult(groupKey); + groupByResultHolder.setValueForKey(groupKey, geometry == null ? value : geometry.union(value)); + } + } + } + + @Override + public Geometry extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + Geometry geometry = aggregationResultHolder.getResult(); + return geometry == null ? GeometryUtils.EMPTY_POINT : geometry; + } + + @Override + public Geometry extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + Geometry geometry = groupByResultHolder.getResult(groupKey); + return geometry == null ? GeometryUtils.EMPTY_POINT : geometry; + } + + @Override + public Geometry merge(Geometry intermediateResult1, Geometry intermediateResult2) { + return intermediateResult1.union(intermediateResult2); + } + + @Override + public boolean isIntermediateResultComparable() { + return false; + } + + @Override + public DataSchema.ColumnDataType getIntermediateResultColumnType() { + return DataSchema.ColumnDataType.OBJECT; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.BYTES; + } + + @Override + public ByteArray extractFinalResult(Geometry geometry) { + return new ByteArray(GeometrySerializer.serialize(geometry)); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java new file mode 100644 index 0000000..90b60fb --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.response.broker.AggregationResult; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.GroupByResult; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.readers.GenericRowRecordReader; +import org.apache.pinot.core.geospatial.GeometryUtils; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.AggregationGroupByOperator; +import org.apache.pinot.core.operator.query.AggregationOperator; +import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.ByteArray; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.Point; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertNotNull; + + +/** + * Queries test for ST_UNION queries. + */ +public class StUnionQueriesTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "StUnionQueriesTest"); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String SEGMENT_NAME = "testSegment"; + private static final Random RANDOM = new Random(); + + private static final int NUM_RECORDS = 200; + private static final int MAX_VALUE = 100000; + + private static final String POINT_COLUMN = "pointColumn"; + private static final String INT_COLUMN = "intColumn"; + private static final Schema SCHEMA = + new Schema.SchemaBuilder().addSingleValueDimension(POINT_COLUMN, FieldSpec.DataType.BYTES) + .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build(); + private static final TableConfig TABLE_CONFIG = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + + private Map<Integer, Geometry> _values; + private Geometry _intermediateResult; + private byte[] _expectedResults; + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + + @Override + protected String getFilter() { + return ""; + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE); + _values = new HashMap<>(hashMapCapacity); + List<GenericRow> records = new ArrayList<>(NUM_RECORDS); + for (int i = 0; i < NUM_RECORDS; i++) { + GenericRow record = new GenericRow(); + + int x = RANDOM.nextInt(MAX_VALUE); + int y = RANDOM.nextInt(MAX_VALUE); + Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(x, y)); + byte[] pointBytes = GeometrySerializer.serialize(point); + _intermediateResult = _intermediateResult == null ? point : point.union(_intermediateResult); + record.putValue(POINT_COLUMN, pointBytes); + + int value = RANDOM.nextInt(MAX_VALUE); + record.putValue(INT_COLUMN, value); + int key = Integer.hashCode(value); + _values.put(key, _values.containsKey(key) ? _values.get(key).union(point) : point); + records.add(record); + } + _expectedResults = GeometrySerializer.serialize(_intermediateResult); + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); + segmentGeneratorConfig.setSegmentName(SEGMENT_NAME); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records)); + driver.build(); + + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + } + + @Test + public void testAggregationOnly() { + String query = "SELECT ST_UNION(pointColumn) FROM testTable"; + + // Inner segment + Operator operator = getOperatorForPqlQuery(query); + assertTrue(operator instanceof AggregationOperator); + IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS, + NUM_RECORDS); + List<Object> aggregationResult = resultsBlock.getAggregationResult(); + + assertNotNull(aggregationResult); + + assertEquals(aggregationResult.get(0), _intermediateResult); + + // Inter segments + String[] expectedResults = new String[1]; + expectedResults[0] = new ByteArray(_expectedResults).toHexString(); + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + QueriesTestUtils + .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * NUM_RECORDS, 4 * NUM_RECORDS, + expectedResults); + brokerResponse = getBrokerResponseForPqlQueryWithFilter(query); + QueriesTestUtils + .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * NUM_RECORDS, 4 * NUM_RECORDS, + expectedResults); + } + + @Test + public void testAggregationOnlyOnEmptyResultSet() { + String query = "SELECT ST_UNION(pointColumn) FROM testTable where intColumn=-1"; + + // Inner segment + Operator operator = getOperatorForPqlQuery(query); + assertTrue(operator instanceof AggregationOperator); + IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 0, 0, 0, NUM_RECORDS); + List<Object> aggregationResult = resultsBlock.getAggregationResult(); + + assertNotNull(aggregationResult); + + assertEquals(aggregationResult.get(0), GeometryUtils.EMPTY_POINT); + + // Inter segments + String[] expectedResults = new String[1]; + expectedResults[0] = new ByteArray(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT)).toHexString(); + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 0, 4 * NUM_RECORDS, expectedResults); + brokerResponse = getBrokerResponseForPqlQueryWithFilter(query); + QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 0, 4 * NUM_RECORDS, expectedResults); + } + + @Test + public void testAggregationGroupBy() { + String query = "SELECT ST_UNION(pointColumn) FROM testTable GROUP BY intColumn"; + + // Inner segment + Operator operator = getOperatorForPqlQuery(query); + assertTrue(operator instanceof AggregationGroupByOperator); + IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) operator).nextBlock(); + QueriesTestUtils + .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 2 * NUM_RECORDS, + NUM_RECORDS); + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + int numGroups = 0; + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); + while (groupKeyIterator.hasNext()) { + numGroups++; + GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next(); + assertTrue(_values.containsKey(Integer.parseInt(groupKey._stringKey))); + } + assertEquals(numGroups, _values.size()); + + // Inter segments + BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); + Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0); + Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 2 * NUM_RECORDS); + Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS); + // size of this array will be equal to number of aggregation functions since + // we return each aggregation function separately + List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults(); + int numAggregationColumns = aggregationResults.size(); + Assert.assertEquals(numAggregationColumns, 1); + for (AggregationResult aggregationResult : aggregationResults) { + Assert.assertNull(aggregationResult.getValue()); + List<GroupByResult> groupByResults = aggregationResult.getGroupByResult(); + numGroups = groupByResults.size(); + for (int i = 0; i < numGroups; i++) { + GroupByResult groupByResult = groupByResults.get(i); + List<String> group = groupByResult.getGroup(); + assertEquals(group.size(), 1); + int key = Integer.parseInt(group.get(0)); + assertTrue(_values.containsKey(key)); + assertEquals(groupByResult.getValue(), + new ByteArray(GeometrySerializer.serialize(_values.get(key))).toHexString()); + } + } + } + + @AfterClass + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(INDEX_DIR); + _indexSegment.destroy(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org