This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new be35e0a Add HAVING support (#5889) be35e0a is described below commit be35e0aff6331d3f8403f69a942bd264a8df3b62 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Aug 20 12:58:49 2020 -0700 Add HAVING support (#5889) Add support for SQL HAVING clause Example of supported query: `SELECT DaysSinceEpoch, MAX(ArrDelay) - MAX(AirTime) AS Diff FROM mytable GROUP BY DaysSinceEpoch HAVING (Diff >= 300 AND Diff < 500) OR Diff < -500 ORDER BY Diff DESC` --- .../core/query/reduce/GroupByDataTableReducer.java | 124 ++++++++------ .../core/query/reduce/HavingFilterHandler.java | 182 +++++++++++++++++++++ .../org/apache/pinot/core/util/GroupByUtils.java | 6 +- .../core/query/reduce/HavingFilterHandlerTest.java | 103 ++++++++++++ .../tests/BaseClusterIntegrationTestSet.java | 10 ++ 5 files changed, 368 insertions(+), 57 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index ea8e9c7..60dac45 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.broker.AggregationResult; @@ -36,14 +35,15 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.common.utils.HashUtil; -import org.apache.pinot.core.data.table.ConcurrentIndexedTable; import org.apache.pinot.core.data.table.IndexedTable; import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SimpleIndexedTable; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.util.GroupByUtils; @@ -167,15 +167,6 @@ public class GroupByDataTableReducer implements DataTableReducer { DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema); int limit = _queryContext.getLimit(); List<Object[]> rows = new ArrayList<>(limit); - for (int i = 0; i < limit && sortedIterator.hasNext(); i++) { - Object[] row = sortedIterator.next().getValues(); - for (int j = 0; j < _numAggregationFunctions; j++) { - int valueIndex = j + _numGroupByExpressions; - row[valueIndex] = - AggregationFunctionUtils.getSerializableValue(_aggregationFunctions[j].extractFinalResult(row[valueIndex])); - } - rows.add(row); - } if (_sqlQuery) { // SQL query with SQL group-by mode and response format @@ -183,18 +174,51 @@ public class GroupByDataTableReducer implements DataTableReducer { PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, prePostAggregationDataSchema); DataSchema resultTableSchema = postAggregationHandler.getResultDataSchema(); + FilterContext havingFilter = _queryContext.getHavingFilter(); + if (havingFilter != null) { + HavingFilterHandler havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler); + while (rows.size() < limit && sortedIterator.hasNext()) { + Object[] row = sortedIterator.next().getValues(); + extractFinalAggregationResults(row); + if (havingFilterHandler.isMatch(row)) { + rows.add(row); + } + } + } else { + for (int i = 0; i < limit && sortedIterator.hasNext(); i++) { + Object[] row = sortedIterator.next().getValues(); + extractFinalAggregationResults(row); + rows.add(row); + } + } rows.replaceAll(postAggregationHandler::getResult); brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, rows)); } else { // PQL query with SQL group-by mode and response format // NOTE: For PQL query, keep the order of columns as is (group-by expressions followed by aggregations), no need - // to perform post-aggregation. + // to perform post-aggregation or filtering. + for (int i = 0; i < limit && sortedIterator.hasNext(); i++) { + Object[] row = sortedIterator.next().getValues(); + extractFinalAggregationResults(row); + rows.add(row); + } brokerResponseNative.setResultTable(new ResultTable(prePostAggregationDataSchema, rows)); } } /** + * Helper method to extract the final aggregation results for the given row (in-place). + */ + private void extractFinalAggregationResults(Object[] row) { + for (int i = 0; i < _numAggregationFunctions; i++) { + int valueIndex = i + _numGroupByExpressions; + row[valueIndex] = + AggregationFunctionUtils.getSerializableValue(_aggregationFunctions[i].extractFinalResult(row[valueIndex])); + } + } + + /** * Constructs the DataSchema for the rows before the post-aggregation (SQL mode). */ private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) { @@ -208,50 +232,42 @@ public class GroupByDataTableReducer implements DataTableReducer { } private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTables) { - int indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext); - IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, indexedTableCapacity); - + int capacity = GroupByUtils.getTableCapacity(_queryContext); + IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, capacity); + ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes(); for (DataTable dataTable : dataTables) { - BiFunction[] functions = new BiFunction[_numColumns]; - for (int i = 0; i < _numColumns; i++) { - ColumnDataType columnDataType = dataSchema.getColumnDataType(i); - BiFunction<Integer, Integer, Object> function; - switch (columnDataType) { - case INT: - function = dataTable::getInt; - break; - case LONG: - function = dataTable::getLong; - break; - case FLOAT: - function = dataTable::getFloat; - break; - case DOUBLE: - function = dataTable::getDouble; - break; - case STRING: - function = dataTable::getString; - break; - case BYTES: - function = dataTable::getBytes; - break; - case OBJECT: - function = dataTable::getObject; - break; - // Add other aggregation intermediate result / group-by column type supports here - default: - throw new IllegalStateException(); - } - functions[i] = function; - } - - for (int row = 0; row < dataTable.getNumberOfRows(); row++) { - Object[] columns = new Object[_numColumns]; - for (int col = 0; col < _numColumns; col++) { - columns[col] = functions[col].apply(row, col); + int numRows = dataTable.getNumberOfRows(); + for (int rowId = 0; rowId < numRows; rowId++) { + Object[] values = new Object[_numColumns]; + for (int colId = 0; colId < _numColumns; colId++) { + switch (columnDataTypes[colId]) { + case INT: + values[colId] = dataTable.getInt(rowId, colId); + break; + case LONG: + values[colId] = dataTable.getLong(rowId, colId); + break; + case FLOAT: + values[colId] = dataTable.getFloat(rowId, colId); + break; + case DOUBLE: + values[colId] = dataTable.getDouble(rowId, colId); + break; + case STRING: + values[colId] = dataTable.getString(rowId, colId); + break; + case BYTES: + values[colId] = dataTable.getBytes(rowId, colId); + break; + case OBJECT: + values[colId] = dataTable.getObject(rowId, colId); + break; + // Add other aggregation intermediate result / group-by column type supports here + default: + throw new IllegalStateException(); + } } - Record record = new Record(columns); - indexedTable.upsert(record); + indexedTable.upsert(new Record(values)); } } indexedTable.finish(true); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java new file mode 100644 index 0000000..08deed3 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import java.util.List; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; +import org.apache.pinot.core.query.request.context.FilterContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.ByteArray; + + +/** + * Handler for HAVING clause. + */ +public class HavingFilterHandler { + private final PostAggregationHandler _postAggregationHandler; + private final RowMatcher _rowMatcher; + + public HavingFilterHandler(FilterContext havingFilter, PostAggregationHandler postAggregationHandler) { + _postAggregationHandler = postAggregationHandler; + _rowMatcher = getRowMatcher(havingFilter); + } + + /** + * Returns {@code true} if the given row matches the HAVING clause, {@code false} otherwise. + */ + public boolean isMatch(Object[] row) { + return _rowMatcher.isMatch(row); + } + + /** + * Helper method to construct a RowMatcher based on the given filter. + */ + private RowMatcher getRowMatcher(FilterContext filter) { + switch (filter.getType()) { + case AND: + return new AndRowMatcher(filter.getChildren()); + case OR: + return new OrRowMatcher(filter.getChildren()); + case PREDICATE: + return new PredicateRowMatcher(filter.getPredicate()); + default: + throw new IllegalStateException(); + } + } + + /** + * Filter matcher for the row. + */ + private interface RowMatcher { + + /** + * Returns {@code true} if the given row matches the filter, {@code false} otherwise. + */ + boolean isMatch(Object[] row); + } + + /** + * AND filter matcher. + */ + private class AndRowMatcher implements RowMatcher { + RowMatcher[] _childMatchers; + + AndRowMatcher(List<FilterContext> childFilters) { + int numChildren = childFilters.size(); + _childMatchers = new RowMatcher[numChildren]; + for (int i = 0; i < numChildren; i++) { + _childMatchers[i] = getRowMatcher(childFilters.get(i)); + } + } + + @Override + public boolean isMatch(Object[] row) { + for (RowMatcher childMatcher : _childMatchers) { + if (!childMatcher.isMatch(row)) { + return false; + } + } + return true; + } + } + + /** + * OR filter matcher. + */ + private class OrRowMatcher implements RowMatcher { + RowMatcher[] _childMatchers; + + OrRowMatcher(List<FilterContext> childFilters) { + int numChildren = childFilters.size(); + _childMatchers = new RowMatcher[numChildren]; + for (int i = 0; i < numChildren; i++) { + _childMatchers[i] = getRowMatcher(childFilters.get(i)); + } + } + + @Override + public boolean isMatch(Object[] row) { + for (RowMatcher childMatcher : _childMatchers) { + if (childMatcher.isMatch(row)) { + return true; + } + } + return false; + } + } + + /** + * Predicate matcher. + */ + private class PredicateRowMatcher implements RowMatcher { + PostAggregationHandler.ValueExtractor _valueExtractor; + DataType _valueType; + PredicateEvaluator _predicateEvaluator; + + PredicateRowMatcher(Predicate predicate) { + _valueExtractor = _postAggregationHandler.getValueExtractor(predicate.getLhs()); + switch (_valueExtractor.getColumnDataType()) { + case INT: + _valueType = DataType.INT; + break; + case LONG: + _valueType = DataType.LONG; + break; + case FLOAT: + _valueType = DataType.FLOAT; + break; + case DOUBLE: + _valueType = DataType.DOUBLE; + break; + case STRING: + _valueType = DataType.STRING; + break; + case BYTES: + _valueType = DataType.BYTES; + break; + default: + throw new IllegalStateException(); + } + _predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, null, _valueType); + } + + @Override + public boolean isMatch(Object[] row) { + Object value = _valueExtractor.extract(row); + switch (_valueType) { + case INT: + return _predicateEvaluator.applySV((int) value); + case LONG: + return _predicateEvaluator.applySV((long) value); + case FLOAT: + return _predicateEvaluator.applySV((float) value); + case DOUBLE: + return _predicateEvaluator.applySV((double) value); + case STRING: + return _predicateEvaluator.applySV((String) value); + case BYTES: + return _predicateEvaluator.applySV(((ByteArray) value).getBytes()); + default: + throw new IllegalStateException(); + } + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java index 7713137..dd29a29 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java @@ -39,13 +39,13 @@ public final class GroupByUtils { /** * (For SQL semantic) Returns the capacity of the table required by the given query. * <ul> - * <li>For group-by & order-by queries, returns {@code max(limit * 5, 5000)} to ensure the result accuracy</li> - * <li>For group-by without order-by queries, returns the limit</li> + * <li>For GROUP-BY with ORDER-BY or HAVING, returns {@code max(limit * 5, 5000)} to ensure the result accuracy</li> + * <li>For GROUP-BY without ORDER-BY or HAVING, returns the limit</li> * </ul> */ public static int getTableCapacity(QueryContext queryContext) { int limit = queryContext.getLimit(); - if (queryContext.getOrderByExpressions() != null) { + if (queryContext.getOrderByExpressions() != null || queryContext.getHavingFilter() != null) { return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT); } else { return limit; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java new file mode 100644 index 0000000..9870242 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.reduce; + +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.spi.utils.ByteArray; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class HavingFilterHandlerTest { + + @Test + public void testHavingFilter() { + // Simple having + { + QueryContext queryContext = QueryContextConverterUtils + .getQueryContextFromSQL("SELECT COUNT(*) FROM testTable GROUP BY d1 HAVING COUNT(*) > 5"); + DataSchema dataSchema = + new DataSchema(new String[]{"d1", "count(*)"}, new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG}); + PostAggregationHandler postAggregationHandler = new PostAggregationHandler(queryContext, dataSchema); + HavingFilterHandler havingFilterHandler = + new HavingFilterHandler(queryContext.getHavingFilter(), postAggregationHandler); + assertFalse(havingFilterHandler.isMatch(new Object[]{1, 5L})); + assertTrue(havingFilterHandler.isMatch(new Object[]{2, 10L})); + assertFalse(havingFilterHandler.isMatch(new Object[]{3, 3L})); + } + + // Nested having + { + QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL( + "SELECT MAX(m1), MIN(m1) FROM testTable GROUP BY d1 HAVING MAX(m1) IN (15, 20, 25) AND (MIN(m1) > 10 OR MIN(m1) <= 3)"); + DataSchema dataSchema = new DataSchema(new String[]{"d1", "max(m1)", "min(m1)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); + PostAggregationHandler postAggregationHandler = new PostAggregationHandler(queryContext, dataSchema); + HavingFilterHandler havingFilterHandler = + new HavingFilterHandler(queryContext.getHavingFilter(), postAggregationHandler); + assertFalse(havingFilterHandler.isMatch(new Object[]{1, 15.5, 13.0})); + assertTrue(havingFilterHandler.isMatch(new Object[]{2, 15.0, 3.0})); + assertFalse(havingFilterHandler.isMatch(new Object[]{3, 20.0, 7.5})); + } + + // Having with post-aggregation + { + QueryContext queryContext = QueryContextConverterUtils + .getQueryContextFromSQL("SELECT MAX(m1), MIN(m2) FROM testTable GROUP BY d1 HAVING MAX(m1) > MIN(m2) * 2"); + DataSchema dataSchema = new DataSchema(new String[]{"d1", "max(m1)", "min(m2)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); + PostAggregationHandler postAggregationHandler = new PostAggregationHandler(queryContext, dataSchema); + HavingFilterHandler havingFilterHandler = + new HavingFilterHandler(queryContext.getHavingFilter(), postAggregationHandler); + assertFalse(havingFilterHandler.isMatch(new Object[]{1, 15.5, 13.0})); + assertTrue(havingFilterHandler.isMatch(new Object[]{2, 15.0, 3.0})); + assertFalse(havingFilterHandler.isMatch(new Object[]{3, 20.0, 10.0})); + } + + // Having with all data types + { + QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL( + "SELECT COUNT(*) FROM testTable GROUP BY d1, d2, d3, d4, d5, d6 HAVING d1 > 10 AND d2 > 10 AND d3 > 10 AND d4 > 10 AND d5 > 10 AND d6 > 10"); + DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "d4", "d5", "d6", "count(*)"}, + new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.LONG}); + PostAggregationHandler postAggregationHandler = new PostAggregationHandler(queryContext, dataSchema); + HavingFilterHandler havingFilterHandler = + new HavingFilterHandler(queryContext.getHavingFilter(), postAggregationHandler); + assertTrue( + havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "11", new ByteArray(new byte[]{17}), 5})); + assertFalse( + havingFilterHandler.isMatch(new Object[]{10, 11L, 10.5f, 10.5, "11", new ByteArray(new byte[]{17}), 5})); + assertFalse( + havingFilterHandler.isMatch(new Object[]{11, 10L, 10.5f, 10.5, "11", new ByteArray(new byte[]{17}), 5})); + assertFalse( + havingFilterHandler.isMatch(new Object[]{11, 11L, 10.0f, 10.5, "11", new ByteArray(new byte[]{17}), 5})); + assertFalse( + havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.0, "11", new ByteArray(new byte[]{17}), 5})); + assertFalse( + havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "10", new ByteArray(new byte[]{17}), 5})); + assertFalse( + havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "11", new ByteArray(new byte[]{16}), 5})); + } + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 53e85ce..cd18431 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -241,6 +241,16 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati query = "SELECT DaysSinceEpoch, MAX(ArrDelay) * 2 - MAX(AirTime) - 3 FROM mytable GROUP BY DaysSinceEpoch ORDER BY MAX(ArrDelay) - MIN(AirTime) DESC"; testSqlQuery(query, Collections.singletonList(query)); + + // Having + query = "SELECT COUNT(*) AS Count, DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch HAVING Count > 350"; + testSqlQuery(query, Collections.singletonList(query)); + query = + "SELECT MAX(ArrDelay) - MAX(AirTime) AS Diff, DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch HAVING Diff * 2 > 1000 ORDER BY Diff ASC"; + testSqlQuery(query, Collections.singletonList(query)); + query = + "SELECT DaysSinceEpoch, MAX(ArrDelay) - MAX(AirTime) AS Diff FROM mytable GROUP BY DaysSinceEpoch HAVING (Diff >= 300 AND Diff < 500) OR Diff < -500 ORDER BY Diff DESC"; + testSqlQuery(query, Collections.singletonList(query)); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org