This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5b9b836 Add Post-Aggregation Gapfilling functionality. (#7781) 5b9b836 is described below commit 5b9b8364ef546113ff548b41504cbb0dc2aa2eda Author: weixiangsun <91153405+weixiang...@users.noreply.github.com> AuthorDate: Thu Dec 2 17:00:51 2021 -0800 Add Post-Aggregation Gapfilling functionality. (#7781) --- .../core/operator/transform/TransformOperator.java | 4 +- .../reduce/GapFillGroupByDataTableReducer.java | 491 ++++++++++++++++ .../core/query/reduce/PostAggregationHandler.java | 4 +- .../core/query/reduce/ResultReducerFactory.java | 3 + .../org/apache/pinot/core/util/GapfillUtils.java | 122 ++++ .../queries/PostAggregationGapfillQueriesTest.java | 616 +++++++++++++++++++++ 6 files changed, 1238 insertions(+), 2 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java index d3b752e..08bfeb8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java @@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.TransformBlock; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.GapfillUtils; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.reader.Dictionary; @@ -62,7 +63,8 @@ public class TransformOperator extends BaseOperator<TransformBlock> { _projectionOperator = projectionOperator; _dataSourceMap = projectionOperator.getDataSourceMap(); for (ExpressionContext expression : expressions) { - TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap); + TransformFunction transformFunction = + TransformFunctionFactory.get(queryContext, GapfillUtils.stripGapfill(expression), _dataSourceMap); _transformFunctionMap.put(expression, transformFunction); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java new file mode 100644 index 0000000..9f739b0 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java @@ -0,0 +1,491 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.reduce; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerGauge; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.QueryProcessingException; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.data.table.ConcurrentIndexedTable; +import org.apache.pinot.core.data.table.IndexedTable; +import org.apache.pinot.core.data.table.Key; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SimpleIndexedTable; +import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; +import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.core.util.GapfillUtils; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.core.util.trace.TraceCallable; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.DateTimeGranularitySpec; + + +/** + * Helper class to reduce data tables and set group by results into the BrokerResponseNative + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class GapFillGroupByDataTableReducer implements DataTableReducer { + private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value. 
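+
+  // The reducer handles gapfill queries shaped like the following (illustrative example taken from
+  // PostAggregationGapfillQueriesTest in this commit):
+  //   SELECT PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH',
+  //              '454515', '454524', '1:HOURS') AS time_col,
+  //          lotId,
+  //          FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') AS status
+  //   FROM parkingData
+  //   GROUP BY 1, 2 ORDER BY 1 LIMIT 200
+  // Note that the first ORDER BY expression must be the gapfill time bucket, in ascending order.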
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final int _numOfGroupByKeys;
+  private final List<Integer> _groupByKeyIndexes;
+  private final boolean[] _isGroupBySelections;
+  private int _timeBucketIndex = -1;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    Preconditions.checkArgument(
+        queryContext.getBrokerRequest().getPinotQuery() != null, "GapFill can only be applied to SQL queries");
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+
+    ExpressionContext gapFillSelection = null;
+    for (ExpressionContext expressionContext : _queryContext.getSelectExpressions()) {
+      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
+        gapFillSelection = expressionContext;
+        break;
+      }
+    }
+
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() == 5, "PostAggregateGapFill does not have the correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be a literal time format.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be a literal start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be a literal end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be a literal time bucket size.");
+
+    boolean orderByTimeBucket = false;
+    if (_queryContext.getOrderByExpressions() != null && !_queryContext.getOrderByExpressions().isEmpty()) {
+      OrderByExpressionContext firstOrderByExpression = _queryContext.getOrderByExpressions().get(0);
+      orderByTimeBucket =
+          firstOrderByExpression.isAsc() && firstOrderByExpression.getExpression().equals(gapFillSelection);
+    }
+
+    Preconditions.checkArgument(
+        orderByTimeBucket, "PostAggregateGapFill does not work if the time bucket is not ordered.");
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _groupByKeys = new HashSet<>();
+    _previousByGroupKey = new HashMap<>();
+    _numOfGroupByKeys = _queryContext.getGroupByExpressions().size() - 1;
+    _groupByKeyIndexes = new ArrayList<>();
+    _isGroupBySelections = new boolean[_queryContext.getSelectExpressions().size()];
+
+    for (ExpressionContext expressionContext : _groupByExpressions) {
+      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
+        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
+          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
+            _timeBucketIndex = i;
+            _isGroupBySelections[i] = true;
+            break;
+          }
+        }
+      } else {
+        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
+          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
+            _groupByKeyIndexes.add(i);
+            _isGroupBySelections[i] = true;
+            break;
+          }
+        }
+      }
+    }
+
+    Preconditions.checkArgument(_timeBucketIndex >= 0, "There is no time bucket.");
+  }
+
+  private long truncate(long epoch) {
+    int sz = _dateTimeGranularity.getSize();
+    return epoch / sz * sz;
+  }
+
+  /**
+   * Reduces the data tables and sets the gapfilled group-by results into the {@link ResultTable} of the
+   * BrokerResponseNative (SQL response format).
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    try {
+      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
+          brokerMetrics);
+    } catch (TimeoutException e) {
+      brokerResponseNative.getProcessingExceptions()
+          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+    }
+    int resultSize = brokerResponseNative.getResultTable().getRows().size();
+
+    if (brokerMetrics != null && resultSize > 0) {
+      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
+    }
+  }
+
+  private Key constructGroupKeys(Object[] row) {
+    Object[] groupKeys = new Object[_numOfGroupByKeys];
+    for (int i = 0; i < _numOfGroupByKeys; i++) {
+      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
+    }
+    return new Key(groupKeys);
+  }
+
+  /**
+   * Extracts the group-by order-by results, gapfills them, and sets them into the {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param dataTables Collection of data tables
+   * @param reducerContext DataTableReducer context
+   * @param rawTableName table name
+   * @param brokerMetrics broker metrics (meters)
+   * @throws TimeoutException If unable to complete within the timeout.
+ */ + private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema, + Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName, + BrokerMetrics brokerMetrics) + throws TimeoutException { + IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext); + if (brokerMetrics != null) { + brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes()); + brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs()); + } + DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema); + ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes(); + + PostAggregationHandler postAggregationHandler = + new PostAggregationHandler(_queryContext, prePostAggregationDataSchema); + DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema(); + ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes(); + Iterator<Record> sortedIterator = indexedTable.iterator(); + while (sortedIterator.hasNext()) { + Object[] row = sortedIterator.next().getValues(); + extractFinalAggregationResults(row); + for (int i = 0; i < columnDataTypes.length; i++) { + row[i] = columnDataTypes[i].convert(row[i]); + } + Object[] resultRow = postAggregationHandler.getResult(row); + for (int i = 0; i < resultColumnDataTypes.length; i++) { + resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]); + } + + _groupByKeys.add(constructGroupKeys(resultRow)); + } + + List<Object[]> gapfillResultRows = gapFill(indexedTable.iterator(), postAggregationHandler); + brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows)); + } + + List<Object[]> gapFill(Iterator<Record> sortedIterator, PostAggregationHandler postAggregationHandler) { + DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema(); + ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes(); + int limit = _queryContext.getLimit(); + int numResultColumns = resultColumnDataTypes.length; + List<Object[]> gapfillResultRows = new ArrayList<>(limit); + long step = _dateTimeGranularity.granularityToMillis(); + FilterContext havingFilter = _queryContext.getHavingFilter(); + HavingFilterHandler havingFilterHandler = null; + if (havingFilter != null) { + havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler); + } + Record record = null; + for (long time = _startMs; time + 2 * step <= _endMs; time += step) { + Set<Key> keys = new HashSet<>(_groupByKeys); + if (record == null && sortedIterator.hasNext()) { + record = sortedIterator.next(); + } + + while (record != null) { + Object[] row = record.getValues(); + + Object[] resultRow = postAggregationHandler.getResult(row); + for (int i = 0; i < resultColumnDataTypes.length; i++) { + resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]); + } + + long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[_timeBucketIndex])); + if (timeCol > time) { + break; + } + if (timeCol == time) { + if (havingFilterHandler == null || havingFilterHandler.isMatch(row)) { + gapfillResultRows.add(resultRow); + if (gapfillResultRows.size() == limit) { + return gapfillResultRows; + } + } + Key key = constructGroupKeys(resultRow); + keys.remove(key); + _previousByGroupKey.put(key, resultRow); + } + if (sortedIterator.hasNext()) { + record = sortedIterator.next(); + } else 
{
+          record = null;
+        }
+      }
+
+      for (Key key : keys) {
+        Object[] gapfillRow = new Object[numResultColumns];
+        int keyIndex = 0;
+        for (int i = 0; i < _isGroupBySelections.length; i++) {
+          if (_isGroupBySelections[i]) {
+            if (i == _timeBucketIndex) {
+              if (resultColumnDataTypes[i] == ColumnDataType.LONG) {
+                gapfillRow[_timeBucketIndex] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(time));
+              } else {
+                gapfillRow[_timeBucketIndex] = _dateTimeFormatter.fromMillisToFormat(time);
+              }
+            } else {
+              gapfillRow[i] = key.getValues()[keyIndex++];
+            }
+          } else {
+            gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
+          }
+        }
+
+        if (havingFilterHandler == null || havingFilterHandler.isMatch(gapfillRow)) {
+          gapfillResultRows.add(gapfillRow);
+          if (gapfillResultRows.size() == limit) {
+            return gapfillResultRows;
+          }
+        }
+      }
+    }
+    return gapfillResultRows;
+  }
+
+  Object getFillValue(int columnIndex, Object key, ColumnDataType dataType) {
+    ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(columnIndex);
+    if (expressionContext.getFunction() != null && GapfillUtils.isFill(expressionContext)) {
+      List<ExpressionContext> args = expressionContext.getFunction().getArguments();
+      if (args.get(1).getLiteral() == null) {
+        throw new UnsupportedOperationException("Wrong SQL: the second argument of FILL should be a literal fill type.");
+      }
+      GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
+      if (fillType == GapfillUtils.FillType.FILL_DEFAULT_VALUE) {
+        // TODO: may fill the default value from sql in the future.
+        return GapfillUtils.getDefaultValue(dataType);
+      } else if (fillType == GapfillUtils.FillType.FILL_PREVIOUS_VALUE) {
+        Object[] row = _previousByGroupKey.get(key);
+        if (row != null) {
+          return row[columnIndex];
+        } else {
+          return GapfillUtils.getDefaultValue(dataType);
+        }
+      } else {
+        throw new UnsupportedOperationException("Unsupported fill type.");
+      }
+    } else {
+      return GapfillUtils.getDefaultValue(dataType);
+    }
+  }
+
+  /**
+   * Helper method to extract the final aggregation results for the given row (in-place).
+   */
+  private void extractFinalAggregationResults(Object[] row) {
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      int valueIndex = i + _numGroupByExpressions;
+      row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
+    }
+  }
+
+  /**
+   * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
+   */
+  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
+    String[] columnNames = dataSchema.getColumnNames();
+    ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
+    System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext)
+      throws TimeoutException {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
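+    // For example, reducing 3 data tables with maxReduceThreadsPerQuery = 8 uses min(8, 3) = 3 reduce
+    // threads and a concurrent table, while a single data table uses 1 thread and the lock-free
+    // SimpleIndexedTable.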
+ int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery()); + int limit = _queryContext.getLimit(); + // TODO: Make minTrimSize configurable + int trimSize = GroupByUtils.getTableCapacity(limit); + // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy. + // TODO: Resolve the HAVING clause within the IndexedTable before returning the result + int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit; + int trimThreshold = reducerContext.getGroupByTrimThreshold(); + IndexedTable indexedTable; + if (numReduceThreadsToUse <= 1) { + indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold); + } else { + if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) { + // special case of trim threshold where it is set to max value. + // there won't be any trimming during upsert in this case. + // thus we can avoid the overhead of read-lock and write-lock + // in the upsert method. + indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize); + } else { + indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold); + } + } + + // Create groups of data tables that each thread can process concurrently. + // Given that numReduceThreads is <= numDataTables, each group will have at least one data table. + ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce); + List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse); + + for (int i = 0; i < numReduceThreadsToUse; i++) { + reduceGroups.add(new ArrayList<>()); + } + for (int i = 0; i < numDataTables; i++) { + reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i)); + } + + ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes(); + long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start); + try { + reducerContext.getExecutorService().invokeAll(reduceGroups.stream() + .map(reduceGroup -> new TraceCallable<Void>() { + @Override + public Void callJob() throws Exception { + for (DataTable dataTable : reduceGroup) { + int numRows = dataTable.getNumberOfRows(); + for (int rowId = 0; rowId < numRows; rowId++) { + Object[] values = new Object[_numColumns]; + for (int colId = 0; colId < _numColumns; colId++) { + switch (storedColumnDataTypes[colId]) { + case INT: + values[colId] = dataTable.getInt(rowId, colId); + break; + case LONG: + values[colId] = dataTable.getLong(rowId, colId); + break; + case FLOAT: + values[colId] = dataTable.getFloat(rowId, colId); + break; + case DOUBLE: + values[colId] = dataTable.getDouble(rowId, colId); + break; + case STRING: + values[colId] = dataTable.getString(rowId, colId); + break; + case BYTES: + values[colId] = dataTable.getBytes(rowId, colId); + break; + case OBJECT: + values[colId] = dataTable.getObject(rowId, colId); + break; + // Add other aggregation intermediate result / group-by column type supports here + default: + throw new IllegalStateException(); + } + } + indexedTable.upsert(new Record(values)); + } + } + return null; + } + }).collect(Collectors.toList()), timeOutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new TimeoutException("Timed out in broker reduce phase."); + } + + indexedTable.finish(true); + return indexedTable; + } + + /** + * Computes the number of reduce threads to use per query. 
+ * <ul> + * <li> Use single thread if number of data tables to reduce is less than + * {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li> + * <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li> + * </ul> + * + * @param numDataTables Number of data tables to reduce + * @param maxReduceThreadsPerQuery Max allowed reduce threads per query + * @return Number of reduce threads to use for the query + */ + private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) { + // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE. + if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) { + return Math.min(1, numDataTables); // Number of data tables can be zero. + } + + return Math.min(maxReduceThreadsPerQuery, numDataTables); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java index e190e2f..4705951 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java @@ -28,6 +28,7 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.query.postaggregation.PostAggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.util.GapfillUtils; /** @@ -50,7 +51,7 @@ public class PostAggregationHandler { _numGroupByExpressions = groupByExpressions.size(); _groupByExpressionIndexMap = new HashMap<>(); for (int i = 0; i < _numGroupByExpressions; i++) { - _groupByExpressionIndexMap.put(groupByExpressions.get(i), i); + _groupByExpressionIndexMap.put(GapfillUtils.stripGapfill(groupByExpressions.get(i)), i); } } else { _numGroupByExpressions = 0; @@ -98,6 +99,7 @@ public class PostAggregationHandler { * Returns a ValueExtractor based on the given expression. 
*/
   public ValueExtractor getValueExtractor(ExpressionContext expression) {
+    expression = GapfillUtils.stripGapfill(expression);
     if (expression.getType() == ExpressionContext.Type.LITERAL) {
       // Literal
       return new LiteralValueExtractor(expression.getLiteral());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index 7dcf05d..e5e9bf8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -22,6 +22,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -56,6 +57,8 @@ public final class ResultReducerFactory {
       } else {
         return new AggregationDataTableReducer(queryContext);
       }
+    } else if (GapfillUtils.isPostAggregateGapfill(queryContext)) {
+      return new GapFillGroupByDataTableReducer(queryContext);
     } else {
       // Aggregation group-by query
       return new GroupByDataTableReducer(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
new file mode 100644
index 0000000..55b618c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.io.Serializable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Util class to encapsulate all utilities required for gapfill.
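+ * For example, stripGapfill() unwraps PostAggregateGapFill(expr, format, start, end, bucketSize) and
+ * FILL(expr, fillType) down to the underlying expr, and returns any other expression unchanged.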
+ */
+public class GapfillUtils {
+  private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
+  private static final String FILL = "fill";
+
+  private GapfillUtils() {
+  }
+
+  public static ExpressionContext stripGapfill(ExpressionContext expression) {
+    if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+      return expression;
+    }
+
+    FunctionContext function = expression.getFunction();
+    String functionName = canonicalizeFunctionName(function.getFunctionName());
+    if (functionName.equals(POST_AGGREGATE_GAP_FILL) || functionName.equals(FILL)) {
+      return function.getArguments().get(0);
+    }
+    return expression;
+  }
+
+  public static boolean isPostAggregateGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return POST_AGGREGATE_GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  public static boolean isPostAggregateGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
+      if (isPostAggregateGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean isFill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  public enum FillType {
+    FILL_DEFAULT_VALUE,
+    FILL_PREVIOUS_VALUE,
+  }
+
+  /**
+   * The default gapfill value for each column type.
+   */
+  public static Serializable getDefaultValue(DataSchema.ColumnDataType dataType) {
+    switch (dataType) {
+      // Single-value column
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+      case TIMESTAMP:
+        return dataType.convertAndFormat(0);
+      case STRING:
+      case JSON:
+      case BYTES:
+        return "";
+      case INT_ARRAY:
+        return new int[0];
+      case LONG_ARRAY:
+        return new long[0];
+      case FLOAT_ARRAY:
+        return new float[0];
+      case DOUBLE_ARRAY:
+        return new double[0];
+      case STRING_ARRAY:
+      case TIMESTAMP_ARRAY:
+        return new String[0];
+      case BOOLEAN_ARRAY:
+        return new boolean[0];
+      case BYTES_ARRAY:
+        return new byte[0][0];
+      default:
+        throw new IllegalStateException(String.format("Cannot provide the default value for the type: %s", dataType));
+    }
+  }
+
+  private static String canonicalizeFunctionName(String functionName) {
+    return StringUtils.remove(functionName, '_').toLowerCase();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
new file mode 100644
index 0000000..bfd7baa
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.queries; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ImmutableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.DateTimeGranularitySpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Queries test for PostAggregationGapfill queries. 
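+ * The shared test segment simulates four parking lots emitting paired occupied/unoccupied events on a
+ * 15-minute raw interval, with several event pairs skipped so that the gapfill queries have real gaps
+ * to fill.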
+ */ +@SuppressWarnings("rawtypes") +public class PostAggregationGapfillQueriesTest extends BaseQueriesTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest"); + private static final String RAW_TABLE_NAME = "parkingData"; + private static final String SEGMENT_NAME = "testSegment"; + private static final Random RANDOM = new Random(); + + private static final int NUM_LOTS = 4; + + private static final String IS_OCCUPIED_COLUMN = "isOccupied"; + private static final String LOT_ID_COLUMN = "lotId"; + private static final String EVENT_TIME_COLUMN = "eventTime"; + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.BOOLEAN) + .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING) + .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG) + .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN)) + .build(); + private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .build(); + + private IndexSegment _indexSegment; + private List<IndexSegment> _indexSegments; + + @Override + protected String getFilter() { + // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator + return " WHERE eventTime >= 0"; + } + + @Override + protected IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + protected List<IndexSegment> getIndexSegments() { + return _indexSegments; + } + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + + long current = 1636286400000L; //November 7, 2021 12:00:00 PM + int duplicates = 16; + int interval = 1000 * 900; // 15 minutes + long start = current - duplicates * 2 * interval; //November 7, 2021 4:00:00 AM + + List<GenericRow> records = new ArrayList<>(NUM_LOTS * 2); + for (int i = 0; i < NUM_LOTS; i++) { + for (int j = 0; j < duplicates; j++) { + if (j == 4 || j == 5 || j == 6 || j == 7 || j == 10 || j == 11) { + continue; + } + long parkingTime = start + interval * 2 * j + RANDOM.nextInt(interval); + long departingTime = j == 3 ? 
start + interval * (2 * j + 6) + RANDOM.nextInt(interval) : start + + interval * (2 * j + 1) + RANDOM.nextInt(interval); + + GenericRow parkingRow = new GenericRow(); + parkingRow.putValue(EVENT_TIME_COLUMN, parkingTime); + parkingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i)); + parkingRow.putValue(IS_OCCUPIED_COLUMN, true); + records.add(parkingRow); + + GenericRow departingRow = new GenericRow(); + departingRow.putValue(EVENT_TIME_COLUMN, departingTime); + departingRow.putValue(LOT_ID_COLUMN, "LotId_" + String.valueOf(i)); + departingRow.putValue(IS_OCCUPIED_COLUMN, false); + records.add(departingRow); + } + } + + SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); + segmentGeneratorConfig.setSegmentName(SEGMENT_NAME); + segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records)); + driver.build(); + + ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment); + } + + @Test + public void datetimeconvertGapfillTest() { + String dataTimeConvertQuery = "SELECT " + + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')" + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery); + + ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24); + + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " + + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter + = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); + + ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); + Assert.assertEquals(gapFillResultTable.getRows().size(), 32); + List<Object[]> gapFillRows = gapFillResultTable.getRows(); + long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000"); + for (int i = 0; i < 32; i += 4) { + String firstTimeCol = (String) gapFillRows.get(i)[0]; + long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol); + 
Assert.assertEquals(timeStamp, start); + Set<String> lots = new HashSet<>(); + lots.add((String) gapFillRows.get(i)[1]); + for (int j = 1; j < 4; j++) { + Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]); + Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1])); + lots.add((String) gapFillRows.get(i + j)[1]); + } + start += dateTimeGranularity.granularityToMillis(); + } + } + + @Test + public void toEpochHoursGapfillTest() { + String dataTimeConvertQuery = "SELECT " + + "ToEpochHours(eventTime) AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')" + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery); + + ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24); + + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', " + + "'454515', '454524', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); + + ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); + Assert.assertEquals(gapFillResultTable.getRows().size(), 32); + List<Object[]> gapFillRows = gapFillResultTable.getRows(); + long start = dateTimeFormatter.fromFormatToMillis("454515"); + for (int i = 0; i < 32; i += 4) { + Long firstTimeCol = (Long) gapFillRows.get(i)[0]; + long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString()); + Assert.assertEquals(timeStamp, start); + Set<String> lots = new HashSet<>(); + lots.add((String) gapFillRows.get(i)[1]); + for (int j = 1; j < 4; j++) { + Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]); + Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1])); + lots.add((String) gapFillRows.get(i + j)[1]); + } + start += dateTimeGranularity.granularityToMillis(); + } + } + + @Test + public void toEpochMinutesRoundedHoursGapfillTest() { + String dataTimeConvertQuery = "SELECT " + + "ToEpochMinutesRounded(eventTime, 60) AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')" + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery); + + ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24); + + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(ToEpochMinutesRounded(eventTime, 60), '1:HOURS:EPOCH', " + + "'454515', '454524', '1:HOURS') AS time_col, " + + "lotId, 
" + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); + + ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); + Assert.assertEquals(gapFillResultTable.getRows().size(), 32); + List<Object[]> gapFillRows = gapFillResultTable.getRows(); + long start = dateTimeFormatter.fromFormatToMillis("454515"); + for (int i = 0; i < 32; i += 4) { + Long firstTimeCol = (Long) gapFillRows.get(i)[0]; + long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString()); + Assert.assertEquals(timeStamp, start); + Set<String> lots = new HashSet<>(); + lots.add((String) gapFillRows.get(i)[1]); + for (int j = 1; j < 4; j++) { + Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]); + Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1])); + lots.add((String) gapFillRows.get(i + j)[1]); + } + start += dateTimeGranularity.granularityToMillis(); + } + } + + @Test + public void toEpochMinutesBucketHoursGapfillTest() { + String dataTimeConvertQuery = "SELECT " + + "ToEpochMinutesBucket(eventTime, 60) AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')" + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery); + + ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24); + + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(ToEpochMinutesBucket(eventTime, 60), '1:HOURS:EPOCH', " + + "'454515', '454524', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); + + ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); + Assert.assertEquals(gapFillResultTable.getRows().size(), 32); + List<Object[]> gapFillRows = gapFillResultTable.getRows(); + long start = dateTimeFormatter.fromFormatToMillis("454515"); + for (int i = 0; i < 32; i += 4) { + Long firstTimeCol = (Long) gapFillRows.get(i)[0]; + long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString()); + Assert.assertEquals(timeStamp, start); + Set<String> lots = new HashSet<>(); + lots.add((String) 
gapFillRows.get(i)[1]); + for (int j = 1; j < 4; j++) { + Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]); + Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1])); + lots.add((String) gapFillRows.get(i + j)[1]); + } + start += dateTimeGranularity.granularityToMillis(); + } + } + + @Test + public void dateTruncHoursGapfillTest() { + String dataTimeConvertQuery = "SELECT " + + "DATETRUNC('hour', eventTime, 'milliseconds') AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')" + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery); + + ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24); + + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(DATETRUNC('hour', eventTime, 'milliseconds'), '1:HOURS:EPOCH', " + + "'454515', '454524', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); + + ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); + Assert.assertEquals(gapFillResultTable.getRows().size(), 32); + List<Object[]> gapFillRows = gapFillResultTable.getRows(); + long start = dateTimeFormatter.fromFormatToMillis("454515"); + for (int i = 0; i < 32; i += 4) { + Long firstTimeCol = (Long) gapFillRows.get(i)[0]; + long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString()); + Assert.assertEquals(timeStamp, start); + Set<String> lots = new HashSet<>(); + lots.add((String) gapFillRows.get(i)[1]); + for (int j = 1; j < 4; j++) { + Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]); + Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1])); + lots.add((String) gapFillRows.get(i + j)[1]); + } + start += dateTimeGranularity.granularityToMillis(); + } + } + + @Test + public void datetimeconvertGapfillTestWithoutTimeBucketOrdering() { + try { + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " + + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "LIMIT 200"; + + getBrokerResponseForSqlQuery(gapfillQuery); + 
Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertEquals(e.getMessage(), "PostAggregateGapFill does not work if the time bucket is not ordered."); + } + } + + @Test + public void datetimeconvertGapfillTestWithHavingClause() { + String dataTimeConvertQueryWithUnoccupied = "SELECT " + + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "HAVING status = 'false' " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponseWithUnoccupied + = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithUnoccupied); + + ResultTable dateTimeConvertResultTableWithUnoccupied = dateTimeConvertBrokerResponseWithUnoccupied.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTableWithUnoccupied.getRows().size(), 20); + + String dataTimeConvertQueryWithOccupied = "SELECT " + + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, " + + "lotId, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "HAVING status = 'true' " + + "ORDER BY 1 " + + "LIMIT 200"; + + BrokerResponseNative dateTimeConvertBrokerResponseWithOccupied + = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithOccupied); + + ResultTable dateTimeConvertResultTableWithOccupied = dateTimeConvertBrokerResponseWithOccupied.getResultTable(); + Assert.assertEquals(dateTimeConvertResultTableWithOccupied.getRows().size(), 4); + + String gapfillQueryWithOccupied = "SELECT " + + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " + + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "HAVING status = 'true' " + + "ORDER BY 1 " + + "LIMIT 7"; + + BrokerResponseNative gapfillBrokerResponseWithOccupied = getBrokerResponseForSqlQuery(gapfillQueryWithOccupied); + + ResultTable gapFillResultTableWithOccupied = gapfillBrokerResponseWithOccupied.getResultTable(); + Assert.assertEquals(gapFillResultTableWithOccupied.getRows().size(), 7); + + for (Object [] row : gapFillResultTableWithOccupied.getRows()) { + Assert.assertEquals(row[2], true); + } + + String gapfillQueryWithUnoccupied = "SELECT " + + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " + + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "HAVING status = 'false' " + + "ORDER BY 1 " + + "LIMIT 
24"; + + BrokerResponseNative gapfillBrokerResponseWithUnoccupied = getBrokerResponseForSqlQuery(gapfillQueryWithUnoccupied); + + ResultTable gapFillResultTableWithUnoccupied = gapfillBrokerResponseWithUnoccupied.getResultTable(); + Assert.assertEquals(gapFillResultTableWithUnoccupied.getRows().size(), 24); + for (Object [] row : gapFillResultTableWithUnoccupied.getRows()) { + Assert.assertEquals(row[2], false); + } + } + + + @Test + public void datetimeconvertGapfillTestTimeBucketAsLastSelection() { + String gapfillQuery = "SELECT " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3, " + + "lotId, PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " + + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 4, 5 " + + "ORDER BY 5 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter + = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery); + + ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable(); + Assert.assertEquals(gapFillResultTable.getRows().size(), 32); + List<Object[]> gapFillRows = gapFillResultTable.getRows(); + long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000"); + for (int i = 0; i < 32; i += 4) { + String timeCol = (String) gapFillRows.get(i)[4]; + long timeStamp = dateTimeFormatter.fromFormatToMillis(timeCol); + Assert.assertEquals(timeStamp, start); + Set<String> lots = new HashSet<>(); + lots.add((String) gapFillRows.get(i)[3]); + for (int j = 1; j < 4; j++) { + Assert.assertEquals(gapFillRows.get(i)[4], gapFillRows.get(i + j)[4]); + Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[3])); + lots.add((String) gapFillRows.get(i + j)[3]); + } + start += dateTimeGranularity.granularityToMillis(); + } + } + + @Test + public void datetimeconvertGapfillWithOrderingByTwoColumnsTest() { + String gapfillQuery = "SELECT " + + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), " + + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', " + + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, " + + "lotId, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, " + + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, " + + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 " + + "FROM parkingData " + + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 " + + "GROUP BY 1, 2 " + + "ORDER BY 1, 2 " + + "LIMIT 200"; + + DateTimeFormatSpec dateTimeFormatter + = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS"); + DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS"); + + BrokerResponseNative gapfillBrokerResponse = 
getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
+    for (int i = 0; i < 32; i += 4) {
+      String firstTimeCol = (String) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}